Fix AMQP opening a new connection without closing the older ones
parent
10a4379568
commit
512f9d8910
|
@ -9,8 +9,9 @@ defmodule Farmbot.AMQP.Supervisor do
|
|||
|
||||
def init([]) do
|
||||
token = get_config_value(:string, "authorization", "token")
|
||||
email = get_config_value(:string, "authorization", "email")
|
||||
children = [
|
||||
{Farmbot.AMQP.ConnectionWorker, [token]},
|
||||
{Farmbot.AMQP.ConnectionWorker, [token: token, email: email]},
|
||||
{Farmbot.AMQP.ChannelSupervisor, [token]}
|
||||
]
|
||||
Supervisor.init(children, [strategy: :one_for_all])
|
||||
|
|
|
@ -12,14 +12,31 @@ defmodule Farmbot.AMQP.ConnectionWorker do
|
|||
GenServer.call(__MODULE__, :connection)
|
||||
end
|
||||
|
||||
def init([token]) do
|
||||
def init(opts) do
|
||||
token = Keyword.fetch!(opts, :token)
|
||||
email = Keyword.fetch!(opts, :email)
|
||||
Process.flag(:sensitive, true)
|
||||
Process.flag(:trap_exit, true)
|
||||
jwt = Farmbot.Jwt.decode!(token)
|
||||
{:ok, conn} = open_connection(token, jwt.bot, jwt.mqtt, jwt.vhost)
|
||||
{:ok, conn} = open_connection(token, email, jwt.bot, jwt.mqtt, jwt.vhost)
|
||||
Process.link(conn.pid)
|
||||
Process.monitor(conn.pid)
|
||||
{:ok, conn}
|
||||
end
|
||||
|
||||
def terminate(_, conn) do
|
||||
if Process.alive(conn.pid) do
|
||||
try do
|
||||
Logger.info "Closing AMQP connection."
|
||||
:ok = AMQP.Connection.close(conn)
|
||||
rescue
|
||||
ex ->
|
||||
message = Exception.message(ex)
|
||||
Logger.error "Could not close AMQP connection: #{message}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info({:DOWN, _, :process, _pid, reason}, conn) do
|
||||
ok_reasons = [:normal, :shutdown, :token_refresh]
|
||||
update_config_value(:bool, "settings", "ignore_fbos_config", false)
|
||||
|
@ -33,8 +50,19 @@ defmodule Farmbot.AMQP.ConnectionWorker do
|
|||
|
||||
def handle_call(:connection, _, conn), do: {:reply, conn, conn}
|
||||
|
||||
defp open_connection(token, bot, mqtt_server, vhost) do
|
||||
defp open_connection(token, email, bot, mqtt_server, vhost) do
|
||||
Logger.info "Opening new AMQP connection."
|
||||
opts = [
|
||||
client_properties: [
|
||||
{"version", :longstr, Farmbot.Project.version()},
|
||||
{"commit", :longstr, Farmbot.Project.commit()},
|
||||
{"target", :longstr, Farmbot.Project.target()},
|
||||
{"opened", :longstr, to_string(DateTime.utc_now())},
|
||||
{"product", :longstr, "farmbot_os"},
|
||||
{"bot", :longstr, bot},
|
||||
{"email", :longstr, email},
|
||||
{"node", :longstr, to_string(node())},
|
||||
],
|
||||
host: mqtt_server,
|
||||
username: bot,
|
||||
password: token,
|
||||
|
@ -44,7 +72,7 @@ defmodule Farmbot.AMQP.ConnectionWorker do
|
|||
{:error, reason} ->
|
||||
Logger.error "Error connecting to AMPQ: #{inspect reason}"
|
||||
Process.sleep(5000)
|
||||
open_connection(token, bot, mqtt_server, vhost)
|
||||
open_connection(token, email, bot, mqtt_server, vhost)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue