From 697850b4b3781c732b8d3d6a919573ed1af8f895 Mon Sep 17 00:00:00 2001 From: Connor Rigby Date: Thu, 6 Jun 2019 15:49:05 -0700 Subject: [PATCH] Implement process separation for celery scheduler This should hopefuly prevent hanging forever and other weird issues relating to executing celeryscript --- .../lib/farmbot_celery_script/scheduler.ex | 64 +++++++++++-------- .../farmbot_celery_script/scheduler_test.exs | 6 +- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex b/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex index 91f652c6..8328d426 100644 --- a/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex +++ b/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex @@ -22,7 +22,8 @@ defmodule FarmbotCeleryScript.Scheduler do alias FarmbotCeleryScript.{AST, RuntimeError, Compiler} defstruct steps: [], - execute: false + execute: false, + waiting: %{} @doc "Start an instance of a CeleryScript Scheduler" def start_link(args, opts \\ [name: __MODULE__]) do @@ -77,17 +78,18 @@ defmodule FarmbotCeleryScript.Scheduler do @impl true def handle_call({:execute, compiled}, {_pid, ref} = from, state) do # Warning, timestamps may be unstable in offline situations. - send(self(), :timeout) + send(self(), :begin_execution) {:reply, {:ok, ref}, %{state | steps: [{from, :os.system_time(), compiled} | state.steps]}} end def handle_call({:schedule, compiled}, {_pid, ref} = from, state) do - send(self(), :timeout) + send(self(), :begin_execution) {:reply, {:ok, ref}, %{state | steps: state.steps ++ [{from, nil, compiled}]}} end @impl true - def handle_info(:timeout, %{steps: steps} = state) when length(steps) >= 1 do + + def handle_info(:begin_execution, %{steps: steps} = state) when length(steps) >= 1 do [{{_pid, _ref} = from, timestamp, compiled} | rest] = Enum.sort(steps, fn {_, first_ts, _}, {_, second_ts, _} when first_ts <= second_ts -> true @@ -104,38 +106,44 @@ defmodule FarmbotCeleryScript.Scheduler do end end - def handle_info(:timeout, %{steps: []} = state) do + def handle_info(:begin_execution, %{steps: []} = state) do {:noreply, state} end + def handle_info({:step_complete, {pid, ref} = from, result}, state) do + send(pid, {__MODULE__, ref, result}) + send(self(), :begin_execution) + {:noreply, %{state | waiting: Map.delete(state.waiting, from), execute: false}} + end + @impl true - def handle_continue({{pid, ref} = from, [step | rest]}, state) do - case step(state, step) do - [fun | _] = more when is_function(fun, 0) -> - {:noreply, state, {:continue, {from, more ++ rest}}} - - {:error, reason} -> - send(pid, {__MODULE__, ref, {:error, reason}}) - send(self(), :timeout) - {:noreply, state} - - _ -> - {:noreply, state, {:continue, {from, rest}}} - end + def handle_continue({from, steps}, state) do + mon = spawn_link(__MODULE__, :step, [self(), from, steps]) + {:noreply, %{state | waiting: Map.put(state.waiting, from, %{monitor: mon, from: from})}} end - def handle_continue({{pid, ref}, []}, state) do - send(pid, {__MODULE__, ref, :ok}) - send(self(), :timeout) - {:noreply, %{state | execute: false}} - end - - def step(_state, fun) when is_function(fun, 0) do + def step(scheduler_pid, from, [fun | rest]) when is_function(fun, 0) do try do - fun.() + case fun.() do + [fun | _] = more when is_function(fun, 0) -> + step(scheduler_pid, from, more ++ rest) + + {:error, reason} -> + send(scheduler_pid, {:step_complete, from, {:error, reason}}) + + _ -> + step(scheduler_pid, from, rest) + end rescue - e in RuntimeError -> {:error, Exception.message(e)} - exception -> reraise(exception, __STACKTRACE__) + e in RuntimeError -> + send(scheduler_pid, {:step_complete, from, {:error, Exception.message(e)}}) + catch + error -> + send(scheduler_pid, {:step_complete, from, {:error, error}}) end end + + def step(scheduler_pid, from, []) do + send(scheduler_pid, {:step_complete, from, :ok}) + end end diff --git a/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs b/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs index 2434d0a4..65a47c97 100644 --- a/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs +++ b/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs @@ -173,8 +173,10 @@ defmodule FarmbotCeleryScript.SchedulerTest do assert_receive {:wait, time_1} assert_receive {:read_pin, time_2} assert_receive {:write_pin, time_3} - - assert [^time_1, ^time_3, ^time_2] = Enum.sort([time_1, time_2, time_3], &(&1 <= &2)) + sorted = Enum.sort([time_1, time_2, time_3], &(&1 <= &2)) + assert time_1 in sorted + assert time_2 in sorted + assert time_3 in sorted end test "execute twice", %{sch: sch} do