Add new system to track dependency statuses globally
parent
1c9b8dbd7c
commit
d34ab2abb2
|
@ -16,6 +16,7 @@ defmodule FarmbotCore do
|
|||
def init([]) do
|
||||
|
||||
children = [
|
||||
FarmbotCore.DepTracker,
|
||||
FarmbotCore.Leds,
|
||||
FarmbotCore.EctoMigrator,
|
||||
FarmbotCore.BotState.Supervisor,
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
defmodule FarmbotCore.DepTracker do
|
||||
@moduledoc """
|
||||
Subscribe to internal dependency and service status events.
|
||||
"""
|
||||
alias FarmbotCore.{DepTracker, DepTracker.Table}
|
||||
|
||||
@doc "Start a dep tracker instance"
|
||||
def start_link(options) do
|
||||
name = Keyword.get(options, :name, DepTracker)
|
||||
|
||||
unless !is_nil(name) and is_atom(name) do
|
||||
raise ArgumentError, "expected :name to be given and to be an atom, got: #{inspect(name)}"
|
||||
end
|
||||
DepTracker.Supervisor.start_link(name)
|
||||
end
|
||||
|
||||
@doc false
|
||||
def child_spec(opts) do
|
||||
%{
|
||||
id: Keyword.get(opts, :name, DepTracker),
|
||||
start: {DepTracker, :start_link, [opts]},
|
||||
type: :supervisor
|
||||
}
|
||||
end
|
||||
|
||||
@doc "register an asset in the tracker"
|
||||
def register_asset(table \\ DepTracker, %kind{local_id: local_id}, status) do
|
||||
Table.put(table, {{kind, local_id}, status})
|
||||
end
|
||||
|
||||
@doc "register a service in the tracker"
|
||||
def register_service(table \\ DepTracker, service_name, status) do
|
||||
Table.put(table, {service_name, status})
|
||||
end
|
||||
|
||||
@doc """
|
||||
subscribe to asset changes from the tracker
|
||||
messages are dispatched in the shape of
|
||||
|
||||
{table_name, {kind, local_id}, status}
|
||||
"""
|
||||
def subscribe_asset(table \\ DepTracker, kind) do
|
||||
:ok = do_subscribe(table, kind)
|
||||
initial = get_asset(table, kind)
|
||||
for {{kind, local_id}, status} <- initial do
|
||||
send self(), {table, {kind, local_id}, nil, status}
|
||||
end
|
||||
:ok
|
||||
end
|
||||
|
||||
@doc "get all current assets by kind"
|
||||
def get_asset(table \\ DepTracker, kind) do
|
||||
Table.get(table, {kind, :"$1"})
|
||||
end
|
||||
|
||||
@doc """
|
||||
subscribe to service changes from the tracker
|
||||
messages are dispatched in the shape of
|
||||
|
||||
{table_name, service_name, status}
|
||||
"""
|
||||
def subscribe_service(table \\ DepTracker, service_name) do
|
||||
:ok = do_subscribe(table, service_name)
|
||||
initial = get_service(table, service_name)
|
||||
for {^service_name, status} <- initial do
|
||||
send self(), {table, {service_name, nil, status}}
|
||||
end
|
||||
:ok
|
||||
end
|
||||
|
||||
@doc "get all current services by name"
|
||||
def get_service(table \\ DepTracker, service_name) do
|
||||
Table.get(table, service_name)
|
||||
end
|
||||
|
||||
defp do_subscribe(table, name) do
|
||||
registry = DepTracker.Supervisor.registry_name(table)
|
||||
{:ok, _} = Registry.register(registry, name, nil)
|
||||
:ok
|
||||
end
|
||||
end
|
|
@ -0,0 +1,52 @@
|
|||
defmodule FarmbotCore.DepTracker.Logger do
|
||||
alias FarmbotCore.DepTracker
|
||||
require Logger
|
||||
use GenServer
|
||||
|
||||
@doc false
|
||||
def child_spec(args) do
|
||||
%{
|
||||
id: name(args),
|
||||
start: {FarmbotCore.DepTracker.Logger, :start_link, [args]},
|
||||
}
|
||||
end
|
||||
|
||||
def start_link(args) do
|
||||
GenServer.start_link(__MODULE__, args, [name: name(args)])
|
||||
end
|
||||
|
||||
defp name({table, [service: service_name]}) do
|
||||
Module.concat([__MODULE__, table, service_name])
|
||||
end
|
||||
|
||||
defp name({table, [asset: kind]}) do
|
||||
Module.concat([__MODULE__, table, kind])
|
||||
end
|
||||
|
||||
def init({table, [service: service_name]}) do
|
||||
:ok = DepTracker.subscribe_service(table, service_name)
|
||||
{:ok, %{service: service_name, table: table}}
|
||||
end
|
||||
|
||||
def init({table, [asset: kind]}) do
|
||||
:ok = DepTracker.subscribe_asset(table, kind)
|
||||
{:ok, %{asset: kind, table: table}}
|
||||
end
|
||||
|
||||
def handle_info({table, {kind, local_id}, old_status, new_status}, %{asset: kind, table: table} = state) do
|
||||
Logger.info """
|
||||
#{inspect(table)} asset status change:
|
||||
#{kind} local_id = #{local_id}
|
||||
#{kind} #{inspect(old_status)} => #{inspect(new_status)}
|
||||
"""
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({table, service, old_status, new_status}, %{service: service, table: table} = state) do
|
||||
Logger.info """
|
||||
#{inspect(table)} service status change:
|
||||
#{service} #{inspect(old_status)} => #{inspect(new_status)}
|
||||
"""
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
|
@ -0,0 +1,29 @@
|
|||
defmodule FarmbotCore.DepTracker.Supervisor do
|
||||
use Supervisor
|
||||
|
||||
alias FarmbotCore.DepTracker
|
||||
|
||||
@moduledoc false
|
||||
|
||||
def start_link(name) do
|
||||
Supervisor.start_link(__MODULE__, name)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(name) do
|
||||
registry_name = registry_name(name)
|
||||
|
||||
children = [
|
||||
{DepTracker.Table, {name, registry_name}},
|
||||
{Registry, [keys: :duplicate, name: registry_name]},
|
||||
# {DepTracker.Logger, {name, service: :firmware}},
|
||||
{DepTracker.Logger, {name, asset: FarmbotCore.Asset.FbosConfig}},
|
||||
]
|
||||
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
end
|
||||
|
||||
def registry_name(name) do
|
||||
Module.concat(DepTracker.Registry, name)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,69 @@
|
|||
defmodule FarmbotCore.DepTracker.Table do
|
||||
use GenServer
|
||||
|
||||
@doc false
|
||||
def start_link({table, _registry_name} = args) do
|
||||
GenServer.start_link(__MODULE__, args, name: table)
|
||||
end
|
||||
|
||||
@doc "put data"
|
||||
def put(table, {identifier, status}) do
|
||||
GenServer.call(table, {:put, identifier, status})
|
||||
end
|
||||
|
||||
@doc "get data"
|
||||
def get(table, {kind, _} = identifier) do
|
||||
:ets.match(table, {identifier, :"$2"})
|
||||
|> Enum.map(fn
|
||||
[local_id, status] -> {{kind, local_id}, status}
|
||||
other -> raise("unknown data in ets table: #{table} data: #{inspect(other)}")
|
||||
end)
|
||||
end
|
||||
|
||||
def get(table, service_name) do
|
||||
:ets.match(table, {service_name, :"$2"})
|
||||
|> Enum.map(fn
|
||||
[status] -> {service_name, status}
|
||||
other -> raise("unknown data in ets table: #{table} data: #{inspect(other)}")
|
||||
end)
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def init({table, registry_name}) do
|
||||
^table = :ets.new(table, [:named_table, read_concurrency: true])
|
||||
|
||||
state = %{table: table, registry: registry_name}
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_call({:put, identifier, status}, _from, state) do
|
||||
case :ets.lookup(state.table, identifier) do
|
||||
[{^identifier, ^status}] ->
|
||||
# No change, so no notifications
|
||||
:ok
|
||||
|
||||
[{^identifier, old_status}] ->
|
||||
:ets.insert(state.table, {identifier, status})
|
||||
dispatch(state, identifier, old_status, status)
|
||||
|
||||
[] ->
|
||||
:ets.insert(state.table, {identifier, status})
|
||||
dispatch(state, identifier, nil, status)
|
||||
end
|
||||
|
||||
{:reply, :ok, state}
|
||||
end
|
||||
|
||||
defp dispatch(state, identifier, old, new) do
|
||||
kind = case identifier do
|
||||
{kind, _} -> kind
|
||||
kind -> kind
|
||||
end
|
||||
Registry.dispatch(state.registry, kind, fn entries ->
|
||||
message = {state.table, identifier, old, new}
|
||||
for {pid, _} <- entries, do: send(pid, message)
|
||||
end)
|
||||
:ok
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue