Implement process separation for celery scheduler

This should hopefuly prevent hanging forever and other weird
issues relating to executing celeryscript
pull/974/head
Connor Rigby 2019-06-06 15:49:05 -07:00
parent 5496efd390
commit 697850b4b3
No known key found for this signature in database
GPG Key ID: 29A88B24B70456E0
2 changed files with 40 additions and 30 deletions

View File

@ -22,7 +22,8 @@ defmodule FarmbotCeleryScript.Scheduler do
alias FarmbotCeleryScript.{AST, RuntimeError, Compiler}
defstruct steps: [],
execute: false
execute: false,
waiting: %{}
@doc "Start an instance of a CeleryScript Scheduler"
def start_link(args, opts \\ [name: __MODULE__]) do
@ -77,17 +78,18 @@ defmodule FarmbotCeleryScript.Scheduler do
@impl true
def handle_call({:execute, compiled}, {_pid, ref} = from, state) do
# Warning, timestamps may be unstable in offline situations.
send(self(), :timeout)
send(self(), :begin_execution)
{:reply, {:ok, ref}, %{state | steps: [{from, :os.system_time(), compiled} | state.steps]}}
end
def handle_call({:schedule, compiled}, {_pid, ref} = from, state) do
send(self(), :timeout)
send(self(), :begin_execution)
{:reply, {:ok, ref}, %{state | steps: state.steps ++ [{from, nil, compiled}]}}
end
@impl true
def handle_info(:timeout, %{steps: steps} = state) when length(steps) >= 1 do
def handle_info(:begin_execution, %{steps: steps} = state) when length(steps) >= 1 do
[{{_pid, _ref} = from, timestamp, compiled} | rest] =
Enum.sort(steps, fn
{_, first_ts, _}, {_, second_ts, _} when first_ts <= second_ts -> true
@ -104,38 +106,44 @@ defmodule FarmbotCeleryScript.Scheduler do
end
end
def handle_info(:timeout, %{steps: []} = state) do
def handle_info(:begin_execution, %{steps: []} = state) do
{:noreply, state}
end
def handle_info({:step_complete, {pid, ref} = from, result}, state) do
send(pid, {__MODULE__, ref, result})
send(self(), :begin_execution)
{:noreply, %{state | waiting: Map.delete(state.waiting, from), execute: false}}
end
@impl true
def handle_continue({{pid, ref} = from, [step | rest]}, state) do
case step(state, step) do
[fun | _] = more when is_function(fun, 0) ->
{:noreply, state, {:continue, {from, more ++ rest}}}
{:error, reason} ->
send(pid, {__MODULE__, ref, {:error, reason}})
send(self(), :timeout)
{:noreply, state}
_ ->
{:noreply, state, {:continue, {from, rest}}}
end
def handle_continue({from, steps}, state) do
mon = spawn_link(__MODULE__, :step, [self(), from, steps])
{:noreply, %{state | waiting: Map.put(state.waiting, from, %{monitor: mon, from: from})}}
end
def handle_continue({{pid, ref}, []}, state) do
send(pid, {__MODULE__, ref, :ok})
send(self(), :timeout)
{:noreply, %{state | execute: false}}
end
def step(_state, fun) when is_function(fun, 0) do
def step(scheduler_pid, from, [fun | rest]) when is_function(fun, 0) do
try do
fun.()
case fun.() do
[fun | _] = more when is_function(fun, 0) ->
step(scheduler_pid, from, more ++ rest)
{:error, reason} ->
send(scheduler_pid, {:step_complete, from, {:error, reason}})
_ ->
step(scheduler_pid, from, rest)
end
rescue
e in RuntimeError -> {:error, Exception.message(e)}
exception -> reraise(exception, __STACKTRACE__)
e in RuntimeError ->
send(scheduler_pid, {:step_complete, from, {:error, Exception.message(e)}})
catch
error ->
send(scheduler_pid, {:step_complete, from, {:error, error}})
end
end
def step(scheduler_pid, from, []) do
send(scheduler_pid, {:step_complete, from, :ok})
end
end

View File

@ -173,8 +173,10 @@ defmodule FarmbotCeleryScript.SchedulerTest do
assert_receive {:wait, time_1}
assert_receive {:read_pin, time_2}
assert_receive {:write_pin, time_3}
assert [^time_1, ^time_3, ^time_2] = Enum.sort([time_1, time_2, time_3], &(&1 <= &2))
sorted = Enum.sort([time_1, time_2, time_3], &(&1 <= &2))
assert time_1 in sorted
assert time_2 in sorted
assert time_3 in sorted
end
test "execute twice", %{sch: sch} do