something like that

pull/201/head^2
connor rigby 2016-11-20 20:34:10 -08:00
parent c76d20c7d3
commit 1fa3e70594
6 changed files with 122 additions and 56 deletions

View File

@ -2,13 +2,13 @@ use Mix.Config
import_config "#{Mix.env}.exs"
config :farmbot_auth,
callbacks: [Farmbot.Sync, Farmbot.RPC.Transport.Mqtt]
callbacks: [Farmbot.Sync, Farmbot.RPC.Transport.GenMqtt.Handler]
config :farmbot_configurator,
callback: Farmbot.BotState.Monitor
config :json_rpc,
transport: Farmbot.RPC.Transport.Mqtt,
transport: Farmbot.RPC.Transport.GenMqtt.Handler,
handler: Farmbot.RPC.Handler
config :uart,

View File

@ -0,0 +1,48 @@
defmodule Farmbot.RPC.Transport.GenMqtt.Client do
@moduledoc """
Experimental mqtt transport.
"""
use GenMQTT
require Logger
def init(%Token{} = token) do
{:ok, token}
end
def start_link(%Token{} = token) do
GenMQTT.start_link(__MODULE__, token, build_opts(token))
end
def on_connect(%Token{} = token) do
Logger.warn("MQTT CONNECTED")
GenMQTT.subscribe(self(), bot_topic(token), 0)
Farmbot.Sync.sync
{:ok, token}
end
def handle_cast({:emit, binary}, %Token{} = token) do
Logger.debug("hey")
GenMQTT.publish(self(), frontend_topic(token), binary, 1)
{:noreply, token}
end
# this is not a erronous situation, so don't alert.
def terminate(:new_token, _) do
Logger.warn("Get a new token. MQTT Going down.")
:ok
end
defp build_opts(%Token{} = token) do
[name: __MODULE__,
host: token.unencoded.mqtt,
password: token.encoded,
username: token.unencoded. bot]
end
defp frontend_topic(%Token{} = token) do
"bot/#{token.unencoded.bot}/from_device"
end
defp bot_topic(%Token{} = token) do
"bot/#{token.unencoded.bot}/from_clients"
end
end

View File

@ -0,0 +1,67 @@
alias Farmbot.RPC.Transport.GenMqtt.Client, as: Client
defmodule Farmbot.RPC.Transport.GenMqtt.Handler do
#GenServer.cast Farmbot.BotState.Authorization, :try_log_in
@moduledoc """
Experimental mqtt watcher.
"""
use GenServer
require Logger
def init(_args) do
Process.flag(:trap_exit, true)
case Farmbot.Auth.get_token |> Token.create do
%Token{} = token ->
{:ok, {token, start_client(token)}}
_ ->
{:ok, {nil, nil}}
end
end
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
@spec start_client(Token.t) :: pid
defp start_client(%Token{} = token) do
{:ok, pid} = Client.start_link(token)
pid
end
@spec stop_client(pid) :: :ok
defp stop_client(pid) do
GenServer.stop(pid, :new_token)
end
def handle_cast({:emit, binary}, {%Token{} = token, pid})
when is_pid(pid) do
GenServer.cast(pid, {:emit, binary})
{:noreply, {token, pid}}
end
def handle_cast({:emit, binary}, state) do
{:noreply, state}
end
# We got a token and are not connected to mqtt yet.
def handle_info({:authorization, maybe_token}, {_, nil}) do
token = Token.create(maybe_token)
{:noreply, {token, start_client(token)}}
end
def handle_info({:authorization, maybe_token}, {_, pid})
when is_pid(pid) do
token = Token.create(maybe_token)
stop_client(pid)
{:noreply, {token, start_client(token)}}
end
def handle_info(info, pid_or_nil) do
Logger.warn("#{inspect info}")
{:noreply, pid_or_nil}
end
@spec emit(binary) :: :ok
def emit(binary) do
GenServer.cast(__MODULE__, {:emit, binary})
end
end

View File

@ -1,51 +0,0 @@
defmodule Mqtt.Client do
use Hulaaki.Client
def on_connect(message: message, state: state) do
GenServer.call(state.parent, {:connect, message})
end
def on_connect_ack(message: message, state: state) do
GenServer.call(state.parent, {:connect_ack, message})
end
def on_publish(message: message, state: state) do
GenServer.call(state.parent, {:publish, message})
end
def on_subscribed_publish(message: message, state: state) do
GenServer.call(state.parent, {:subscribed_publish, message})
end
def on_subscribed_publish_ack(message: message, state: state) do
GenServer.call(state.parent, {:subscribed_publish_ack, message})
end
def on_publish_receive(message: message, state: state) do
GenServer.call(state.parent, {:publish_receive, message})
end
def on_publish_release(message: message, state: state) do
GenServer.call(state.parent, {:publish_release, message})
end
def on_publish_complete(message: message, state: state) do
GenServer.call(state.parent, {:publish_complete, message})
end
def on_publish_ack(message: message, state: state) do
GenServer.call(state.parent, {:publish_ack, message})
end
def on_subscribe(message: message, state: state) do
GenServer.call(state.parent, {:subscribe, message})
end
def on_subscribe_ack(message: message, state: state) do
GenServer.call(state.parent, {:subscribe_ack, message})
end
def on_unsubscribe(message: message, state: state) do
GenServer.call(state.parent, {:unsubscribe, message})
end
def on_unsubscribe_ack(message: message, state: state) do
GenServer.call(state.parent, {:unsubscribe_ack, message})
end
def on_ping(message: message, state: state) do
GenServer.call(state.parent, {:ping, message})
end
def on_pong(message: message, state: state) do
GenServer.call(state.parent, {:pong, message})
end
def on_disconnect(message: message, state: state) do
GenServer.call(state.parent, {:disconnect, message})
end
end

View File

@ -47,7 +47,6 @@ defmodule Farmbot.Mixfile do
:gen_stage,
:nerves_lib,
:rsa,
:hulaaki,
:runtime_tools,
:mustache,
:timex,
@ -85,7 +84,8 @@ defmodule Farmbot.Mixfile do
{:poison, "~> 2.0"},
{:gen_stage, "~> 0.4"},
{:nerves_lib, github: "nerves-project/nerves_lib"},
{:hulaaki, github: "ConnorRigby/hulaaki"},
# {:hulaaki, github: "ConnorRigby/hulaaki"},
{:gen_mqtt, "~> 0.3.1"},
{:mustache, "~> 0.0.2"},
{:timex, "~> 3.0"},
# {:farmbot_auth, github: "Farmbot/farmbot_auth"},

View File

@ -16,6 +16,7 @@
"fake_nerves": {:git, "https://github.com/ConnorRigby/fake_nerves.git", "0ffd1df5f4a823257c1c5966d74cd1b26263b2a9", []},
"farmbot_auth": {:git, "https://github.com/Farmbot/farmbot_auth.git", "5330f231058bb72aefb83522f4f2c5b7d00c4ee7", []},
"farmbot_configurator": {:git, "https://github.com/Farmbot/farmbot_configurator.git", "3b41b4b12fa88a9717656e025113daf87de49d14", []},
"gen_mqtt": {:hex, :gen_mqtt, "0.3.1", "6ce6af7c2bcb125d5b4125c67c5ab1f29bcec2638236509bcc6abf510a6661ed", [:mix], [{:vmq_commons, "1.0.0", [hex: :vmq_commons, optional: false]}]},
"gen_stage": {:hex, :gen_stage, "0.8.0", "a76e3f0530f86fae8b8a1021c06527b1ec171cf4c0bdfecd8d5ad0376d1205af", [:mix], []},
"getopt": {:hex, :getopt, "0.8.2", "b17556db683000ba50370b16c0619df1337e7af7ecbf7d64fbf8d1d6bce3109b", [:rebar], []},
"gettext": {:hex, :gettext, "0.12.1", "c0624f52763469ef7a3674919ae28b8286d88195b90fa1516180f31bbbd26d14", [:mix], []},
@ -53,4 +54,5 @@
"rsa": {:hex, :rsa, "0.0.1", "a63069f88ce342ffdf8448b7cdef4b39ba7dee3c1510644a39385c7e63ba246f", [:mix], []},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []},
"timex": {:hex, :timex, "3.1.3", "3f9e7cd03190faf143098857bebd26c6d33e0709e7d3c42095da2393765fa225", [:mix], [{:combine, "~> 0.7", [hex: :combine, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]},
"tzdata": {:hex, :tzdata, "0.5.9", "575be217b039057a47e133b72838cbe104fb5329b19906ea4e66857001c37edb", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]}}
"tzdata": {:hex, :tzdata, "0.5.9", "575be217b039057a47e133b72838cbe104fb5329b19906ea4e66857001c37edb", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]},
"vmq_commons": {:hex, :vmq_commons, "1.0.0", "5f5005c12db33f92f40e818a3617fb148972d59adcf99298c9d3808ef3582e34", [:rebar3], []}}