From a3e8b094ca90803eff543c968c0d90cb1afa758d Mon Sep 17 00:00:00 2001 From: Connor Rigby Date: Tue, 10 Sep 2019 11:09:29 -0700 Subject: [PATCH] Fix CeleryScript channel reconnects **even more** --- .../farmbot_ext/amqp/celery_script_channel.ex | 49 +++++++------- .../lib/farmbot_ext/amqp/connection_worker.ex | 11 ---- .../amqp/celery_script_channel_test.exs | 64 ------------------- 3 files changed, 26 insertions(+), 98 deletions(-) delete mode 100644 farmbot_ext/test/farmbot_ext/amqp/celery_script_channel_test.exs diff --git a/farmbot_ext/lib/farmbot_ext/amqp/celery_script_channel.ex b/farmbot_ext/lib/farmbot_ext/amqp/celery_script_channel.ex index 9401df4a..527581ff 100644 --- a/farmbot_ext/lib/farmbot_ext/amqp/celery_script_channel.ex +++ b/farmbot_ext/lib/farmbot_ext/amqp/celery_script_channel.ex @@ -26,7 +26,7 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do def init(args) do jwt = Keyword.fetch!(args, :jwt) Process.flag(:sensitive, true) - send(self(), :timeout) + send(self(), :connect_amqp) {:ok, %State{conn: nil, chan: nil, jwt: jwt, rpc_requests: %{}}} end @@ -36,9 +36,31 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do if state.chan, do: AMQP.Channel.close(state.chan) end - def handle_info(:timeout, state) do - status = ConnectionWorker.maybe_connect_celeryscript(state.jwt.bot) - compute_reply_from_amqp_state(state, status) + def handle_info(:connect_amqp, state) do + bot = state.jwt.bot + queue_name = "#{bot}_from_clients" + route = "bot.#{bot}.from_clients" + + with %{} = conn <- ConnectionWorker.connection(), + {:ok, %{pid: channel_pid} = chan} <- Channel.open(conn), + Process.link(channel_pid), + :ok <- Basic.qos(chan, global: true), + {:ok, _} <- Queue.declare(chan, queue_name, auto_delete: true), + {:ok, _} <- Queue.purge(chan, queue_name), + :ok <- Queue.bind(chan, queue_name, @exchange, routing_key: route), + {:ok, _tag} <- Basic.consume(chan, queue_name, self(), no_ack: true) do + FarmbotCore.Logger.debug(3, "connected to CeleryScript channel") + {:noreply, %{state | conn: conn, chan: chan}} + else + nil -> + Process.send_after(self(), :connect_amqp, 5000) + {:noreply, %{state | conn: nil, chan: nil}} + + err -> + FarmbotCore.Logger.error(1, "Failed to connect to CeleryScript channel: #{inspect(err)}") + Process.send_after(self(), :connect_amqp, 2000) + {:noreply, %{state | conn: nil, chan: nil}} + end end # Confirmation sent by the broker after registering this process as a consumer @@ -133,23 +155,4 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do {:noreply, state} end end - - defp compute_reply_from_amqp_state(state, %{conn: conn, chan: chan}) do - {:noreply, %{state | conn: conn, chan: chan}} - end - - defp compute_reply_from_amqp_state(state, error) do - # Run error warning if error not nil - if error, - do: - FarmbotCore.Logger.error( - 1, - "Failed to connect to CeleryScript channel: #{inspect(error)}" - ) - - # Try to reconnect every 5 seconds. This should have some randomness - # sprinkled onto it in the case of mass disconnects etc. - Process.send_after(self(), :timeout, 5000) - {:noreply, %{state | conn: nil, chan: nil}} - end end diff --git a/farmbot_ext/lib/farmbot_ext/amqp/connection_worker.ex b/farmbot_ext/lib/farmbot_ext/amqp/connection_worker.ex index 509bf948..3325e675 100644 --- a/farmbot_ext/lib/farmbot_ext/amqp/connection_worker.ex +++ b/farmbot_ext/lib/farmbot_ext/amqp/connection_worker.ex @@ -45,17 +45,6 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do maybe_connect(chan_name, route, auto_delete, purge?) end - @doc "Takes the 'bot' claim seen in the JWT and connects to the RPC server." - @callback maybe_connect_celeryscript(String.t()) :: map() - def maybe_connect_celeryscript(jwt_dot_bot) do - auto_delete = true - chan_name = jwt_dot_bot <> "_from_clients" - purge? = true - route = "bot.#{jwt_dot_bot}.from_clients" - - maybe_connect(chan_name, route, auto_delete, purge?) - end - defp maybe_connect(chan_name, route, auto_delete, purge?) do with %{} = conn <- FarmbotExt.AMQP.ConnectionWorker.connection(), {:ok, chan} <- Channel.open(conn), diff --git a/farmbot_ext/test/farmbot_ext/amqp/celery_script_channel_test.exs b/farmbot_ext/test/farmbot_ext/amqp/celery_script_channel_test.exs deleted file mode 100644 index de50b60a..00000000 --- a/farmbot_ext/test/farmbot_ext/amqp/celery_script_channel_test.exs +++ /dev/null @@ -1,64 +0,0 @@ -defmodule FarmbotExt.AMQP.CeleryScriptChannelTest do - use ExUnit.Case - import Mox - alias FarmbotExt.JWT - alias FarmbotExt.AMQP.{ConnectionWorker, CeleryScriptChannel} - - @jwt JWT.decode!( - "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZ" <> - "G1pbkBhZG1pbi5jb20iLCJpYXQiOjE1MDIxMjcxMTcsImp0a" <> - "SI6IjlhZjY2NzJmLTY5NmEtNDhlMy04ODVkLWJiZjEyZDlhY" <> - "ThjMiIsImlzcyI6Ii8vbG9jYWxob3N0OjMwMDAiLCJleHAiO" <> - "jE1MDU1ODMxMTcsIm1xdHQiOiJsb2NhbGhvc3QiLCJvc191c" <> - "GRhdGVfc2VydmVyIjoiaHR0cHM6Ly9hcGkuZ2l0aHViLmNvb" <> - "S9yZXBvcy9mYXJtYm90L2Zhcm1ib3Rfb3MvcmVsZWFzZXMvb" <> - "GF0ZXN0IiwiZndfdXBkYXRlX3NlcnZlciI6Imh0dHBzOi8vY" <> - "XBpLmdpdGh1Yi5jb20vcmVwb3MvRmFybUJvdC9mYXJtYm90L" <> - "WFyZHVpbm8tZmlybXdhcmUvcmVsZWFzZXMvbGF0ZXN0IiwiY" <> - "m90IjoiZGV2aWNlXzE1In0.XidSeTKp01ngtkHzKD_zklMVr" <> - "9ZUHX-U_VDlwCSmNA8ahOHxkwCtx8a3o_McBWvOYZN8RRzQV" <> - "LlHJugHq1Vvw2KiUktK_1ABQ4-RuwxOyOBqqc11-6H_GbkM8" <> - "dyzqRaWDnpTqHzkHGxanoWVTTgGx2i_MZLr8FPZ8prnRdwC1" <> - "x9zZ6xY7BtMPtHW0ddvMtXU8ZVF4CWJwKSaM0Q2pTxI9GRqr" <> - "p5Y8UjaKufif7bBPOUbkEHLNOiaux4MQr-OWAC8TrYMyFHzt" <> - "eXTEVkqw7rved84ogw6EKBSFCVqwRA-NKWLpPMV_q7fRwiEG" <> - "Wj7R-KZqRweALXuvCLF765E6-ENxA" - ) - - setup :verify_on_exit! - setup :set_mox_global - - def pretend_network_returned(fake_value) do - test_pid = self() - - expect(ConnectionWorker, :maybe_connect_celeryscript, fn jwt_dot_bot -> - send(test_pid, {:maybe_connect_celeryscript_called, jwt_dot_bot}) - fake_value - end) - - stub(ConnectionWorker, :close_channel, fn _ -> - send(test_pid, :close_channel_called) - :ok - end) - - {:ok, pid} = CeleryScriptChannel.start_link([jwt: @jwt], []) - assert_receive {:maybe_connect_celeryscript_called, "device_15"} - - {:ok, pid} - end - - test "Network connection returning `nil`" do - {:ok, pid} = pretend_network_returned(%{conn: nil, chan: nil}) - state = :sys.get_state(pid) - refute state.chan - refute state.conn - end - - test "Network connection returning non-`nil`" do - {:ok, pid} = pretend_network_returned(%{conn: %{a: :b}, chan: %{c: :d}}) - - state = :sys.get_state(pid) - assert state.chan == %{c: :d} - assert state.conn == %{a: :b} - end -end