Stabilize AMQP connect and disconnects
* AMQP wasn't disconnected properly, which was causing bots to appear to be offline because the connection was broken, but not closed.pull/974/head
parent
b841005038
commit
47d024e561
|
@ -5,6 +5,7 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
|
|||
|
||||
use GenServer
|
||||
require Logger
|
||||
require FarmbotCore.Logger
|
||||
alias AMQP.{Basic, Channel, Queue}
|
||||
|
||||
alias FarmbotExt.{JWT, AMQP.ConnectionWorker}
|
||||
|
@ -23,6 +24,10 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
|
|||
GenServer.call(connection_worker, :connection)
|
||||
end
|
||||
|
||||
def close(connection_worker \\ __MODULE__) do
|
||||
GenServer.call(connection_worker, :close)
|
||||
end
|
||||
|
||||
@doc "Cleanly close an AMQP channel"
|
||||
@callback close_channel(channel) :: nil
|
||||
def close_channel(chan) do
|
||||
|
@ -99,17 +104,8 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
|
|||
end
|
||||
|
||||
@impl GenServer
|
||||
def terminate(reason, %{conn: conn}) do
|
||||
if Process.alive?(conn.pid) do
|
||||
try do
|
||||
Logger.info("Closing AMQP connection: #{inspect(reason)}")
|
||||
:ok = AMQP.Connection.close(conn)
|
||||
rescue
|
||||
ex ->
|
||||
message = Exception.message(ex)
|
||||
Logger.error("Could not close AMQP connection: #{message}")
|
||||
end
|
||||
end
|
||||
def terminate(_reason, %{conn: conn}) do
|
||||
close_connection(conn)
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
|
@ -120,19 +116,20 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
|
|||
|
||||
case open_connection(token, email, jwt.bot, jwt.mqtt, jwt.vhost) do
|
||||
{:ok, conn} ->
|
||||
Process.link(conn.pid)
|
||||
Process.monitor(conn.pid)
|
||||
{:noreply, %{state | conn: conn}}
|
||||
|
||||
err ->
|
||||
Logger.error("Error connecting to AMPQ: #{inspect(err)}")
|
||||
{:noreply, %{state | conn: nil}, 5000}
|
||||
Process.send_after(self(), :timeout, 5000)
|
||||
{:noreply, %{state | conn: nil}}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info({:EXIT, _pid, reason}, conn) do
|
||||
Logger.error("Connection crash: #{inspect(reason)}")
|
||||
{:stop, reason, conn}
|
||||
def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
|
||||
FarmbotCore.Logger.error(2, "AMQP Connection exit")
|
||||
_ = close_connection(state.conn)
|
||||
{:stop, reason, state}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
|
@ -140,6 +137,12 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
|
|||
{:reply, conn, state}
|
||||
end
|
||||
|
||||
def handle_call(:close, _from, %{conn: _conn} = state) do
|
||||
FarmbotCore.Logger.error(2, "AMQP Connection closing")
|
||||
reply = close_connection(state.conn)
|
||||
{:stop, :close, reply, %{state | conn: nil}}
|
||||
end
|
||||
|
||||
# Public function because the NervesHub channel requires it.
|
||||
# TODO(Connor) - Fix that
|
||||
@doc false
|
||||
|
@ -164,9 +167,25 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
|
|||
host: mqtt_server,
|
||||
username: bot,
|
||||
password: token,
|
||||
virtual_host: vhost
|
||||
virtual_host: vhost,
|
||||
connection_timeout: 10_000
|
||||
]
|
||||
|
||||
AMQP.Connection.open(opts)
|
||||
end
|
||||
|
||||
defp close_connection(nil), do: :ok
|
||||
|
||||
defp close_connection(%{pid: _} = conn) do
|
||||
if Process.alive?(conn.pid) do
|
||||
try do
|
||||
:ok = AMQP.Connection.close(conn)
|
||||
rescue
|
||||
ex ->
|
||||
message = Exception.message(ex)
|
||||
Logger.error("Could not close AMQP connection: #{message}")
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue