Fix CeleryScript channel reconnects **even more**
parent
5faf6358fe
commit
a3e8b094ca
|
@ -26,7 +26,7 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
|
|||
def init(args) do
|
||||
jwt = Keyword.fetch!(args, :jwt)
|
||||
Process.flag(:sensitive, true)
|
||||
send(self(), :timeout)
|
||||
send(self(), :connect_amqp)
|
||||
{:ok, %State{conn: nil, chan: nil, jwt: jwt, rpc_requests: %{}}}
|
||||
end
|
||||
|
||||
|
@ -36,9 +36,31 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
|
|||
if state.chan, do: AMQP.Channel.close(state.chan)
|
||||
end
|
||||
|
||||
def handle_info(:timeout, state) do
|
||||
status = ConnectionWorker.maybe_connect_celeryscript(state.jwt.bot)
|
||||
compute_reply_from_amqp_state(state, status)
|
||||
def handle_info(:connect_amqp, state) do
|
||||
bot = state.jwt.bot
|
||||
queue_name = "#{bot}_from_clients"
|
||||
route = "bot.#{bot}.from_clients"
|
||||
|
||||
with %{} = conn <- ConnectionWorker.connection(),
|
||||
{:ok, %{pid: channel_pid} = chan} <- Channel.open(conn),
|
||||
Process.link(channel_pid),
|
||||
:ok <- Basic.qos(chan, global: true),
|
||||
{:ok, _} <- Queue.declare(chan, queue_name, auto_delete: true),
|
||||
{:ok, _} <- Queue.purge(chan, queue_name),
|
||||
:ok <- Queue.bind(chan, queue_name, @exchange, routing_key: route),
|
||||
{:ok, _tag} <- Basic.consume(chan, queue_name, self(), no_ack: true) do
|
||||
FarmbotCore.Logger.debug(3, "connected to CeleryScript channel")
|
||||
{:noreply, %{state | conn: conn, chan: chan}}
|
||||
else
|
||||
nil ->
|
||||
Process.send_after(self(), :connect_amqp, 5000)
|
||||
{:noreply, %{state | conn: nil, chan: nil}}
|
||||
|
||||
err ->
|
||||
FarmbotCore.Logger.error(1, "Failed to connect to CeleryScript channel: #{inspect(err)}")
|
||||
Process.send_after(self(), :connect_amqp, 2000)
|
||||
{:noreply, %{state | conn: nil, chan: nil}}
|
||||
end
|
||||
end
|
||||
|
||||
# Confirmation sent by the broker after registering this process as a consumer
|
||||
|
@ -133,23 +155,4 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
|
|||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp compute_reply_from_amqp_state(state, %{conn: conn, chan: chan}) do
|
||||
{:noreply, %{state | conn: conn, chan: chan}}
|
||||
end
|
||||
|
||||
defp compute_reply_from_amqp_state(state, error) do
|
||||
# Run error warning if error not nil
|
||||
if error,
|
||||
do:
|
||||
FarmbotCore.Logger.error(
|
||||
1,
|
||||
"Failed to connect to CeleryScript channel: #{inspect(error)}"
|
||||
)
|
||||
|
||||
# Try to reconnect every 5 seconds. This should have some randomness
|
||||
# sprinkled onto it in the case of mass disconnects etc.
|
||||
Process.send_after(self(), :timeout, 5000)
|
||||
{:noreply, %{state | conn: nil, chan: nil}}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -45,17 +45,6 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
|
|||
maybe_connect(chan_name, route, auto_delete, purge?)
|
||||
end
|
||||
|
||||
@doc "Takes the 'bot' claim seen in the JWT and connects to the RPC server."
|
||||
@callback maybe_connect_celeryscript(String.t()) :: map()
|
||||
def maybe_connect_celeryscript(jwt_dot_bot) do
|
||||
auto_delete = true
|
||||
chan_name = jwt_dot_bot <> "_from_clients"
|
||||
purge? = true
|
||||
route = "bot.#{jwt_dot_bot}.from_clients"
|
||||
|
||||
maybe_connect(chan_name, route, auto_delete, purge?)
|
||||
end
|
||||
|
||||
defp maybe_connect(chan_name, route, auto_delete, purge?) do
|
||||
with %{} = conn <- FarmbotExt.AMQP.ConnectionWorker.connection(),
|
||||
{:ok, chan} <- Channel.open(conn),
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
defmodule FarmbotExt.AMQP.CeleryScriptChannelTest do
|
||||
use ExUnit.Case
|
||||
import Mox
|
||||
alias FarmbotExt.JWT
|
||||
alias FarmbotExt.AMQP.{ConnectionWorker, CeleryScriptChannel}
|
||||
|
||||
@jwt JWT.decode!(
|
||||
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZ" <>
|
||||
"G1pbkBhZG1pbi5jb20iLCJpYXQiOjE1MDIxMjcxMTcsImp0a" <>
|
||||
"SI6IjlhZjY2NzJmLTY5NmEtNDhlMy04ODVkLWJiZjEyZDlhY" <>
|
||||
"ThjMiIsImlzcyI6Ii8vbG9jYWxob3N0OjMwMDAiLCJleHAiO" <>
|
||||
"jE1MDU1ODMxMTcsIm1xdHQiOiJsb2NhbGhvc3QiLCJvc191c" <>
|
||||
"GRhdGVfc2VydmVyIjoiaHR0cHM6Ly9hcGkuZ2l0aHViLmNvb" <>
|
||||
"S9yZXBvcy9mYXJtYm90L2Zhcm1ib3Rfb3MvcmVsZWFzZXMvb" <>
|
||||
"GF0ZXN0IiwiZndfdXBkYXRlX3NlcnZlciI6Imh0dHBzOi8vY" <>
|
||||
"XBpLmdpdGh1Yi5jb20vcmVwb3MvRmFybUJvdC9mYXJtYm90L" <>
|
||||
"WFyZHVpbm8tZmlybXdhcmUvcmVsZWFzZXMvbGF0ZXN0IiwiY" <>
|
||||
"m90IjoiZGV2aWNlXzE1In0.XidSeTKp01ngtkHzKD_zklMVr" <>
|
||||
"9ZUHX-U_VDlwCSmNA8ahOHxkwCtx8a3o_McBWvOYZN8RRzQV" <>
|
||||
"LlHJugHq1Vvw2KiUktK_1ABQ4-RuwxOyOBqqc11-6H_GbkM8" <>
|
||||
"dyzqRaWDnpTqHzkHGxanoWVTTgGx2i_MZLr8FPZ8prnRdwC1" <>
|
||||
"x9zZ6xY7BtMPtHW0ddvMtXU8ZVF4CWJwKSaM0Q2pTxI9GRqr" <>
|
||||
"p5Y8UjaKufif7bBPOUbkEHLNOiaux4MQr-OWAC8TrYMyFHzt" <>
|
||||
"eXTEVkqw7rved84ogw6EKBSFCVqwRA-NKWLpPMV_q7fRwiEG" <>
|
||||
"Wj7R-KZqRweALXuvCLF765E6-ENxA"
|
||||
)
|
||||
|
||||
setup :verify_on_exit!
|
||||
setup :set_mox_global
|
||||
|
||||
def pretend_network_returned(fake_value) do
|
||||
test_pid = self()
|
||||
|
||||
expect(ConnectionWorker, :maybe_connect_celeryscript, fn jwt_dot_bot ->
|
||||
send(test_pid, {:maybe_connect_celeryscript_called, jwt_dot_bot})
|
||||
fake_value
|
||||
end)
|
||||
|
||||
stub(ConnectionWorker, :close_channel, fn _ ->
|
||||
send(test_pid, :close_channel_called)
|
||||
:ok
|
||||
end)
|
||||
|
||||
{:ok, pid} = CeleryScriptChannel.start_link([jwt: @jwt], [])
|
||||
assert_receive {:maybe_connect_celeryscript_called, "device_15"}
|
||||
|
||||
{:ok, pid}
|
||||
end
|
||||
|
||||
test "Network connection returning `nil`" do
|
||||
{:ok, pid} = pretend_network_returned(%{conn: nil, chan: nil})
|
||||
state = :sys.get_state(pid)
|
||||
refute state.chan
|
||||
refute state.conn
|
||||
end
|
||||
|
||||
test "Network connection returning non-`nil`" do
|
||||
{:ok, pid} = pretend_network_returned(%{conn: %{a: :b}, chan: %{c: :d}})
|
||||
|
||||
state = :sys.get_state(pid)
|
||||
assert state.chan == %{c: :d}
|
||||
assert state.conn == %{a: :b}
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue