farmbot_os/farmbot_os/platform/target/nerves_hub_client.ex

493 lines
14 KiB
Elixir
Raw Normal View History

2019-03-05 12:35:09 -07:00
defmodule FarmbotOS.Platform.Target.NervesHubClient do
2018-11-21 17:20:11 -07:00
@moduledoc """
NervesHub.Client implementation.
This should be one of the very first processes to be started.
Because it is started so early, it has to check for things that
might not be available in the environment. Environment is checked
in this order:
* token
* email
* amqp access (relies on network + NTP)
* `DeviceCert`
* nerves_hub `cert` + `key`
## State flow
The basic flow of state changes follows that path as well.
1) check for nerves_hub `cert` and `key`
1a) if `cert` and `key` are available goto 4.
1b) if not, goto 2.
2) check for Farmbot `DeviceCert`.
2a) if available update tags. goto 3.
2b) if not available, create. goto 3.
3) Wait for Farmbot API to dispatch nerves_hub `cert` and `key`.
4) When `cert` and `key` are available, try to connect to `nerves_hub`.
## After connection
While connected to NervesHub this process implements the
`NervesHub.Client` behaviour. When an update becomes available from a
NervesHub deployment, `update_available` will be called. This should
check if the Farmbot settings allow for auto updating. If so, apply the
update, if not wait for a CeleryScript request to update via `check_update`
2018-11-21 17:20:11 -07:00
"""
use GenServer
use AMQP
alias AMQP.{
Channel,
Queue
}
alias FarmbotCore.Project
alias FarmbotCore.{BotState, BotState.JobProgress.Percent, Config}
alias FarmbotCore.{Asset, Config, JSON}
alias FarmbotExt.JWT
2019-03-05 12:35:09 -07:00
require FarmbotCore.Logger
require Logger
2018-12-05 11:30:36 -07:00
alias FarmbotExt.AMQP.ConnectionWorker
2018-11-21 17:20:11 -07:00
@behaviour NervesHub.Client
@exchange "amq.topic"
2019-07-18 09:34:59 -06:00
# Server state for this GenServer. Every field starts as nil/false in init/1
# except `cert` and `key`, which are loaded from Nerves.Runtime.KV.
defstruct [
# AMQP connection returned by open_connection/3 (nil until connected).
:conn,
# AMQP channel opened on `conn`; closed in terminate/2 if still set.
:chan,
# Decoded JWT for the configured authorization token (see get_jwt/0).
:jwt,
# NervesHub key: loaded at init or decoded from an AMQP delivery.
:key,
# NervesHub cert: loaded at init or decoded from an AMQP delivery.
:cert,
# Set to true once an update has been accepted via update_available/check_update.
:is_applying_update,
# "firmware_url" taken from the most recent update_available payload.
:firmware_url,
# Set to true after do_restart_nerves_hub/0 has been run once.
:probably_connected
]
alias __MODULE__, as: State
@doc false
# Starts the client as a locally registered GenServer named after this module.
def start_link(args), do: GenServer.start_link(__MODULE__, args, name: __MODULE__)
@doc """
Returns the serial number of this device.

Runs `/usr/bin/boardid` for the given platform and returns its trimmed output.
`:rpi0` and `:rpi3` are both looked up as `"rpi"`.
"""
def serial_number(plat) when plat in [:rpi0, :rpi3], do: serial_number("rpi")

def serial_number(plat) do
  # `:os.cmd/1` takes a charlist, hence the single-quoted literal.
  command =
    '/usr/bin/boardid -b uboot_env -u nerves_serial_number -b uboot_env -u serial_number -b #{plat}'

  output = :os.cmd(command)
  String.trim(to_string(output))
end
@doc "Returns the serial number of this device, looked up for `Project.target()`"
def serial_number do
  serial_number(Project.target())
end
2018-11-21 17:20:11 -07:00
@doc "Returns the uuid of the running firmware (the active `nerves_fw_uuid` KV entry)"
def uuid do
  Nerves.Runtime.KV.get_active("nerves_fw_uuid")
end
2018-12-17 10:53:39 -07:00
@doc "Loads the cert from storage. An empty stored value is returned as `nil`."
def load_cert do
  "nerves_hub_cert"
  |> Nerves.Runtime.KV.get_active()
  |> filter_parens()
end
2018-12-17 10:53:39 -07:00
@doc "Loads the key from storage. An empty stored value is returned as `nil`."
def load_key do
  "nerves_hub_key"
  |> Nerves.Runtime.KV.get_active()
  |> filter_parens()
end
@doc false
# Stores the serial number under both `nerves_serial_number` and
# `nerves_fw_serial_number`. Returns the result of the second write.
def write_serial(serial_number) do
  alias Nerves.Runtime.KV.UBootEnv

  UBootEnv.put("nerves_serial_number", serial_number)
  UBootEnv.put("nerves_fw_serial_number", serial_number)
end
@doc false
# Persists the NervesHub cert under the `nerves_hub_cert` U-Boot env key.
def write_cert(cert), do: Nerves.Runtime.KV.UBootEnv.put("nerves_hub_cert", cert)
@doc false
# Persists the NervesHub key under the `nerves_hub_key` U-Boot env key.
def write_key(key), do: Nerves.Runtime.KV.UBootEnv.put("nerves_hub_key", key)
@impl NervesHub.Client
# Forwards the error to this GenServer asynchronously; always returns :ok.
def handle_error(error) do
  message = {:handle_nerves_hub_error, error}
  GenServer.cast(__MODULE__, message)
  :ok
end
@impl NervesHub.Client
# Forwards the fwup message to this GenServer asynchronously; always returns :ok.
def handle_fwup_message(msg) do
  message = {:handle_nerves_hub_fwup_message, msg}
  GenServer.cast(__MODULE__, message)
  :ok
end
@impl NervesHub.Client
# Synchronously asks this GenServer what to do with the update; the reply
# (`:apply` or `:ignore`) is returned to the caller.
def update_available(data),
  do: GenServer.call(__MODULE__, {:handle_nerves_hub_update_available, data})
@doc """
Checks if an update is available, and applies it.

Returns `nil` when the server reports that no update is available; otherwise
returns the response of the update check.
"""
def check_update, do: GenServer.call(__MODULE__, :check_update)
2018-11-23 11:25:14 -07:00
2019-06-25 16:28:08 -06:00
# Stops NervesHub, bounces the :nerves_runtime application and starts
# NervesHub again. Anything raised, thrown or exited during the restart is
# logged rather than propagated; NervesHub.Supervisor is started afterwards
# either way and the result of that start_child call is returned.
# NOTE(review): the child is removed from FarmbotOS.Init.Supervisor but started
# under FarmbotOS — confirm that asymmetry is intended.
def do_restart_nerves_hub do
  try do
    # NervesHub replaces its own env on startup. Reset it.
    # Stop Nerves Hub if it is running.
    init_supervisor = FarmbotOS.Init.Supervisor
    _ = Supervisor.terminate_child(init_supervisor, NervesHub.Supervisor)
    _ = Supervisor.delete_child(init_supervisor, NervesHub.Supervisor)

    # The values are not used here; the reads are kept so a failing KV lookup
    # still aborts the restart sequence exactly as before.
    _ = load_cert()
    _ = load_key()

    # Cause NervesRuntime.KV to restart.
    # _ = GenServer.stop(Nerves.Runtime.KV, :restart)
    _ = Application.stop(:nerves_runtime)
    Process.sleep(1000)

    _ = Application.ensure_all_started(:nerves_runtime)
    _ = Application.ensure_all_started(:nerves_hub)

    # Give the applications a second to settle.
    Process.sleep(1000)
  catch
    kind, reason ->
      IO.warn("NervesHub error: #{inspect(kind)} #{inspect(reason)}", __STACKTRACE__)
      FarmbotCore.Logger.error(1, "OTA service error: #{kind} #{inspect(reason)}")
  end

  # Start the connection again.
  Supervisor.start_child(FarmbotOS, NervesHub.Supervisor)
end
@impl GenServer
# Boots the runtime dependencies, records the serial number, loads any stored
# cert/key and queues the first step: straight to NervesHub when both cert and
# key are present, otherwise to AMQP to obtain them.
def init(_args) do
  Application.ensure_all_started(:nerves_runtime)
  Application.ensure_all_started(:nerves_hub)
  write_serial(serial_number())
  Process.flag(:sensitive, true)

  state = %State{
    conn: nil,
    chan: nil,
    jwt: nil,
    cert: load_cert(),
    key: load_key(),
    is_applying_update: false,
    probably_connected: false
  }

  first_step = if state.cert && state.key, do: :connect_nerves_hub, else: :connect_amqp
  send(self(), first_step)

  {:ok, state}
end
@impl GenServer
# Logs the shutdown reason and closes the AMQP channel when one is held.
def terminate(reason, state) do
  FarmbotCore.Logger.error(1, "Disconnected from NervesHub AMQP channel: #{inspect(reason)}")

  # If a channel was still open, close it.
  case state.chan do
    absent when absent in [nil, false] -> nil
    chan -> AMQP.Channel.close(chan)
  end
end
@impl GenServer
# Opens the AMQP connection and channel, declares and binds this bot's
# `<bot>_nerves_hub` queue and starts consuming from it. On success the
# connection details are stored and :after_connect_amqp is queued. On any
# failure the connection fields are cleared and :connect_amqp is retried
# (after 15s when no token is configured, after 5s otherwise).
def handle_info(:connect_amqp, %{conn: nil, chan: nil} = state) do
  FarmbotCore.Logger.debug(1, "Attempting to get OTA certs from AMQP.")

  with {token, %{} = jwt} when is_binary(token) <- get_jwt(),
       email when is_binary(email) <- get_email(),
       {:ok, %{} = conn} <- open_connection(email, token, jwt),
       {:ok, chan} <- Channel.open(conn),
       :ok <- Basic.qos(chan, global: true),
       queue = "#{jwt.bot}_nerves_hub",
       {:ok, _} <- Queue.declare(chan, queue, auto_delete: false, durable: true),
       :ok <- Queue.bind(chan, queue, @exchange, routing_key: "bot.#{jwt.bot}.nerves_hub"),
       {:ok, _tag} <- Basic.consume(chan, queue, self(), []) do
    send(self(), :after_connect_amqp)
    {:noreply, %{state | conn: conn, chan: chan, jwt: jwt}}
  else
    # happens when no token is configured.
    {nil, nil} ->
      FarmbotCore.Logger.debug(3, "No credentials yet. Can't connect to OTA Server.")
      Process.send_after(self(), :connect_amqp, 15_000)
      {:noreply, %{state | conn: nil, chan: nil, jwt: nil}}

    failure ->
      FarmbotCore.Logger.error(
        3,
        "Failed to connect to NervesHub AMQP channel: #{inspect(failure)}"
      )

      Process.send_after(self(), :connect_amqp, 5000)
      {:noreply, %{state | conn: nil, chan: nil, jwt: nil}}
  end
end
# AMQP is connected but no cert/key is stored yet: create (or update) this
# device's DeviceCert with the current serial number and deployment tags, then
# wait for the API to deliver cert and key over AMQP. If the DeviceCert cannot
# be saved, the step is retried after 5 seconds.
def handle_info(:after_connect_amqp, %{key: nil, cert: nil} = state) do
  FarmbotCore.Logger.debug(3, "Connected to NervesHub AMQP channel. Fetching certs.")

  # serial_number/0 shells out to `boardid`; resolve it once so the lookup and
  # the params are guaranteed to use the same value.
  serial = serial_number()
  params = %{serial_number: serial, tags: detect_deployment_tags()}

  result =
    case Asset.get_device_cert(serial_number: serial) do
      nil -> Asset.new_device_cert(params)
      %{} = existing -> Asset.update_device_cert(existing, params)
    end

  case result do
    {:ok, data} ->
      # DO NOT DO THIS. The api will do it behind the scenes
      # Asset.update_device!(%{update_channel: detect_update_channel()})
      FarmbotCore.Logger.debug(3, "DeviceCert created: #{inspect(data)}")
      FarmbotCore.Logger.debug(3, "Waiting for cert and key data from AMQP from farmbot api...")
      {:noreply, state}

    {:error, reason} ->
      FarmbotCore.Logger.error(1, "Failed to create device cert: #{inspect(reason)}")
      Process.send_after(self(), :after_connect_amqp, 5000)
      {:noreply, state}
  end
end
2018-11-23 11:25:14 -07:00
# Reached for :after_connect_amqp whenever the state did not match the
# "both cert and key are nil" clause: move straight on to NervesHub.
def handle_info(:after_connect_amqp, %{key: _, cert: _} = state) do
  FarmbotCore.Logger.debug(3, "Connected to NervesHub AMQP channel. Certs already loaded")
  _ = send(self(), :connect_nerves_hub)
  {:noreply, state}
end
# Without a cert and key there is nothing to connect with: go back to AMQP.
def handle_info(:connect_nerves_hub, %{key: nil, cert: nil} = state) do
  FarmbotCore.Logger.debug(3, "Can't connect to OTA Service. Certs not loaded")
  _ = send(self(), :connect_amqp)
  {:noreply, state}
end

# NervesHub was already (re)started by this process once; nothing to do.
def handle_info(:connect_nerves_hub, %{key: _, cert: _, probably_connected: true} = state),
  do: {:noreply, state}

# First connection attempt: restart NervesHub and remember that it was done.
def handle_info(:connect_nerves_hub, %{key: _, cert: _, probably_connected: false} = state) do
  FarmbotCore.Logger.debug(3, "Starting OTA Service")
  do_restart_nerves_hub()
  FarmbotCore.Logger.debug(3, "OTA Service started")

  {:noreply, %{state | probably_connected: true}}
end
# Broker confirmation that this process was registered as a consumer.
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}

# The broker cancelled the consumer unexpectedly (such as after a queue
# deletion): stop this server with reason :normal.
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}

# Broker confirmation that follows a Basic.cancel.
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
# Handles a delivery on this bot's `bot.<bot>.nerves_hub` routing key. The
# payload is expected to be JSON with base64 encoded "cert" and "key" strings;
# both are decoded, persisted, stored in state, and :connect_nerves_hub is
# queued. A delivery on any other routing key raises a MatchError.
#
# Every failure stops the server: {:error, reason} results stop with that
# reason, undecodable base64 and payloads of the wrong shape stop with
# :invalid_payload.
def handle_info({:basic_deliver, payload, %{routing_key: routing_key}}, state) do
  device = state.jwt.bot
  ["bot", ^device, "nerves_hub"] = String.split(routing_key, ".")

  with {:ok, %{"cert" => base64_cert, "key" => base64_key}}
       when is_binary(base64_cert) and is_binary(base64_key) <- JSON.decode(payload),
       {:ok, cert} <- Base.decode64(base64_cert),
       {:ok, key} <- Base.decode64(base64_key),
       :ok <- write_cert(cert),
       :ok <- write_key(key) do
    send(self(), :connect_nerves_hub)
    {:noreply, %{state | cert: cert, key: key}}
  else
    {:error, reason} ->
      FarmbotCore.Logger.error(1, "OTA Service failed to configure. #{inspect(reason)}")
      {:stop, reason, state}

    :error ->
      FarmbotCore.Logger.error(1, "OTA Service payload invalid. (base64)")
      {:stop, :invalid_payload, state}

    # Valid JSON without string "cert"/"key" entries. The payload is not
    # inspected in the log because it may contain key material.
    {:ok, _unexpected_shape} ->
      FarmbotCore.Logger.error(1, "OTA Service payload invalid. (missing cert or key)")
      {:stop, :invalid_payload, state}
  end
end
2018-11-23 11:25:14 -07:00
@impl GenServer
# Logs an error reported by NervesHub: at error level while an update is being
# applied, at debug level otherwise. The state is left untouched.
def handle_cast({:handle_nerves_hub_error, error}, state) do
  case state do
    %{is_applying_update: true} ->
      FarmbotCore.Logger.error(1, "Error applying OTA: #{inspect(error)}")

    _not_updating ->
      FarmbotCore.Logger.debug(3, "Unexpected NervesHub error: #{inspect(error)}")
  end

  {:noreply, state}
end
# fwup reported download/apply progress: mirror it into the OTA job progress.
def handle_cast({:handle_nerves_hub_fwup_message, {:progress, percent}}, state) do
  set_ota_progress(percent)
  {:noreply, state}
end

# fwup finished successfully: flag the firmware for flashing and mark the OTA
# job as 100% done.
def handle_cast({:handle_nerves_hub_fwup_message, {:ok, _, _info}}, state) do
  set_firmware_needs_flash()
  set_ota_progress(100)
  {:noreply, state}
end
# fwup reported a failure: close out the OTA progress job and log the reason.
def handle_cast({:handle_nerves_hub_fwup_message, {:error, _, reason}}, state) do
  # The job is finished with the same progress value as before, but without
  # going through set_ota_progress(100), which would log the success message
  # "OTA Complete Going down for reboot" for a failed update.
  # NOTE(review): status is kept as "complete"; switch to a failure status if
  # Percent supports one.
  if Process.whereis(BotState) do
    BotState.set_job_progress("FBOS_OTA", %Percent{percent: 100, status: "complete"})
  end

  FarmbotCore.Logger.error(1, "Error applying OTA: #{inspect(reason)}")
  {:noreply, state}
end
2018-11-23 11:25:14 -07:00
# Any other fwup message is ignored.
def handle_cast({:handle_nerves_hub_fwup_message, _other}, state), do: {:noreply, state}
@impl GenServer
# NervesHub announced an update that carries a "firmware_url". The update is
# flagged in BotState either way; it is applied (reply :apply) only when the
# :os_auto_update setting is exactly `true`, otherwise the reply is :ignore.
# The url is remembered in both cases.
def handle_call({:handle_nerves_hub_update_available, %{"firmware_url" => url}}, _from, state) do
  auto_update = Asset.fbos_config(:os_auto_update)
  set_update_available_in_bot_state()

  if auto_update == true do
    FarmbotCore.Logger.busy(1, "Applying OTA update")
    _ = set_firmware_needs_flash()
    {:reply, :apply, %{state | is_applying_update: true, firmware_url: url}}
  else
    FarmbotCore.Logger.info(1, "New Farmbot OS is available!")
    {:reply, :ignore, %{state | firmware_url: url}}
  end
end

# Update announcement without a "firmware_url": always applied.
# NOTE(review): unlike the clause above this neither consults :os_auto_update
# nor calls set_firmware_needs_flash/0 — confirm that is intended.
def handle_call({:handle_nerves_hub_update_available, _data}, _from, state) do
  set_update_available_in_bot_state()
  FarmbotCore.Logger.busy(1, "Applying OTA update")

  {:reply, :apply, %{state | is_applying_update: true}}
end
2018-11-21 17:20:11 -07:00
# Asks NervesHub whether an update is available and, if so, starts applying it.
#
# Replies with:
#   * `nil` when the server reports that no update is available
#   * the `{:error, reason}` tuple when the check itself failed
#   * the response of the update check when an update is being applied
def handle_call(:check_update, _from, state) do
  case NervesHub.HTTPClient.update() do
    {:ok, %{"data" => %{"update_available" => false}}} ->
      {:reply, nil, state}

    # A failed check is not an available update: report it back to the caller
    # without flagging the firmware for flashing or starting an update.
    {:error, reason} = error ->
      FarmbotCore.Logger.error(1, "Failed to check for OTA update: #{inspect(reason)}")
      {:reply, error, state}

    data ->
      set_update_available_in_bot_state()
      FarmbotCore.Logger.busy(1, "Applying OTA update")
      _ = set_firmware_needs_flash()
      # The update runs in a linked process so this call can reply right away.
      spawn_link(fn -> NervesHub.update() end)
      {:reply, data, %{state | is_applying_update: true}}
  end
end
# Flags "update available" in BotState, but only when a process is registered
# under the BotState name. Returns nil when it is not.
defp set_update_available_in_bot_state() do
  case Process.whereis(BotState) do
    nil -> nil
    _registered -> BotState.set_update_available(true)
  end
end
# Returns `{token, decoded_jwt}` for the configured authorization token, or
# `{nil, nil}` when no token is configured. Raises a MatchError when the
# token cannot be decoded.
defp get_jwt do
  case Config.get_config_value(:string, "authorization", "token") do
    missing when missing in [nil, false] ->
      {nil, nil}

    token ->
      {:ok, jwt} = JWT.decode(token)
      {token, jwt}
  end
end
# Reads the configured authorization email.
defp get_email, do: Config.get_config_value(:string, "authorization", "email")
# Opens an AMQP connection through ConnectionWorker using the token, email and
# the bot/mqtt/vhost fields of the JWT. On success the connection process is
# both linked to and monitored by the caller. Any other result is logged and
# returned unchanged.
defp open_connection(email, token, jwt) do
  result = ConnectionWorker.open_connection(token, email, jwt.bot, jwt.mqtt, jwt.vhost)

  with {:ok, conn} <- result do
    Process.link(conn.pid)
    Process.monitor(conn.pid)
    {:ok, conn}
  else
    failure ->
      FarmbotCore.Logger.error(1, "Error connecting to AMQP: #{inspect(failure)}")
      failure
  end
end
# Normalises an empty stored value to nil; anything else is passed through.
defp filter_parens(data) do
  if data == "", do: nil, else: data
end
# Publishes OTA progress as the "FBOS_OTA" job. Exactly 100 is treated as
# completion: a success message is logged and the job status is "complete".
# Always returns :ok.
defp set_ota_progress(100) do
  FarmbotCore.Logger.success(1, "OTA Complete Going down for reboot")
  publish_ota_progress(%Percent{percent: 100, status: "complete"})
end

defp set_ota_progress(percent), do: publish_ota_progress(%Percent{percent: percent})

# Hands the progress to BotState when a process is registered under that name.
defp publish_ota_progress(progress) do
  if Process.whereis(BotState), do: BotState.set_job_progress("FBOS_OTA", progress)
  :ok
end
# Sets the "firmware_needs_flash" setting to true and "firmware_needs_open" to
# false, in that order. Always returns :ok.
def set_firmware_needs_flash() do
  flags = [{"firmware_needs_flash", true}, {"firmware_needs_open", false}]

  Enum.each(flags, fn {setting, value} ->
    Config.update_config_value(:bool, "settings", setting, value)
  end)

  :ok
end
# Builds the deployment tags for this device: one `application:` tag from
# `Project.env()` and one `channel:` tag from detect_update_channel/0.
def detect_deployment_tags() do
  channel_tag = "channel:#{detect_update_channel()}"
  application_tag = "application:#{Project.env()}"

  [application_tag, channel_tag]
end
# Picks the update channel: "beta" when the project version contains an
# `x.y.z-rcN` release-candidate marker, "stable" on the "master" branch, and
# the branch name itself otherwise.
def detect_update_channel() do
  rc_version = ~r/(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)-rc(0|[1-9]\d*)+?/
  release_candidate? = Regex.match?(rc_version, Project.version())

  if release_candidate? do
    "beta"
  else
    branch = Project.branch()
    if branch == "master", do: "stable", else: branch
  end
end
2018-11-21 17:20:11 -07:00
end