going to merge back into master
parent
3e830c4c71
commit
8305aa040f
|
@ -15,7 +15,6 @@ defmodule Command.Tracker do
|
|||
|
||||
# When we get a successful message, reset the counter.
|
||||
def handle_cast(:done, _) do
|
||||
# Farmbot.RPC.Handler.send_status
|
||||
{:noreply, 0}
|
||||
end
|
||||
|
||||
|
@ -23,10 +22,18 @@ defmodule Command.Tracker do
|
|||
{:noreply, 0}
|
||||
end
|
||||
|
||||
# BUG: I dont know what caused this but for some reason this message hit
|
||||
# this module. it then logged straight to the frontend.
|
||||
# Containint the token in plain text. This is just to make sure that doesn't
|
||||
# ever happen again. (thanks pattern matching)
|
||||
def handle_cast({:authorization, _}, count) do
|
||||
{:noreply, count}
|
||||
end
|
||||
|
||||
# If the message is not successful, and count is less than three,
|
||||
# increase it.
|
||||
def handle_cast(issue, count) when count < 2 do
|
||||
# Farmbot.RPC.Handler.send_status
|
||||
IO.inspect issue
|
||||
Farmbot.Logger.log("Problem writing command! #{inspect issue}", [:error_toast], [@tag])
|
||||
{:noreply, count + 1}
|
||||
end
|
||||
|
|
|
@ -15,7 +15,6 @@ defmodule Farmbot.RPC.Transport.GenMqtt.Client do
|
|||
def on_connect(%Token{} = token) do
|
||||
Logger.debug "MQTT Connected"
|
||||
GenMQTT.subscribe(self(), bot_topic(token), 0)
|
||||
Farmbot.Sync.sync
|
||||
Logger.debug "MQTT Subscribed"
|
||||
{:ok, token}
|
||||
end
|
||||
|
@ -26,12 +25,11 @@ defmodule Farmbot.RPC.Transport.GenMqtt.Client do
|
|||
msg
|
||||
|> Poison.decode!
|
||||
|> RPC.MessageManager.sync_notify
|
||||
Logger.debug("Got message: #{inspect msg}")
|
||||
{:ok, token}
|
||||
end
|
||||
|
||||
def handle_cast(something, state) do
|
||||
Logger.debug("CAST: #{inspect something}")
|
||||
Logger.warn("CAST: #{inspect something}")
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
|
@ -51,15 +49,17 @@ defmodule Farmbot.RPC.Transport.GenMqtt.Client do
|
|||
:ok
|
||||
end
|
||||
|
||||
def terminate(_, _) do
|
||||
Logger.warn("Mqtt Client died!")
|
||||
:ok
|
||||
end
|
||||
|
||||
@spec build_opts(Token.t) :: GenMQTT.option
|
||||
defp build_opts(%Token{} = token) do
|
||||
[name: __MODULE__,
|
||||
host: token.unencoded.mqtt,
|
||||
password: token.encoded,
|
||||
username: token.unencoded.bot]
|
||||
# these dont work for some reason
|
||||
# last_will_topic: frontend_topic(token),
|
||||
# last_will_message: build_last_will_message]
|
||||
end
|
||||
|
||||
@spec frontend_topic(Token.t) :: String.t
|
||||
|
@ -68,12 +68,13 @@ defmodule Farmbot.RPC.Transport.GenMqtt.Client do
|
|||
@spec bot_topic(Token.t) :: String.t
|
||||
defp bot_topic(%Token{} = token), do: "bot/#{token.unencoded.bot}/from_clients"
|
||||
|
||||
|
||||
defp build_last_will_message do
|
||||
msg = Farmbot.RPC.Handler.log_msg("Bot going offline",
|
||||
[:error_ticker],
|
||||
["ERROR"])
|
||||
IO.inspect msg
|
||||
msg
|
||||
end
|
||||
# NOT WORKING
|
||||
# @spec build_last_will_message :: String.t
|
||||
# defp build_last_will_message do
|
||||
# msg = Farmbot.RPC.Handler.log_msg("Bot going offline",
|
||||
# [:error_ticker],
|
||||
# ["ERROR"])
|
||||
# IO.inspect msg
|
||||
# msg
|
||||
# end
|
||||
end
|
||||
|
|
|
@ -34,12 +34,11 @@ defmodule Farmbot.RPC.Transport.GenMqtt.Handler do
|
|||
|
||||
def handle_cast({:emit, binary}, {%Token{} = token, pid})
|
||||
when is_pid(pid) do
|
||||
Logger.debug("REALLY EMITTING")
|
||||
send(Client, {:emit, binary})
|
||||
{:noreply, {token, pid}}
|
||||
end
|
||||
|
||||
def handle_cast({:emit, binary}, state) do
|
||||
def handle_cast({:emit, _binary}, state) do
|
||||
# Save messages when offline maybe?
|
||||
{:noreply, state}
|
||||
end
|
||||
|
@ -57,6 +56,12 @@ defmodule Farmbot.RPC.Transport.GenMqtt.Handler do
|
|||
{:noreply, {token, start_client(token)}}
|
||||
end
|
||||
|
||||
def handle_info({:EXIT, pid, _reason}, {%Token{} = token, client})
|
||||
when client == pid do
|
||||
# restart the client if it dies.
|
||||
{:noreply, {token, start_client(token)}}
|
||||
end
|
||||
|
||||
def handle_info(info, pid_or_nil) do
|
||||
Logger.warn("#{inspect info}")
|
||||
{:noreply, pid_or_nil}
|
||||
|
|
|
@ -1,221 +0,0 @@
|
|||
defmodule Farmbot.RPC.Transport.Mqtt do
|
||||
@moduledoc """
|
||||
This is the transport for mqtt. There is an interesting
|
||||
race condition. if someone can find it i will give them
|
||||
ten dollars. I think it is in the Haluki source code somewhere.
|
||||
"""
|
||||
require GenServer
|
||||
require Logger
|
||||
|
||||
def init(_args) do
|
||||
Logger.debug("MQTT INIT")
|
||||
Process.flag(:trap_exit, true)
|
||||
{:ok, client} = Mqtt.Client.start_link(%{parent: self()})
|
||||
Process.sleep(2000) # maybe fix race condition?
|
||||
case Farmbot.Auth.get_token |> Token.create do
|
||||
%Token{} = token ->
|
||||
login(token)
|
||||
{:ok, {client, token}}
|
||||
_ ->
|
||||
{:ok, {client, nil}}
|
||||
end
|
||||
end
|
||||
|
||||
def start_link(args) do
|
||||
GenServer.start_link(__MODULE__, args, name: __MODULE__)
|
||||
end
|
||||
|
||||
# dont allow emits when we dont have a token
|
||||
def handle_cast(_, {client, nil}) do
|
||||
{:noreply, {client, nil}}
|
||||
end
|
||||
|
||||
def handle_cast({:emit, message}, {client, token}) when is_bitstring(message) do
|
||||
options = [ id: 1234,
|
||||
topic: "bot/#{bot(token)}/from_device",
|
||||
message: message,
|
||||
dup: 1, qos: 1, retain: 0 ]
|
||||
spawn fn -> Mqtt.Client.publish(client, options) end
|
||||
{:noreply, {client, token}}
|
||||
end
|
||||
|
||||
def handle_info(:login, {client, nil})
|
||||
when is_pid(client) do
|
||||
Logger.warn("This shouldn't be possible.")
|
||||
end
|
||||
|
||||
def handle_info(:login, {client, token})
|
||||
when is_pid(client) do
|
||||
Logger.debug("Trying to sign into MQTT")
|
||||
resp = Mqtt.Client.connect(client, connect_options(token))
|
||||
Logger.debug("FINDME: #{inspect resp}")
|
||||
{:noreply, {client, token}}
|
||||
end
|
||||
|
||||
def handle_info({:EXIT, pid, reason}, {client, token})
|
||||
when pid == client do
|
||||
Logger.error("Hulaki died: #{inspect reason}")
|
||||
{:crashme, {client, token}}
|
||||
end
|
||||
|
||||
def handle_info({:EXIT, pid, reason}, {client, token}) do
|
||||
Logger.warn("something in mqtt.ex died: #{inspect pid}: #{inspect reason}")
|
||||
{:noreply, {client, token}}
|
||||
end
|
||||
|
||||
def handle_info({:authorization, token}, {client, _old_token}) do
|
||||
r_token = Token.create(token)
|
||||
r_token |> login
|
||||
{:noreply, {client, r_token}}
|
||||
end
|
||||
|
||||
# We still want to keep alive, but dont actually preform the ping
|
||||
def handle_info({:keep_alive}, {client, nil}) do
|
||||
keep_connection_alive
|
||||
{:noreply, {client, nil}}
|
||||
end
|
||||
|
||||
def handle_info({:keep_alive}, {client, token}) do
|
||||
Mqtt.Client.ping(client)
|
||||
keep_connection_alive
|
||||
{:noreply, {client, token}}
|
||||
end
|
||||
|
||||
def emit(message) when is_bitstring(message) do
|
||||
GenServer.cast(__MODULE__, {:emit, message})
|
||||
end
|
||||
|
||||
defp bot(%Token{} = token) do
|
||||
token.unencoded.bot
|
||||
end
|
||||
|
||||
defp keep_connection_alive do
|
||||
Process.send_after(__MODULE__, {:keep_alive}, 15000)
|
||||
end
|
||||
|
||||
def terminate(reason, _state) do
|
||||
Logger.debug("#{__MODULE__} died. #{inspect reason}")
|
||||
end
|
||||
|
||||
defp build_last_will_message do
|
||||
Farmbot.RPC.Handler.log_msg("Bot going offline", [:error_ticker], ["ERROR"])
|
||||
end
|
||||
|
||||
defp login(token) when is_map(token) do
|
||||
# This wait fixes haluki crashing for some reason.
|
||||
Process.send_after(__MODULE__, :login, 2000)
|
||||
end
|
||||
|
||||
defp connect_options(%Token{} = token) do
|
||||
# mqtt_host = Map.get(token, "unencoded") |> Map.get("mqtt")
|
||||
mqtt_host = token.unencoded.mqtt
|
||||
# mqtt_user = Map.get(token, "unencoded") |> Map.get("bot")
|
||||
mqtt_user = token.unencoded.bot
|
||||
# mqtt_pass = Map.get(token, "encoded")
|
||||
mqtt_pass = token.encoded
|
||||
[client_id: mqtt_user,
|
||||
username: mqtt_user,
|
||||
password: mqtt_pass,
|
||||
host: mqtt_host, # mqtt.farmbot.io
|
||||
port: 1883,
|
||||
# port: 8883,
|
||||
timeout: 5000,
|
||||
keep_alive: 500,
|
||||
will_topic: "bot/#{bot(token)}/from_device",
|
||||
will_message: build_last_will_message,
|
||||
will_qos: 0,
|
||||
will_retain: 0]
|
||||
end
|
||||
|
||||
## ##
|
||||
# Callbacks from the Mqtt_Client #
|
||||
## ##
|
||||
|
||||
def handle_call(_, _, {client, nil}) do
|
||||
Logger.debug("MQTT Doesn't have a token yet.")
|
||||
{:reply, :no_token, {client, nil}}
|
||||
end
|
||||
|
||||
def handle_call({:connect_ack, _message}, _from, {client, token}) do
|
||||
options = [id: 24756, topics: ["bot/#{bot(token)}/from_clients"], qoses: [0]]
|
||||
spawn fn ->
|
||||
Logger.debug("Subscribing")
|
||||
Mqtt.Client.subscribe(client, options)
|
||||
Farmbot.Sync.sync
|
||||
end
|
||||
keep_connection_alive
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:subscribed_publish, message}, _from, {client, token}) do
|
||||
Map.get(message, :message) |> Poison.decode! |>
|
||||
RPC.MessageManager.sync_notify
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:subscribe_ack, _message}, _from, {client, token}) do
|
||||
Logger.debug("Subscribed.")
|
||||
spawn fn ->
|
||||
Farmbot.Logger.log("Bot is online and ready to roll", [:ticker], ["BOT STATUS"])
|
||||
end
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
### ###
|
||||
# THESE ARE ALL UNUSED CALLBACKS FROM THE MQTT_CLIENT #
|
||||
### ###
|
||||
def handle_call({:unsubscribe, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:unsubscribe_ack, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:ping, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:disconnect, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:pong, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:connect, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
def handle_call({:publish, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
def handle_call({:subscribed_publish_ack, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:publish_receive, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:publish_release, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:publish_complete, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:publish_ack, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call({:subscribe, _message}, _from, {client, token}) do
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
|
||||
def handle_call(thing, _from, {client, token}) do
|
||||
Logger.debug("Unhandled Thing #{inspect thing}")
|
||||
{:reply, :ok, {client, token}}
|
||||
end
|
||||
end
|
7
mix.exs
7
mix.exs
|
@ -51,7 +51,8 @@ defmodule Farmbot.Mixfile do
|
|||
:mustache,
|
||||
:timex,
|
||||
:farmbot_auth,
|
||||
:farmbot_configurator]
|
||||
:farmbot_configurator,
|
||||
:vmq_commons]
|
||||
end
|
||||
|
||||
# on device
|
||||
|
@ -84,14 +85,14 @@ defmodule Farmbot.Mixfile do
|
|||
{:poison, "~> 2.0"},
|
||||
{:gen_stage, "~> 0.4"},
|
||||
{:nerves_lib, github: "nerves-project/nerves_lib"},
|
||||
# {:hulaaki, github: "ConnorRigby/hulaaki"},
|
||||
{:gen_mqtt, "~> 0.3.1"},
|
||||
{:vmq_commons, "1.0.0", manager: :rebar3},
|
||||
{:mustache, "~> 0.0.2"},
|
||||
{:timex, "~> 3.0"},
|
||||
# {:farmbot_auth, github: "Farmbot/farmbot_auth"},
|
||||
{:farmbot_auth, path: "../farmbot_auth"},
|
||||
# {:farmbot_configurator, github: "Farmbot/farmbot_configurator"}
|
||||
{:farmbot_configurator, path: "../farmbot_configurator"}
|
||||
{:farmbot_configurator, path: "../farmbot_configurator"}
|
||||
]
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue