Add retry mechanism when fetching token on boot.
parent
f3c6f5acf9
commit
01ed6a93f0
|
@ -1,6 +1,8 @@
|
|||
# Changelog
|
||||
# 6.4.12
|
||||
* Fix race condition after getting time which broke self hosting users.
|
||||
* Add retry mechanism for fetching a token.
|
||||
* Farmbot will now try 5 times to fetch a token.
|
||||
|
||||
# 6.4.11
|
||||
* Add SSH back.
|
||||
|
|
|
@ -88,5 +88,4 @@ config :nerves_firmware_ssh, authorized_keys: local_key
|
|||
|
||||
config :shoehorn,
|
||||
init: [:nerves_runtime, :nerves_init_gadget, :nerves_firmware_ssh],
|
||||
handler: Farmbot.ShoehornHandler,
|
||||
app: :farmbot
|
||||
|
|
|
@ -15,17 +15,10 @@ defmodule Farmbot do
|
|||
def start(type, start_opts)
|
||||
|
||||
def start(_, _start_opts) do
|
||||
case Supervisor.start_link(__MODULE__, [], [name: __MODULE__]) do
|
||||
{:ok, pid} -> {:ok, pid, []}
|
||||
error ->
|
||||
IO.puts "Failed to boot Farmbot: #{inspect error}"
|
||||
Farmbot.System.factory_reset(error)
|
||||
exit(error)
|
||||
end
|
||||
Supervisor.start_link(__MODULE__, [], [name: __MODULE__])
|
||||
end
|
||||
|
||||
def init([]) do
|
||||
# RingLogger.attach()
|
||||
children = [
|
||||
{Farmbot.Logger.Supervisor, []},
|
||||
{Farmbot.System.Supervisor, []},
|
||||
|
|
|
@ -30,16 +30,20 @@ defmodule Farmbot.Bootstrap.Authorization do
|
|||
# this is the default authorize implementation.
|
||||
# It gets overwrote in the Test Environment.
|
||||
@doc "Authorizes with the farmbot api."
|
||||
def authorize(email, pw_or_secret, server) do
|
||||
def authorize(email, pw_or_secret, server, tries \\ 5) do
|
||||
case get_config_value(:bool, "settings", "first_boot") do
|
||||
false -> authorize_with_secret(email, pw_or_secret, server)
|
||||
true -> authorize_with_password(email, pw_or_secret, server)
|
||||
end
|
||||
|> case do
|
||||
{:ok, token} -> {:ok, token}
|
||||
err ->
|
||||
err when tries == 0 ->
|
||||
Logger.error 1, "Authorization failed: #{inspect err}"
|
||||
err
|
||||
err ->
|
||||
Logger.error 1, "Authorization failed: #{inspect err}. Trying again #{tries - 1} more times."
|
||||
Process.sleep(2500)
|
||||
authorize(email, pw_or_secret, server, tries - 1)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -36,74 +36,10 @@ defmodule Farmbot.BotState.Transport.AMQP do
|
|||
end
|
||||
|
||||
def init([]) do
|
||||
token = ConfigStorage.get_config_value(:string, "authorization", "token")
|
||||
email = ConfigStorage.get_config_value(:string, "authorization", "email")
|
||||
|
||||
import Farmbot.Jwt, only: [decode: 1]
|
||||
with {:ok, %{bot: device, mqtt: mqtt_host, vhost: vhost}} <- decode(token),
|
||||
{:ok, conn} <- open_connection(token, email, device, mqtt_host, vhost),
|
||||
{:ok, chan} <- AMQP.Channel.open(conn),
|
||||
q_base <- device,
|
||||
|
||||
:ok <- Basic.qos(chan, [global: true]),
|
||||
{:ok, _} <- AMQP.Queue.declare(chan, q_base <> "_from_clients", [auto_delete: true]),
|
||||
from_clients <- [routing_key: "bot.#{device}.from_clients"],
|
||||
{:ok, _} <- AMQP.Queue.purge(chan, q_base <> "_from_clients"),
|
||||
:ok <- AMQP.Queue.bind(chan, q_base <> "_from_clients", @exchange, from_clients),
|
||||
|
||||
{:ok, _} <- AMQP.Queue.declare(chan, q_base <> "_auto_sync", [auto_delete: false]),
|
||||
sync <- [routing_key: "bot.#{device}.sync.#"],
|
||||
:ok <- AMQP.Queue.bind(chan, q_base <> "_auto_sync", @exchange, sync),
|
||||
|
||||
{:ok, _tag} <- Basic.consume(chan, q_base <> "_from_clients", self(), [no_ack: true]),
|
||||
{:ok, _tag} <- Basic.consume(chan, q_base <> "_auto_sync", self(), [no_ack: true]),
|
||||
state <- %State{conn: conn, chan: chan, bot: device}
|
||||
do
|
||||
_ = Process.monitor(conn.pid)
|
||||
_ = Process.monitor(chan.pid)
|
||||
_ = Process.flag(:sensitive, true)
|
||||
_ = Process.flag(:trap_exit, true)
|
||||
{:consumer, state, subscribe_to: [Farmbot.BotState, Farmbot.Logger]}
|
||||
else
|
||||
{:error, {:auth_failure, msg}} = fail ->
|
||||
Farmbot.System.factory_reset(msg)
|
||||
{:stop, fail}
|
||||
{:error, err} ->
|
||||
msg = "Got error authenticating with Real time services: #{inspect err}"
|
||||
Logger.error 1, msg
|
||||
|
||||
# If the auth task is running, force it to reset.
|
||||
if Process.whereis(Farmbot.Bootstrap.AuthTask) do
|
||||
Farmbot.Bootstrap.AuthTask.force_refresh()
|
||||
end
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
|
||||
defp open_connection(token, email, bot, mqtt_server, vhost) do
|
||||
opts = [
|
||||
client_properties: [
|
||||
{"version", :longstr, Farmbot.Project.version()},
|
||||
{"commit", :longstr, Farmbot.Project.commit()},
|
||||
{"target", :longstr, Farmbot.Project.target()},
|
||||
{"opened", :longstr, to_string(DateTime.utc_now())},
|
||||
{"product", :longstr, "farmbot_os"},
|
||||
{"bot", :longstr, bot},
|
||||
{"email", :longstr, email},
|
||||
{"node", :longstr, to_string(node())},
|
||||
],
|
||||
host: mqtt_server,
|
||||
username: bot,
|
||||
password: token,
|
||||
virtual_host: vhost]
|
||||
|
||||
case AMQP.Connection.open(opts) do
|
||||
{:ok, conn} -> {:ok, conn}
|
||||
{:error, reason} ->
|
||||
Logger.error 1, "Error connecting to AMPQ: #{inspect reason}"
|
||||
Process.sleep(5000)
|
||||
open_connection(token, email, bot, mqtt_server, vhost)
|
||||
end
|
||||
_ = Process.flag(:sensitive, true)
|
||||
_ = Process.flag(:trap_exit, true)
|
||||
send self(), :connect
|
||||
{:consumer, %State{}, subscribe_to: [Farmbot.BotState, Farmbot.Logger]}
|
||||
end
|
||||
|
||||
def terminate(reason, state) do
|
||||
|
@ -130,6 +66,11 @@ defmodule Farmbot.BotState.Transport.AMQP do
|
|||
end
|
||||
end
|
||||
|
||||
# Don't handle data if there is no connection.
|
||||
def handle_events(_, _, %{conn: nil} = state) do
|
||||
{:noreply, [], state}
|
||||
end
|
||||
|
||||
def handle_events(events, {pid, _}, state) do
|
||||
case Process.info(pid)[:registered_name] do
|
||||
Farmbot.Logger -> handle_log_events(events, state)
|
||||
|
@ -178,6 +119,11 @@ defmodule Farmbot.BotState.Transport.AMQP do
|
|||
{:noreply, [], state}
|
||||
end
|
||||
|
||||
def handle_info(:connect, state) do
|
||||
%State{} = state = do_connect(state)
|
||||
{:noreply, [], state}
|
||||
end
|
||||
|
||||
# Confirmation sent by the broker after registering this process as a consumer
|
||||
def handle_info({:basic_consume_ok, _}, state) do
|
||||
if get_config_value(:bool, "settings", "log_amqp_connected") do
|
||||
|
@ -343,4 +289,61 @@ defmodule Farmbot.BotState.Transport.AMQP do
|
|||
defp add_position_to_log(%{} = log, %{position: pos}) do
|
||||
Map.merge(log, pos)
|
||||
end
|
||||
|
||||
defp do_connect(%State{} = state) do
|
||||
# If a channel was still open, close it.
|
||||
if state.chan, do: AMQP.Channel.close(state.chan)
|
||||
|
||||
# If the connection is still open, close it.
|
||||
if state.conn, do: AMQP.Connection.close(state.conn)
|
||||
token = ConfigStorage.get_config_value(:string, "authorization", "token")
|
||||
email = ConfigStorage.get_config_value(:string, "authorization", "email")
|
||||
|
||||
import Farmbot.Jwt, only: [decode: 1]
|
||||
with {:ok, %{bot: device, mqtt: mqtt_host, vhost: vhost}} <- decode(token),
|
||||
{:ok, conn} <- open_connection(token, email, device, mqtt_host, vhost),
|
||||
{:ok, chan} <- AMQP.Channel.open(conn),
|
||||
q_base <- device,
|
||||
|
||||
:ok <- Basic.qos(chan, [global: true]),
|
||||
{:ok, _} <- AMQP.Queue.declare(chan, q_base <> "_from_clients", [auto_delete: true]),
|
||||
from_clients <- [routing_key: "bot.#{device}.from_clients"],
|
||||
{:ok, _} <- AMQP.Queue.purge(chan, q_base <> "_from_clients"),
|
||||
:ok <- AMQP.Queue.bind(chan, q_base <> "_from_clients", @exchange, from_clients),
|
||||
|
||||
{:ok, _} <- AMQP.Queue.declare(chan, q_base <> "_auto_sync", [auto_delete: false]),
|
||||
sync <- [routing_key: "bot.#{device}.sync.#"],
|
||||
:ok <- AMQP.Queue.bind(chan, q_base <> "_auto_sync", @exchange, sync),
|
||||
|
||||
{:ok, _tag} <- Basic.consume(chan, q_base <> "_from_clients", self(), [no_ack: true]),
|
||||
{:ok, _tag} <- Basic.consume(chan, q_base <> "_auto_sync", self(), [no_ack: true]) do
|
||||
%State{conn: conn, chan: chan, bot: device}
|
||||
end
|
||||
end
|
||||
|
||||
defp open_connection(token, email, bot, mqtt_server, vhost) do
|
||||
opts = [
|
||||
client_properties: [
|
||||
{"version", :longstr, Farmbot.Project.version()},
|
||||
{"commit", :longstr, Farmbot.Project.commit()},
|
||||
{"target", :longstr, Farmbot.Project.target()},
|
||||
{"opened", :longstr, to_string(DateTime.utc_now())},
|
||||
{"product", :longstr, "farmbot_os"},
|
||||
{"bot", :longstr, bot},
|
||||
{"email", :longstr, email},
|
||||
{"node", :longstr, to_string(node())},
|
||||
],
|
||||
host: mqtt_server,
|
||||
username: bot,
|
||||
password: token,
|
||||
virtual_host: vhost]
|
||||
|
||||
case AMQP.Connection.open(opts) do
|
||||
{:ok, conn} -> {:ok, conn}
|
||||
{:error, reason} ->
|
||||
Logger.error 1, "Error connecting to AMPQ: #{inspect reason}"
|
||||
Process.sleep(5000)
|
||||
open_connection(token, email, bot, mqtt_server, vhost)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -43,6 +43,9 @@ defmodule Farmbot.System do
|
|||
@doc "Remove all configuration data, and reboot."
|
||||
@spec factory_reset(unparsed_reason) :: no_return
|
||||
def factory_reset(reason) do
|
||||
if Farmbot.Project.env == :dev do
|
||||
require IEx; IEx.pry()
|
||||
end
|
||||
alias Farmbot.System.ConfigStorage
|
||||
import ConfigStorage, only: [get_config_value: 3]
|
||||
if Process.whereis ConfigStorage do
|
||||
|
|
|
@ -14,12 +14,12 @@ use Mix.Releases.Config,
|
|||
|
||||
environment :dev do
|
||||
set(cookie: :"gz`tgx[zM,ueL[g{Ji62{jiawNDZHH~PGkNQLa&R>R7c0SKziff4L,*&ZNG)(qu0")
|
||||
set(vm_args: "rel/vm.args.dev")
|
||||
set(vm_args: "rel/vm.args")
|
||||
end
|
||||
|
||||
environment :prod do
|
||||
set(cookie: :"gz`tgx[zM,ueL[g{Ji62{jiawNDZHH~PGkNQLa&R>R7c0SKziff4L,*&ZNG)(qu0")
|
||||
set(vm_args: "rel/vm.args.prod")
|
||||
set(vm_args: "rel/vm.args")
|
||||
end
|
||||
|
||||
# You may define one or more releases in this file.
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
-setcookie democookie
|
||||
-sname farmbot
|
||||
-mode embedded
|
||||
-noshell
|
||||
-heart
|
||||
-extra --no-halt
|
Loading…
Reference in New Issue