fix amqp transport with new sync_cmd.
This commit is contained in:
parent
34c6b05fe2
commit
6a2c3ebc26
|
@ -20,8 +20,8 @@ config :farmbot, :init, [
|
|||
|
||||
# Transports.
|
||||
config :farmbot, :transport, [
|
||||
Farmbot.BotState.Transport.GenMQTT,
|
||||
# Farmbot.BotState.Transport.AMQP,
|
||||
# Farmbot.BotState.Transport.GenMQTT,
|
||||
Farmbot.BotState.Transport.AMQP,
|
||||
Farmbot.BotState.Transport.HTTP,
|
||||
]
|
||||
|
||||
|
|
|
@ -28,10 +28,10 @@ defmodule Farmbot.BotState.Transport.AMQP do
|
|||
{:ok, conn} <- AMQP.Connection.open([host: mqtt_server, username: device, password: token, virtual_host: vhost || "/"]),
|
||||
{:ok, chan} <- AMQP.Channel.open(conn),
|
||||
queue_name <- Enum.join([device, UUID.uuid1()], "-"),
|
||||
:ok <- Basic.qos(chan, []),
|
||||
{:ok, _} <- AMQP.Queue.declare(chan, queue_name, [auto_delete: true]),
|
||||
:ok <- AMQP.Queue.bind(chan, queue_name, @exchange, [routing_key: "bot.#{device}.from_clients"]),
|
||||
:ok <- AMQP.Queue.bind(chan, queue_name, @exchange, [routing_key: "bot.#{device}.sync.#"]),
|
||||
:ok <- Basic.qos(chan, prefetch_count: 10),
|
||||
{:ok, _tag} <- Basic.consume(chan, queue_name),
|
||||
state <- struct(State, [conn: conn, chan: chan, queue_name: queue_name, bot: device])
|
||||
do
|
||||
|
@ -129,13 +129,14 @@ defmodule Farmbot.BotState.Transport.AMQP do
|
|||
defp handle_sync_cmd(kind, id, payload, state) do
|
||||
mod = Module.concat(["Farmbot", "Repo", kind])
|
||||
if Code.ensure_loaded?(mod) do
|
||||
body = struct(mod)
|
||||
sync_cmd = Poison.decode!(payload, as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id))
|
||||
Farmbot.Repo.register_sync_cmd(sync_cmd)
|
||||
%{"body" => body, "args" => %{"label" => uuid}} = Poison.decode!(payload, as: %{"body" => struct(mod)})
|
||||
Farmbot.Repo.register_sync_cmd(String.to_integer(id), kind, body)
|
||||
|
||||
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
|
||||
Farmbot.Repo.flip()
|
||||
end
|
||||
|
||||
Farmbot.CeleryScript.AST.Node.RpcOk.execute(%{label: uuid}, [], struct(Macro.Env))
|
||||
else
|
||||
Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(payload)}"
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue