New AMQP transport to replace MQTT.

This commit is contained in:
connor rigby 2017-11-13 10:20:51 -08:00
parent 3387f41aeb
commit 0ae5cf927f
5 changed files with 174 additions and 4 deletions

View file

@ -20,7 +20,8 @@ config :farmbot, :init, [
# Transports.
config :farmbot, :transport, [
Farmbot.BotState.Transport.GenMQTT,
# Farmbot.BotState.Transport.GenMQTT,
Farmbot.BotState.Transport.AMQP,
Farmbot.BotState.Transport.HTTP,
]

View file

@ -193,7 +193,8 @@ defmodule Farmbot.BotState do
@doc "Emit an AST."
def emit(%AST{} = ast) do
GenStage.call(__MODULE__, {:emit, ast})
kind = Module.split(ast.kind) |> List.last |> Macro.underscore()
GenStage.call(__MODULE__, {:emit, %{ast | kind: kind}})
end
@doc "Get user env."

View file

@ -0,0 +1,164 @@
defmodule Farmbot.BotState.Transport.AMQP do
@moduledoc "AMQP Bot State Transport."
use GenStage
use AMQP
use Farmbot.Logger
alias Farmbot.System.ConfigStorage
alias Farmbot.CeleryScript
alias CeleryScript.AST
@exchange "amq.topic"
def start_link do
GenStage.start_link(__MODULE__, [], [name: __MODULE__])
end
# GenStage callbacks
defmodule State do
@moduledoc false
defstruct [:conn, :chan, :queue_name, :bot, :state_cache]
end
def init([]) do
token = ConfigStorage.get_config_value(:string, "authorization", "token")
with {:ok, %{bot: device, mqtt: mqtt_server}} <- Farmbot.Jwt.decode(token),
{:ok, conn} <- AMQP.Connection.open([host: mqtt_server]),
# {:ok, conn} <- AMQP.Connection.open([host: mqtt_server, username: device, password: token]),
{:ok, chan} <- AMQP.Channel.open(conn),
queue_name <- Enum.join([device, UUID.uuid1()], "-"),
{:ok, _} <- AMQP.Queue.declare(chan, queue_name, [auto_delete: true]),
:ok <- AMQP.Queue.bind(chan, queue_name, @exchange, [routing_key: "bot.#{device}.from_clients"]),
:ok <- AMQP.Queue.bind(chan, queue_name, @exchange, [routing_key: "bot.#{device}.sync.#"]),
:ok <- Basic.qos(chan, prefetch_count: 10),
{:ok, _tag} <- Basic.consume(chan, queue_name),
state <- struct(State, [conn: conn, chan: chan, queue_name: queue_name, bot: device])
do
Logger.success(3, "Connected to real time services.")
{:consumer, state, subscribe_to: [Farmbot.BotState, Farmbot.Logger]}
else
{:error, {:auth_failure, msg}} = fail ->
Farmbot.System.factory_reset(msg)
{:stop, fail, :no_state}
{:error, reason} ->
Logger.error 1, "Got error authenticating with Real time services: #{inspect reason}"
:ignore
end
end
def handle_events(events, {pid, _}, state) do
case Process.info(pid)[:registered_name] do
Farmbot.Logger -> handle_log_events(events, state)
Farmbot.BotState -> handle_bot_state_events(events, state)
end
end
def handle_log_events(logs, state) do
for %Farmbot.Log{} = log <- logs do
if log.module == nil or Module.split(log.module || Elixir.Logger) |> List.first == "Farmbot" do
location_data = Map.get(state.state_cache || %{}, :location_data, %{position: %{x: -1, y: -1, z: -1}})
meta = %{type: log.level, x: nil, y: nil, z: nil}
log_without_pos = %{created_at: log.time, meta: meta, channels: log.meta[:channels] || [], message: log.message}
log = add_position_to_log(log_without_pos, location_data)
push_bot_log(state.chan, state.bot, log)
end
end
{:noreply, [], state}
end
def handle_bot_state_events([event | rest], state) do
case event do
{:emit, %AST{} = ast} ->
emit_cs(state.chan, state.bot, ast)
handle_bot_state_events(rest, state)
new_bot_state ->
push_bot_state(state.chan, state.bot, new_bot_state)
handle_bot_state_events(rest, %{state | state_cache: new_bot_state})
end
end
def handle_bot_state_events([], state) do
{:noreply, [], state}
end
# Confirmation sent by the broker after registering this process as a consumer
def handle_info({:basic_consume_ok, _}, state) do
{:noreply, [], state}
end
# Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
def handle_info({:basic_cancel, _}, state) do
{:stop, :normal, state}
end
# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, _}, state) do
{:noreply, [], state}
end
def handle_info({:basic_deliver, payload, %{routing_key: key}}, state) do
IO.puts "#{key}"
device = state.bot
route = String.split(key, ".")
case route do
["bot", ^device, "from_clients"] ->
handle_celery_script(payload, state)
{:noreply, [], state}
["bot", ^device, "sync", resource, _]
when resource in ["Log", "User", "Image", "WebcamFeed"] ->
{:noreply, [], state}
["bot", ^device, "sync", resource, id] ->
handle_sync_cmd(resource, id, payload, state)
["bot", ^device, "logs" ] -> {:noreply, [], state}
["bot", ^device, "status"] -> {:noreply, [], state}
["bot", ^device, "from_device"] -> {:noreply, [], state}
_ ->
Logger.warn 3, "got unknown routing key: #{key}"
{:noreply, [], state}
end
end
defp handle_celery_script(payload, _state) do
case AST.decode(payload) do
{:ok, ast} -> spawn CeleryScript, :execute, [ast]
_ -> :ok
end
end
defp handle_sync_cmd(kind, id, payload, state) do
mod = Module.concat(["Farmbot", "Repo", kind])
if Code.ensure_loaded?(mod) do
body = struct(mod)
sync_cmd = Poison.decode!(payload, as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id))
Farmbot.Repo.register_sync_cmd(sync_cmd)
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
Farmbot.Repo.flip()
end
else
Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(payload)}"
end
{:noreply, [], state}
end
defp push_bot_log(chan, bot, log) do
json = Poison.encode!(log)
:ok = AMQP.Basic.publish chan, @exchange, "bot.#{bot}.logs", json
end
defp emit_cs(chan, bot, cs) do
json = Poison.encode!(cs)
:ok = AMQP.Basic.publish chan, @exchange, "bot.#{bot}.from_device", json
end
defp push_bot_state(chan, bot, state) do
json = Poison.encode!(state)
:ok = AMQP.Basic.publish chan, @exchange, "bot.#{bot}.status", json
end
defp add_position_to_log(%{meta: meta} = log, %{position: pos}) do
new_meta = Map.merge(meta, pos)
%{log | meta: new_meta}
end
end

View file

@ -96,7 +96,8 @@ defmodule Farmbot.Mixfile do
{:sqlite_ecto2, "~> 2.2.1"},
{:wobserver, "~> 0.1.8"},
{:joken, "~> 1.1"},
{:socket, "~> 0.3"}
{:socket, "~> 0.3"},
{:amqp, "~> 1.0.0-pre.2"}
]
end

View file

@ -1,4 +1,6 @@
%{"base64url": {:hex, :base64url, "0.0.1", "36a90125f5948e3afd7be97662a1504b934dd5dac78451ca6e9abf85a10286be", [], [], "hexpm"},
%{"amqp": {:hex, :amqp, "1.0.0-pre.2", "952180aac9b3a1faea96522f589327e8286457b8bd0fd84bf0560e06d02f5a5a", [], [{:amqp_client, "~> 3.6.8", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.6.8", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"},
"amqp_client": {:hex, :amqp_client, "3.6.12", "dfdfe7be661feb96ece404092a47431a73797ad412959732d940f96f80290da0", [], [{:rabbit_common, "3.6.12", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"},
"base64url": {:hex, :base64url, "0.0.1", "36a90125f5948e3afd7be97662a1504b934dd5dac78451ca6e9abf85a10286be", [], [], "hexpm"},
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [], [], "hexpm"},
"certifi": {:hex, :certifi, "2.0.0", "a0c0e475107135f76b8c1d5bc7efb33cd3815cb3cf3dea7aefdd174dabead064", [], [], "hexpm"},
"combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [], [], "hexpm"},
@ -45,6 +47,7 @@
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [], [], "hexpm"},
"postgrex": {:hex, :postgrex, "0.13.3", "c277cfb2a9c5034d445a722494c13359e361d344ef6f25d604c2353185682bfc", [], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"},
"rabbit_common": {:hex, :rabbit_common, "3.6.12", "28dfb9ce4decf64caa9d9e70977ab57af983bc2795fd9db349bfc254a8b45235", [], [], "hexpm"},
"ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm"},
"rsa": {:hex, :rsa, "0.0.1", "a63069f88ce342ffdf8448b7cdef4b39ba7dee3c1510644a39385c7e63ba246f", [], [], "hexpm"},
"sbroker": {:hex, :sbroker, "1.0.0", "28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [], [], "hexpm"},