Add telemetry to the rest of the app

More things will be added in the future.
Metrics are collected from bot state because
the amount of messages sent by every bot would
make datadog charge us a lot of money
pull/1045/head
Connor Rigby 2019-10-24 10:37:17 -07:00 committed by Connor Rigby
parent 49c60bdb88
commit ec56ebaf74
18 changed files with 142 additions and 15 deletions

View File

@ -1,5 +1,6 @@
defmodule FarmbotCore.BotState.SchedulerUsageReporter do
alias FarmbotCore.BotState
require FarmbotTelemetry
use GenServer
@default_timeout_ms 5000

View File

@ -14,6 +14,7 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
require Logger
require FarmbotCore.Logger
require FarmbotTelemetry
# The API dispatches messages for other resources, but these
# are the only ones that Farmbot needs to sync.
@ -96,6 +97,7 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
BotState.set_sync_status("sync_error")
_ = Leds.green(:slow_blink)
FarmbotCore.Logger.error(1, "Error preloading. #{inspect(reason)}")
FarmbotTelemetry.event(:asset_sync, :preload_error, nil, error: inspect(reason))
Process.send_after(self(), :preload, 5000)
{:noreply, state}
end

View File

@ -8,6 +8,7 @@ defmodule FarmbotExt.AMQP.BotStateChannel do
alias AMQP.Channel
require FarmbotCore.Logger
require FarmbotTelemetry
alias FarmbotCore.JSON
alias FarmbotExt.AMQP.ConnectionWorker
@ -54,12 +55,14 @@ defmodule FarmbotExt.AMQP.BotStateChannel do
with %{} = conn <- ConnectionWorker.connection(),
{:ok, chan} <- Channel.open(conn),
:ok <- Basic.qos(chan, global: true) do
FarmbotTelemetry.event(:amqp, :channel_open)
{:noreply, %{state | conn: conn, chan: chan}, {:continue, :dispatch}}
else
nil ->
{:noreply, %{state | conn: nil, chan: nil}, 5000}
err ->
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
FarmbotCore.Logger.error(1, "Failed to connect to BotState channel: #{inspect(err)}")
{:noreply, %{state | conn: nil, chan: nil}, 1000}
end

View File

@ -8,6 +8,7 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
alias FarmbotCore.JSON
require FarmbotCore.Logger
require FarmbotTelemetry
require Logger
alias FarmbotCeleryScript.{AST, StepRunner}
@ -50,6 +51,8 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
:ok <- Queue.bind(chan, queue_name, @exchange, routing_key: route),
{:ok, _tag} <- Basic.consume(chan, queue_name, self(), no_ack: true) do
FarmbotCore.Logger.debug(3, "connected to CeleryScript channel")
FarmbotTelemetry.event(:amqp, :channel_open)
FarmbotTelemetry.event(:amqp, :queue_bind, nil, queue_name: queue_name, routing_key: route)
{:noreply, %{state | conn: conn, chan: chan}}
else
nil ->
@ -58,6 +61,7 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
err ->
FarmbotCore.Logger.error(1, "Failed to connect to CeleryScript channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end

View File

@ -6,6 +6,7 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
use GenServer
require Logger
require FarmbotCore.Logger
require FarmbotTelemetry
alias AMQP.{Basic, Channel, Queue}
alias FarmbotExt.{JWT, AMQP.ConnectionWorker}
@ -55,10 +56,17 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
{:ok, _} <- Basic.consume(chan, chan_name, self(), no_ack: true) do
Process.link(conn.pid)
Process.link(chan.pid)
FarmbotTelemetry.event(:amqp, :channel_open)
FarmbotTelemetry.event(:amqp, :queue_bind, nil, queue_name: chan_name, routing_key: route)
%{conn: conn, chan: chan}
else
nil -> %{conn: nil, chan: nil}
error -> error
nil ->
%{conn: nil, chan: nil}
error ->
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(error))
error
end
end
@ -107,11 +115,13 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
case open_connection(token, email, jwt.bot, jwt.mqtt, jwt.vhost) do
{:ok, conn} ->
FarmbotTelemetry.event(:amqp, :connection_open)
Process.monitor(conn.pid)
{:noreply, %{state | conn: conn}}
err ->
Logger.error("Error Opening AMQP connection: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :connection_open_error, nil, error: inspect(err))
Process.send_after(self(), :timeout, 5000)
{:noreply, %{state | conn: nil}}
end
@ -120,6 +130,7 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
FarmbotCore.Logger.error(2, "AMQP Connection exit")
_ = close_connection(state.conn)
FarmbotTelemetry.event(:amqp, :connection_close)
{:stop, reason, state}
end
@ -131,6 +142,7 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
def handle_call(:close, _from, %{conn: _conn} = state) do
FarmbotCore.Logger.error(2, "AMQP Connection closing")
reply = close_connection(state.conn)
FarmbotTelemetry.event(:amqp, :connection_close)
{:stop, :close, reply, %{state | conn: nil}}
end

View File

@ -9,6 +9,7 @@ defmodule FarmbotExt.AMQP.LogChannel do
alias FarmbotCore.{BotState, JSON}
require FarmbotCore.Logger
require FarmbotTelemetry
alias FarmbotExt.AMQP.ConnectionWorker
require Logger
@ -40,6 +41,7 @@ defmodule FarmbotExt.AMQP.LogChannel do
with %{} = conn <- ConnectionWorker.connection(),
{:ok, chan} <- Channel.open(conn),
:ok <- Basic.qos(chan, global: true) do
FarmbotTelemetry.event(:amqp, :channel_open)
initial_bot_state = BotState.subscribe()
{:noreply, %{state | conn: conn, chan: chan, state_cache: initial_bot_state}, 0}
else
@ -48,6 +50,7 @@ defmodule FarmbotExt.AMQP.LogChannel do
err ->
FarmbotCore.Logger.error(1, "Failed to connect to Log channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
{:noreply, %{state | conn: nil, chan: nil, state_cache: nil}, 1000}
end
end

View File

@ -12,6 +12,7 @@ defmodule FarmbotExt.AMQP.PingPongChannel do
require Logger
require FarmbotCore.Logger
require FarmbotTelemetry
alias FarmbotCore.Leds
@exchange "amq.topic"
@ -66,6 +67,10 @@ defmodule FarmbotExt.AMQP.PingPongChannel do
{:ok, _} <- Queue.purge(chan, ping),
:ok <- Queue.bind(chan, ping, @exchange, routing_key: route <> ".#"),
{:ok, _tag} <- Basic.consume(chan, ping, self(), no_ack: true) do
FarmbotTelemetry.event(:amqp, :channel_open)
FarmbotTelemetry.event(:amqp, :queue_bind, nil, queue_name: ping, routing_key: route <> ".#")
FarmbotCore.Logger.debug(3, "connected to PingPong channel")
_ = Leds.blue(:solid)
{:noreply, %{state | conn: conn, chan: chan}}
@ -76,6 +81,7 @@ defmodule FarmbotExt.AMQP.PingPongChannel do
err ->
FarmbotCore.Logger.error(1, "Failed to connect to PingPong channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end

View File

@ -4,12 +4,16 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
use GenServer
use AMQP
alias FarmbotCore.{BotState, BotStateNG}
alias FarmbotExt.AMQP.ConnectionWorker
require FarmbotCore.Logger
require FarmbotTelemetry
@exchange "amq.topic"
@dispatch_metrics_timeout 300_000
@consume_telemetry_timeout 1000
defstruct [:conn, :chan, :jwt]
defstruct [:conn, :chan, :jwt, :cache]
alias __MODULE__, as: State
@doc false
@ -21,11 +25,13 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
Process.flag(:sensitive, true)
jwt = Keyword.fetch!(args, :jwt)
send(self(), :connect_amqp)
cache = BotState.subscribe()
state = %State{
conn: nil,
chan: nil,
jwt: jwt
jwt: jwt,
cache: cache
}
{:ok, state}
@ -47,8 +53,10 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
:ok <- Basic.qos(chan, global: true),
{:ok, _} <- Queue.declare(chan, telemetry, auto_delete: true),
{:ok, _} <- Queue.purge(chan, telemetry) do
FarmbotTelemetry.event(:amqp, :channel_open)
FarmbotCore.Logger.debug(3, "connected to Telemetry channel")
send(self(), :consume_telemetry)
send(self(), :dispatch_metrics)
{:noreply, %{state | conn: conn, chan: chan}}
else
nil ->
@ -57,11 +65,39 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
err ->
FarmbotCore.Logger.error(1, "Failed to connect to Telemetry channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end
end
def handle_info({BotState, change}, state) do
cache = Ecto.Changeset.apply_changes(change)
{:noreply, %{state | cache: cache}}
end
def handle_info(:dispatch_metrics, state) do
metrics = BotStateNG.view(state.cache).informational_settings
json =
FarmbotCore.JSON.encode!(%{
"telemetry_captured_at" => DateTime.utc_now(),
"telemetry_soc_temp" => metrics.soc_temp,
"telemetry_throttled" => metrics.throttled,
"telemetry_wifi_level" => metrics.wifi_level,
"telemetry_wifi_level_percent" => metrics.wifi_level_percent,
"telemetry_uptime" => metrics.uptime,
"telemetry_memory_usage" => metrics.memory_usage,
"telemetry_disk_usage" => metrics.disk_usage,
"telemetry_scheduler_usage" => metrics.scheduler_usage,
"telemetry_cpu_usage" => metrics.cpu_usage
})
Basic.publish(state.chan, @exchange, "bot.#{state.jwt.bot}.telemetry", json)
Process.send_after(self(), :dispatch_metrics, @dispatch_metrics_timeout)
{:noreply, state}
end
def handle_info(:consume_telemetry, state) do
_ =
FarmbotTelemetry.consume_telemetry(fn
@ -80,7 +116,7 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
Basic.publish(state.chan, @exchange, "bot.#{state.jwt.bot}.telemetry", json)
end)
_ = Process.send_after(self(), :consume_telemetry, 1000)
_ = Process.send_after(self(), :consume_telemetry, @consume_telemetry_timeout)
{:noreply, state}
end
end

View File

@ -90,6 +90,8 @@ config :farmbot_core, FarmbotCore.Asset.Repo,
pool_size: 1,
database: Path.join(data_path, "asset-prod.sqlite3")
config :farmbot_telemetry, file: to_charlist(Path.join(data_path, 'farmbot-telemetry.dets'))
config :farmbot, FarmbotOS.Platform.Supervisor,
platform_children: [
FarmbotOS.Platform.Target.NervesHubClient,

View File

@ -90,6 +90,8 @@ config :farmbot_core, FarmbotCore.Asset.Repo,
pool_size: 1,
database: Path.join(data_path, "asset-#{Mix.env()}.sqlite3")
config :farmbot_telemetry, file: to_charlist(Path.join(data_path, 'farmbot-telemetry.dets'))
config :farmbot, FarmbotOS.Platform.Supervisor,
platform_children: [
FarmbotOS.Platform.Target.NervesHubClient,

View File

@ -1,6 +1,7 @@
defmodule FarmbotOS.Configurator.Router do
@moduledoc "Routes web connections for configuring farmbot os"
require FarmbotCore.Logger
require FarmbotTelemetry
import Phoenix.HTML
use Plug.Router
@ -40,6 +41,8 @@ defmodule FarmbotOS.Configurator.Router do
end
get "/" do
FarmbotTelemetry.event(:configurator, :configuration_start)
case load_last_reset_reason() do
reason when is_binary(reason) ->
if String.contains?(reason, "CeleryScript request.") do

View File

@ -1,5 +1,6 @@
defmodule FarmbotOS.SysCalls do
require FarmbotCore.Logger
require FarmbotTelemetry
require Logger
alias FarmbotCeleryScript.AST
@ -264,6 +265,7 @@ defmodule FarmbotOS.SysCalls do
:ok
else
error ->
FarmbotTelemetry.event(:asset_sync, :sync_error, nil, error: inspect(error))
:ok = BotState.set_sync_status("sync_error")
_ = Leds.green(:slow_blink)
{:error, inspect(error)}

View File

@ -1,7 +1,8 @@
defmodule FarmbotOS.SysCalls.DumpInfo do
@moduledoc false
require FarmbotCore.Logger
alias FarmbotCore.{Asset, Asset.Private, Config, Project}
require FarmbotTelemetry
alias FarmbotCore.{Asset, Asset.DiagnosticDump, Asset.Private, Config, Project}
def dump_info do
FarmbotCore.Logger.busy(1, "Recording diagnostic dump.")
@ -28,9 +29,15 @@ defmodule FarmbotOS.SysCalls.DumpInfo do
{:ok, diag} ->
_ = Private.mark_dirty!(diag, %{})
FarmbotCore.Logger.success(1, "Diagnostic dump recorded.")
FarmbotTelemetry.event(:diagnostic_dump, :record, nil,
diagnostic_dump: DiagnosticDump.render(diag)
)
:ok
{:error, changeset} ->
FarmbotTelemetry.event(:diagnostic_dump, :record_error, nil, error: inspect(changeset))
{:error, "error creating diagnostic dump: #{inspect(changeset)}"}
end
end

View File

@ -3,6 +3,7 @@ defmodule FarmbotOS.Platform.Target.Network do
use GenServer, shutdown: 10_000
require Logger
require FarmbotCore.Logger
require FarmbotTelemetry
import FarmbotOS.Platform.Target.Network.Utils,
only: [
@ -123,6 +124,7 @@ defmodule FarmbotOS.Platform.Target.Network do
end
vintage_net_config = to_vintage_net(config)
FarmbotTelemetry.event(:network, :interface_configure, nil, interface: ifname)
configure_result = VintageNet.configure(config.name, vintage_net_config)
FarmbotCore.Logger.success(3, "#{config.name} setup: #{inspect(configure_result)}")
@ -137,12 +139,14 @@ defmodule FarmbotOS.Platform.Target.Network do
def handle_info({VintageNet, ["interface", ifname, "lower_up"], _old, false, _meta}, state) do
FarmbotCore.Logger.error(1, "Interface #{ifname} disconnected from access point")
FarmbotTelemetry.event(:network, :interface_disconnect, nil, interface: ifname)
state = start_network_not_found_timer(state)
{:noreply, state}
end
def handle_info({VintageNet, ["interface", ifname, "lower_up"], _old, true, _meta}, state) do
FarmbotCore.Logger.success(1, "Interface #{ifname} connected access point")
FarmbotTelemetry.event(:network, :interface_connect, nil, interface: ifname)
state = cancel_network_not_found_timer(state)
{:noreply, state}
end
@ -152,6 +156,7 @@ defmodule FarmbotOS.Platform.Target.Network do
state
) do
FarmbotCore.Logger.warn(1, "Interface #{ifname} connected to local area network")
FarmbotTelemetry.event(:network, :lan_connect, nil, interface: ifname)
{:noreply, state}
end
@ -161,6 +166,7 @@ defmodule FarmbotOS.Platform.Target.Network do
) do
FarmbotCore.Logger.warn(1, "Interface #{ifname} connected to internet")
state = cancel_network_not_found_timer(state)
FarmbotTelemetry.event(:network, :wan_connect, nil, interface: ifname)
{:noreply, %{state | first_connect?: false}}
end
@ -170,6 +176,7 @@ defmodule FarmbotOS.Platform.Target.Network do
) do
FarmbotCore.Logger.warn(1, "Interface #{ifname} disconnected from the internet: #{ifstate}")
FarmbotExt.AMQP.ConnectionWorker.close()
FarmbotTelemetry.event(:network, :wan_disconnect, nil, interface: ifname)
if state.network_not_found_timer do
{:noreply, state}
@ -270,6 +277,7 @@ defmodule FarmbotOS.Platform.Target.Network do
end
def reset_ntp do
FarmbotTelemetry.event(:ntp, :reset)
ntp_server_1 = Config.get_config_value(:string, "settings", "default_ntp_server_1")
ntp_server_2 = Config.get_config_value(:string, "settings", "default_ntp_server_2")

View File

@ -1,5 +1,6 @@
defmodule FarmbotOS.Platform.Target.ShoehornHandler do
use Shoehorn.Handler
require FarmbotTelemetry
require FarmbotCore.Logger
require Logger
@ -26,6 +27,8 @@ defmodule FarmbotOS.Platform.Target.ShoehornHandler do
"Farmbot app: #{app} exited #{count}: #{inspect(reason, limit: :infinity)}"
)
FarmbotTelemetry.event(:shoehorn, :application_exit, nil, application: app)
{:continue, %{state | restart_counts: count + 1}}
end
@ -36,6 +39,7 @@ defmodule FarmbotOS.Platform.Target.ShoehornHandler do
:farmbot
] do
error_log("Farmbot app: #{app} exited #{count}: #{inspect(reason, limit: :infinity)}")
FarmbotTelemetry.event(:shoehorn, :application_exit, nil, application: app)
with {:ok, _} <- Application.ensure_all_started(:farmbot_core),
{:ok, _} <- Application.ensure_all_started(:farmbot_ext),
@ -48,6 +52,7 @@ defmodule FarmbotOS.Platform.Target.ShoehornHandler do
def application_exited(app, reason, state) do
error_log("Application stopped: #{inspect(app)} #{inspect(reason, limit: :infinity)}")
FarmbotTelemetry.event(:shoehorn, :application_exit, nil, application: app)
# Application.ensure_all_started(app)
{:continue, state}
end

View File

@ -18,6 +18,7 @@ defmodule FarmbotOS.Platform.Target.Uevent do
use GenServer
require Logger
require FarmbotCore.Logger
require FarmbotTelemetry
alias FarmbotCore.{Config, FirmwareTTYDetector}
def start_link(args) do
@ -58,6 +59,7 @@ defmodule FarmbotOS.Platform.Target.Uevent do
def new_tty(tty) do
case FirmwareTTYDetector.tty() do
nil ->
FarmbotTelemetry.event(:firmware, :tty_detected, nil, tty: tty)
FarmbotCore.Logger.busy(1, "new firmware interfaces detected: #{tty}")
FirmwareTTYDetector.set_tty(tty)
Config.update_config_value(:bool, "settings", "firmware_needs_flash", true)

View File

@ -19,7 +19,7 @@ defmodule FarmbotTelemetry do
@type value() :: number()
@typedoc "Metadata for a telemetry event"
@type meta() :: map()
@type meta() :: Keyword.t()
@doc "Merges environment data with existing metadata"
@spec telemetry_meta(Macro.Env.t(), map()) :: meta()
@ -33,9 +33,10 @@ defmodule FarmbotTelemetry do
end
@doc "Execute a telemetry event"
defmacro event(subsystem, measurement_or_event_name, value \\ nil, meta \\ %{})
defmacro event(subsystem, measurement_or_event_name, value \\ nil, meta \\ [])
defmacro event(subsystem, measurement, value, meta) do
defmacro event(subsystem, measurement, value, meta)
when is_atom(subsystem) and is_atom(measurement) and is_list(meta) do
quote location: :keep do
FarmbotTelemetry.bare_telemetry(
UUID.uuid4(),
@ -44,13 +45,26 @@ defmodule FarmbotTelemetry do
unquote(measurement),
unquote(value),
DateTime.utc_now(),
FarmbotTelemetry.telemetry_meta(__ENV__, unquote(Macro.escape(meta)))
Keyword.merge(unquote(meta),
module: __ENV__.module,
file: __ENV__.file,
line: __ENV__.line,
function: __ENV__.function
)
)
end
end
defmacro event(subsystem, measurement, value, meta) do
Mix.raise("""
Unknown args for telemetry event:
#{inspect(subsystem)}, #{inspect(measurement)}, #{inspect(value)}, #{inspect(meta)}
""")
end
@doc "Execute a telemetry metric"
defmacro metric(subsystem, measurement, value, meta \\ %{}) do
defmacro metric(subsystem, measurement, value, meta \\ [])
when is_atom(subsystem) and is_atom(measurement) and is_list(meta) do
quote location: :keep do
FarmbotTelemetry.bare_telemetry(
UUID.uuid4(),
@ -59,7 +73,12 @@ defmodule FarmbotTelemetry do
unquote(measurement),
unquote(value),
DateTime.utc_now(),
FarmbotTelemetry.telemetry_meta(__ENV__, unquote(Macro.escape(meta)))
Keyword.merge(unquote(meta),
module: __ENV__.module,
file: __ENV__.file,
line: __ENV__.line,
function: __ENV__.function
)
)
end
end
@ -69,18 +88,18 @@ defmodule FarmbotTelemetry do
:ok
def bare_telemetry(uuid, kind, subsystem, measurement, value, captured_at, meta)
when is_binary(uuid) and is_atom(kind) and is_atom(subsystem) and is_atom(measurement) and
is_map(meta) do
is_list(meta) do
_ =
:telemetry.execute(
[:farmbot_telemetry, kind, subsystem],
%{measurement: measurement, value: value, captured_at: captured_at, uuid: uuid},
meta
Map.new(meta)
)
_ =
:dets.insert(
:farmbot_telemetry,
{uuid, captured_at, kind, subsystem, measurement, value, meta}
{uuid, captured_at, kind, subsystem, measurement, value, Map.new(meta)}
)
end

View File

@ -11,4 +11,14 @@ defmodule FarmbotTelemetryTest do
assert_receive {[:farmbot_telemetry, :event, :test_subsystem],
%{measurement: :measurement, value: 1.0}, _meta, _config}
end
test "assigns meta" do
:ok = FarmbotTelemetry.attach_recv(:event, :test_subsystem, self())
:ok = FarmbotTelemetry.event(:test_subsystem, :measurement, 1.0, hello: "world")
assert_receive {[:farmbot_telemetry, :event, :test_subsystem],
%{measurement: :measurement, value: 1.0}, meta, _config}
assert meta[:hello] == "world"
end
end