Revert multiple state channels over amqp
parent
b619cb3576
commit
3ff7a78f68
|
@ -1,19 +1,6 @@
|
|||
defmodule FarmbotExt.AMQP.BotStateNGChannel do
|
||||
@moduledoc """
|
||||
Publishes JSON encoded bot state fragements onto an AMQP channel
|
||||
Examples:
|
||||
|
||||
if the state looks something like:
|
||||
```
|
||||
%{location_data: %{position: %{x: 1.0, y: 0.0, z: 4.0}}}
|
||||
```
|
||||
It would publish the following data:
|
||||
* `bot/<device_id>/status_v8/location_data.position.x` => `"1.0"`
|
||||
* `bot/<device_id>/status_v8/location_data.position.y` => `"0.0"`
|
||||
* `bot/<device_id>/status_v8/location_data_.position.z` => `"4.0"`
|
||||
|
||||
One could subscribe to `bot/<device_id>/status_v8/location_data/#`
|
||||
and recieve all of those notifications.
|
||||
Publishes JSON encoded bot state updates onto an AMQP channel
|
||||
"""
|
||||
|
||||
use GenServer
|
||||
|
@ -25,13 +12,10 @@ defmodule FarmbotExt.AMQP.BotStateNGChannel do
|
|||
alias FarmbotExt.AMQP.ConnectionWorker
|
||||
|
||||
alias FarmbotCore.{BotState, BotStateNG}
|
||||
alias FarmbotCore.BotStateNG.ChangeGenerator
|
||||
|
||||
# Pushes a state tree every 5 seconds for good luck.
|
||||
@default_error_retry_ms 100
|
||||
@exchange "amq.topic"
|
||||
|
||||
defstruct [:conn, :chan, :jwt, :changes]
|
||||
defstruct [:conn, :chan, :jwt, :cache]
|
||||
alias __MODULE__, as: State
|
||||
|
||||
@doc "Forces pushing the most current state tree"
|
||||
|
@ -44,37 +28,33 @@ defmodule FarmbotExt.AMQP.BotStateNGChannel do
|
|||
GenServer.start_link(__MODULE__, args, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def init(args) do
|
||||
jwt = Keyword.fetch!(args, :jwt)
|
||||
Process.flag(:sensitive, true)
|
||||
changes = BotState.subscribe() |> ChangeGenerator.changes()
|
||||
{:ok, %State{conn: nil, chan: nil, jwt: jwt, changes: changes}, 0}
|
||||
cache = BotState.subscribe()
|
||||
{:ok, %State{conn: nil, chan: nil, jwt: jwt, cache: cache}, 0}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def terminate(reason, state) do
|
||||
FarmbotCore.Logger.error(1, "Disconnected from BotState channel: #{inspect(reason)}")
|
||||
# If a channel was still open, close it.
|
||||
if state.chan, do: Channel.close(state.chan)
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_cast(:force, state) do
|
||||
bot_state = BotState.fetch()
|
||||
changes = ChangeGenerator.changes(bot_state)
|
||||
|
||||
json =
|
||||
bot_state
|
||||
|> BotStateNG.view()
|
||||
|> JSON.encode!()
|
||||
|
||||
Basic.publish(state.chan, @exchange, "bot.#{state.jwt.bot}.status", json)
|
||||
{:noreply, %{state | changes: changes}, 0}
|
||||
cache = BotState.fetch()
|
||||
{:noreply, %{state | cache: cache}, {:continue, :dispatch}}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info(:timeout, %{conn: nil, chan: nil} = state) do
|
||||
with %{} = conn <- ConnectionWorker.connection(),
|
||||
{:ok, chan} <- Channel.open(conn),
|
||||
:ok <- Basic.qos(chan, global: true) do
|
||||
{:noreply, %{state | conn: conn, chan: chan}, 0}
|
||||
{:noreply, %{state | conn: conn, chan: chan}, {:continue, :dispatch}}
|
||||
else
|
||||
nil ->
|
||||
{:noreply, %{state | conn: nil, chan: nil}, 5000}
|
||||
|
@ -86,36 +66,26 @@ defmodule FarmbotExt.AMQP.BotStateNGChannel do
|
|||
end
|
||||
|
||||
def handle_info(:timeout, %{chan: %{}} = state) do
|
||||
{:noreply, %{state | changes: []}, {:continue, state.changes}}
|
||||
{:noreply, state, {:continue, :dispatch}}
|
||||
end
|
||||
|
||||
def handle_info({BotState, change}, state) do
|
||||
changes = (state.changes ++ change.changes) |> ChangeGenerator.changes()
|
||||
{:noreply, %{state | changes: changes}, 0}
|
||||
cache = Ecto.Changeset.apply_changes(change)
|
||||
{:noreply, %{state | cache: cache}, {:continue, :dispatch}}
|
||||
end
|
||||
|
||||
def handle_continue([{path, value} | rest] = changes, %{chan: chan} = state) do
|
||||
path =
|
||||
path
|
||||
|> Enum.map(&to_string/1)
|
||||
|> Enum.join(".")
|
||||
|
||||
json = JSON.encode!(value)
|
||||
|
||||
case Basic.publish(chan, @exchange, "bot.#{state.jwt.bot}.status_v8.upsert.#{path}", json) do
|
||||
:ok ->
|
||||
{:noreply, state, {:continue, rest}}
|
||||
|
||||
error ->
|
||||
msg = """
|
||||
Failed to send state value: #{path}, #{inspect(value)}
|
||||
error: #{inspect(error)}
|
||||
"""
|
||||
|
||||
FarmbotCore.Logger.error(1, msg)
|
||||
{:noreply, %{state | changes: changes}, @default_error_retry_ms}
|
||||
end
|
||||
@impl GenServer
|
||||
def handle_continue(:dispatch, %{chan: nil} = state) do
|
||||
{:noreply, state, 5000}
|
||||
end
|
||||
|
||||
def handle_continue([], state), do: {:noreply, state}
|
||||
def handle_continue(:dispatch, %{chan: %{}, cache: cache} = state) do
|
||||
json =
|
||||
cache
|
||||
|> BotStateNG.view()
|
||||
|> JSON.encode!()
|
||||
|
||||
Basic.publish(state.chan, @exchange, "bot.#{state.jwt.bot}.status", json)
|
||||
{:noreply, state, 5000}
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue