Simplify API, store messages in DETS

pull/1045/head
Connor Rigby 2019-10-23 11:30:29 -07:00 committed by Connor Rigby
parent e58d33243a
commit f845ddf38a
4 changed files with 175 additions and 118 deletions

View File

@ -8,7 +8,8 @@ defmodule FarmbotExt.AMQP.ChannelSupervisor do
PingPongChannel,
BotStateChannel,
AutoSyncChannel,
CeleryScriptChannel
CeleryScriptChannel,
TelemetryChannel
}
def start_link(args) do
@ -20,6 +21,7 @@ defmodule FarmbotExt.AMQP.ChannelSupervisor do
jwt = JWT.decode!(token)
children = [
{TelemetryChannel, [jwt: jwt]},
{LogChannel, [jwt: jwt]},
{PingPongChannel, [jwt: jwt]},
{BotStateChannel, [jwt: jwt]},

View File

@ -0,0 +1,84 @@
defmodule FarmbotExt.AMQP.TelemetryChannel do
@moduledoc """
"""
use GenServer
use AMQP
alias FarmbotExt.AMQP.ConnectionWorker
require FarmbotCore.Logger
@exchange "amq.topic"
defstruct [:conn, :chan, :jwt]
alias __MODULE__, as: State
@doc false
def start_link(args, opts \\ [name: __MODULE__]) do
GenServer.start_link(__MODULE__, args, opts)
end
def init(args) do
Process.flag(:sensitive, true)
jwt = Keyword.fetch!(args, :jwt)
send(self(), :connect_amqp)
state = %State{
conn: nil,
chan: nil,
jwt: jwt
}
{:ok, state}
end
def terminate(reason, state) do
FarmbotCore.Logger.error(1, "Disconnected from Telemetry channel: #{inspect(reason)}")
if state.chan, do: ConnectionWorker.close_channel(state.chan)
end
def handle_info(:connect_amqp, state) do
bot = state.jwt.bot
telemetry = bot <> "_telemetry"
# route = "bot.#{bot}.telemetry"
with %{} = conn <- ConnectionWorker.connection(),
{:ok, %{pid: channel_pid} = chan} <- Channel.open(conn),
Process.link(channel_pid),
:ok <- Basic.qos(chan, global: true),
{:ok, _} <- Queue.declare(chan, telemetry, auto_delete: true),
{:ok, _} <- Queue.purge(chan, telemetry) do
FarmbotCore.Logger.debug(3, "connected to Telemetry channel")
send(self(), :consume_telemetry)
{:noreply, %{state | conn: conn, chan: chan}}
else
nil ->
Process.send_after(self(), :connect_amqp, 5000)
{:noreply, %{state | conn: nil, chan: nil}}
err ->
FarmbotCore.Logger.error(1, "Failed to connect to Telemetry channel: #{inspect(err)}")
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end
end
def handle_info(:consume_telemetry, state) do
_ =
FarmbotTelemetry.consume_telemetry(fn
{captured_at, kind, subsystem, measurement, value, meta} ->
json =
FarmbotCore.JSON.encode!(%{
measurement => value,
:kind => kind,
:subsystem => subsystem,
:captured_at => to_string(captured_at),
:meta => %{meta | function: inspect(meta.function)}
})
Basic.publish(state.chan, @exchange, "bot.#{state.jwt.bot}.telemetry", json)
end)
_ = Process.send_after(self(), :consume_telemetry, 1000)
{:noreply, state}
end
end

View File

@ -3,152 +3,113 @@ defmodule FarmbotTelemetry do
Interface for farmbot system introspection and metrics
"""
@typedoc "Classification of telemetry event"
@type class() :: event_class() | metric_class()
@typedoc "Type of telemetry data"
@type kind() :: :event | :metric
@typedoc "Event classes are events that have no measurable value"
@type event_class() :: atom()
@typedoc "Classifier for telemetry data"
@type subsystem() :: atom()
@typedoc "Metric classes are events that have a measurable value"
@type metric_class() :: atom()
@typedoc "Name of subsystem measurement data"
@type measurement() :: atom()
@typedoc "Type within an event"
@type event_type() :: atom()
@typedoc "Value of subsystem measurement data"
@type value() :: number()
@typedoc "Action withing a type"
@type event_action() :: atom()
@typedoc "Metadata for a telemetry event"
@type meta() :: map()
@typedoc "Value of a metric event"
@type metric_value() :: term()
@typedoc """
1st arg passed to a `handler` if the type was an event
[:farmbot_telemetry, :event, t/event_class()]
"""
@type event_class_path() :: [atom()]
@typedoc "2nd arg passed to a `handler` if the type was an event"
@type event_class_data() :: %{
required(:type) => event_type(),
required(:action) => event_action(),
required(:timestamp) => DateTime.t()
}
@typedoc """
1st arg passed to a `handler` if the type was a metric
[:farmbot_telemetry, :metric, t/metric_class()]
"""
@type metric_class_path() :: [atom()]
@typedoc "2nd arg passed to a `handler` if the type was a metric"
@type metric_class_data() :: %{
required(:value) => metric_value(),
required(:timestamp) => DateTime.t()
}
@typedoc "3rd arg passed to a `handler`"
@type meta() :: %{
required(:module) => module() | nil,
required(:file) => Path.t() | nil,
required(:line) => pos_integer() | nil,
required(:function) => {atom, 0 | pos_integer()} | nil
}
@typedoc "4th arg passed to a `handler`"
@type config() :: term()
@typedoc "Function that handles telemetry data"
@type handler() ::
(event_class_path(), event_class_data(), meta(), config() -> any())
| (metric_class_path(), metric_class_data(), meta(), config() -> any())
@doc "Merges environment data with existing metadata"
@spec telemetry_meta(Macro.Env.t(), map()) :: meta()
def telemetry_meta(env, meta) do
Map.merge(meta, %{
module: env.module,
file: env.file,
line: env.line,
function: env.function
})
end
@doc "Execute a telemetry event"
defmacro event(class, type, action, meta \\ %{}) do
meta =
Map.merge(meta, %{
module: __ENV__.module,
file: __ENV__.file,
line: __ENV__.line,
function: __ENV__.function
})
defmacro event(subsystem, measurement, value, meta \\ %{}) do
quote location: :keep do
:telemetry.execute(
[:farmbot_telemetry, :event, unquote(class)],
%{type: unquote(type), action: unquote(action), timestamp: DateTime.utc_now()},
unquote(Macro.escape(meta))
FarmbotTelemetry.bare_telemetry(
:event,
unquote(subsystem),
unquote(measurement),
unquote(value),
DateTime.utc_now(),
FarmbotTelemetry.telemetry_meta(__ENV__, unquote(Macro.escape(meta)))
)
end
end
@doc "Execute a telemetry metric"
defmacro metric(class, value, meta \\ %{}) do
meta =
Map.merge(meta, %{
module: __ENV__.module,
file: __ENV__.file,
line: __ENV__.line,
function: __ENV__.function
})
defmacro metric(subsystem, measurement, value, meta \\ %{}) do
quote location: :keep do
:telemetry.execute(
[:farmbot_telemetry, :metric, unquote(class)],
%{value: unquote(value), timestamp: DateTime.utc_now()},
unquote(Macro.escape(meta))
FarmbotTelemetry.bare_telemetry(
:metric,
unquote(subsystem),
unquote(measurement),
unquote(value),
DateTime.utc_now(),
FarmbotTelemetry.telemetry_meta(__ENV__, unquote(Macro.escape(meta)))
)
end
end
@doc "Attach a handler to an event"
@spec attach(String.t(), event_class_path() | metric_class_path(), handler(), config()) :: any()
def attach(handler_id, event, handler, config \\ []) do
:telemetry.attach(handler_id, event, handler, config)
@doc "Function responsible for firing telemetry events"
@spec bare_telemetry(kind(), subsystem(), measurement(), value(), DateTime.t(), meta()) :: :ok
def bare_telemetry(kind, subsystem, measurement, value, captured_at, meta) do
_ =
:telemetry.execute(
[:farmbot_telemetry, kind, subsystem],
%{measurement => value, captured_at: captured_at},
meta
)
_ = :dets.insert(:farmbot_telemetry, {captured_at, kind, subsystem, measurement, value, meta})
end
@doc "Helper to attach the log handler to an event"
@spec attach_logger(event_class_path() | metric_class_path(), config()) :: any()
def attach_logger(event, config \\ [level: :info]) do
attach(
"logger.#{:erlang.phash2({node(), :erlang.now()})}",
event,
@doc "Attach a logger to a kind and subsystem"
def attach_logger(kind, subsystem, config \\ []) do
:telemetry.attach(
"farmbot-telemetry-logger-#{kind}-#{subsystem}",
[:farmbot_telemetry, kind, subsystem],
&FarmbotTelemetry.log_handler/4,
config
)
end
@doc "Helper to send a message to the current processes when a matching event is dispatched"
@spec attach_recv(event_class_path() | metric_class_path(), config()) :: any()
def attach_recv(event, config \\ [pid: self()]) do
attach(
"recv.#{:erlang.phash2({node(), :erlang.now()})}",
event,
&Kernel.send(&4[:pid], {&1, &2, &3, &4}),
config
@doc false
def log_handler(event, measurements, meta, config) do
Logger.bare_log(
config[:level] || :info,
"#{inspect(event)} | #{inspect(measurements)}",
Map.to_list(meta)
)
end
@doc false
def log_handler(event, data, meta, config) do
msg =
case event do
[:farmbot_telemetry, :event, class] ->
%{type: type, action: action} = data
"#{class}.#{type}.#{action}"
@typedoc "Function passed to `consume_telemetry/1`"
@type consumer_fun() ::
({DateTime.t(), kind(), subsystem(), measurement(), value(), meta()} -> :ok | :error)
[:farmbot_telemetry, :metric, class] ->
%{value: value} = data
"#{class}.#{value}=#{value}"
end
@doc "Consume telemetry events"
def consume_telemetry(fun) do
all_events = :dets.match_object(:farmbot_telemetry, :_)
Logger.bare_log(config[:level] || :debug, msg, Map.to_list(meta))
tasks =
Enum.map(all_events, fn event ->
{elem(event, 0), Task.async(fn -> fun.(event) end)}
end)
_ =
Enum.map(tasks, fn {created_at, task} ->
case Task.await(task) do
:ok -> :dets.delete(:farmbot_telemetry, created_at)
_ -> :ok
end
end)
:ok
end
@doc "Helper to generate a path for event names"
@spec event_class(event_class()) :: event_class_path()
def event_class(class), do: [:farmbot_telemetry, :event, class]
@doc "Helper to generate a path for metric names"
@spec metric_class(metric_class()) :: metric_class_path()
def metric_class(class), do: [:farmbot_telemetry, :metric, class]
end

View File

@ -5,7 +5,17 @@ defmodule FarmbotTelemetry.Application do
use Application
def config do
user_defined = Application.get_all_env(:farmbot_telemetry)
Keyword.merge(
[access: :read_write, type: :set, file: '/tmp/farmbot_telemetry.dets'],
user_defined
)
end
def start(_type, _args) do
{:ok, :farmbot_telemetry} = :dets.open_file(:farmbot_telemetry, config())
children = []
# See https://hexdocs.pm/elixir/Supervisor.html