Update scheduler to use ETS instead of storing data on a local state

pull/974/head
Connor Rigby 2019-05-16 10:48:38 -07:00
parent 16499070e9
commit 80b6e53569
No known key found for this signature in database
GPG Key ID: 29A88B24B70456E0
3 changed files with 152 additions and 88 deletions

View File

@ -19,10 +19,17 @@ defmodule FarmbotCeleryScript.Scheduler do
use GenServer use GenServer
alias __MODULE__, as: State alias __MODULE__, as: State
alias FarmbotCeleryScript.{AST, RuntimeError, Compiler} alias FarmbotCeleryScript.Scheduler.CommandRunner
alias FarmbotCeleryScript.{AST, Compiler}
defstruct steps: [], defstruct steps: [],
execute: false table: nil,
execute: nil,
schedule: nil,
execute_spec: nil,
schedule_spec: nil
@table_name :celery_scheduler
@doc "Start an instance of a CeleryScript Scheduler" @doc "Start an instance of a CeleryScript Scheduler"
def start_link(args, opts \\ [name: __MODULE__]) do def start_link(args, opts \\ [name: __MODULE__]) do
@ -42,15 +49,17 @@ defmodule FarmbotCeleryScript.Scheduler do
`move_absolute` at the same time, the `execute`d call will have somewhat `move_absolute` at the same time, the `execute`d call will have somewhat
undefined behaviour depending on the `move_absolute` implementation. undefined behaviour depending on the `move_absolute` implementation.
""" """
@spec execute(GenServer.server(), AST.t() | [Compiler.compiled()]) :: {:ok, reference()} @spec execute(atom, AST.t() | [Compiler.compiled()]) :: {:ok, reference()}
def execute(scheduler_pid \\ __MODULE__, celery_script) def execute(table \\ @table_name, celery_script)
def execute(sch, %AST{} = ast) do def execute(table, %AST{} = ast) do
execute(sch, Compiler.compile(ast)) execute(table, Compiler.compile(ast))
end end
def execute(sch, compiled) when is_list(compiled) do def execute(table, compiled) when is_list(compiled) do
GenServer.call(sch, {:execute, compiled}) ref = make_ref()
:ets.insert(table, {:os.system_time(), {self(), ref}, compiled})
{:ok, ref}
end end
@doc """ @doc """
@ -58,84 +67,71 @@ defmodule FarmbotCeleryScript.Scheduler do
Calls are executed in a first in first out buffer, with things being added Calls are executed in a first in first out buffer, with things being added
by `execute/2` taking priority. by `execute/2` taking priority.
""" """
@spec schedule(GenServer.server(), AST.t() | [Compiler.compiled()]) :: {:ok, reference()} @spec schedule(atom, AST.t() | [Compiler.compiled()]) :: {:ok, reference()}
def schedule(scheduler_pid \\ __MODULE__, celery_script) def schedule(table \\ @table_name, celery_script)
def schedule(sch, %AST{} = ast) do def schedule(table, %AST{} = ast) do
schedule(sch, Compiler.compile(ast)) schedule(table, Compiler.compile(ast))
end end
def schedule(sch, compiled) when is_list(compiled) do def schedule(table, compiled) when is_list(compiled) do
GenServer.call(sch, {:schedule, compiled})
end
@impl true
def init(_args) do
{:ok, %State{}}
end
@impl true
def handle_call({:execute, compiled}, {_pid, ref} = from, state) do
# Warning, timestamps may be unstable in offline situations. # Warning, timestamps may be unstable in offline situations.
send(self(), :timeout) ref = make_ref()
{:reply, {:ok, ref}, %{state | steps: [{from, :os.system_time(), compiled} | state.steps]}} :ets.insert(table, {nil, {self(), ref}, compiled})
end {:ok, ref}
def handle_call({:schedule, compiled}, {_pid, ref} = from, state) do
send(self(), :timeout)
{:reply, {:ok, ref}, %{state | steps: state.steps ++ [{from, nil, compiled}]}}
end end
@impl true @impl true
def handle_info(:timeout, %{steps: steps} = state) when length(steps) >= 1 do def init(args) do
[{{_pid, _ref} = from, timestamp, compiled} | rest] = table = Keyword.get(args, :table, @table_name)
Enum.sort(steps, fn {:ok, execute} = CommandRunner.start_link(args)
{_, first_ts, _}, {_, second_ts, _} when first_ts <= second_ts -> true {:ok, schedule} = CommandRunner.start_link(args)
{_, _, _}, {_, _, _} -> false send(self(), :checkup)
end) execute_spec = nil
schedule_spec = nil
case state.execute do {:ok,
true -> %State{
{:noreply, state} table: table,
execute: execute,
false -> schedule: schedule,
{:noreply, %{state | execute: is_number(timestamp), steps: rest}, execute_spec: execute_spec,
{:continue, {from, compiled}}} schedule_spec: schedule_spec
end }}
end
def handle_info(:timeout, %{steps: []} = state) do
{:noreply, state}
end end
@impl true @impl true
def handle_continue({{pid, ref} = from, [step | rest]}, state) do def handle_info(:checkup, state) do
case step(state, step) do # execute_steps = :ets.select(state.table, fn
[fun | _] = more when is_function(fun, 0) -> # {head, from, compiled} when is_number(head) -> {head, from, compiled}
{:noreply, state, {:continue, {from, more ++ rest}}} # end)
execute_steps =
:ets.select(state.table, [
{{:"$1", :"$2", :"$3"}, [is_number: :"$1"], [{{:"$1", :"$2", :"$3"}}]}
])
{:error, reason} -> # schedule_steps = :ets.select(state.table, fn
send(pid, {__MODULE__, ref, {:error, reason}}) # {head, from, compiled} when is_nil(head) -> {head, from, compiled}
send(self(), :timeout) # end)
schedule_steps =
:ets.select(state.table, [
{{:"$1", :"$2", :"$3"}, [{:==, :"$1", nil}], [{{:"$1", :"$2", :"$3"}}]}
])
# all = :ets.match_object(state.table, {:_, :_, :_})
# length(all) > 0 && IO.inspect(all, label: "ALL")
length(execute_steps) > 0 && IO.inspect(execute_steps, label: "EXECUTE")
length(schedule_steps) > 0 && IO.inspect(schedule_steps, label: "SCHEDULE")
:ok = GenServer.cast(state.execute, execute_steps)
:ok = GenServer.cast(state.schedule, schedule_steps)
for step <- execute_steps ++ schedule_steps do
true = :ets.delete_object(state.table, step)
end
send(self(), :checkup)
{:noreply, state} {:noreply, state}
_ ->
{:noreply, state, {:continue, {from, rest}}}
end
end
def handle_continue({{pid, ref}, []}, state) do
send(pid, {__MODULE__, ref, :ok})
send(self(), :timeout)
{:noreply, %{state | execute: false}}
end
def step(_state, fun) when is_function(fun, 0) do
try do
fun.()
rescue
e in RuntimeError -> {:error, Exception.message(e)}
exception -> reraise(exception, __STACKTRACE__)
end
end end
end end

View File

@ -0,0 +1,66 @@
defmodule FarmbotCeleryScript.Scheduler.CommandRunner do
@moduledoc false
alias FarmbotCeleryScript.RuntimeError
use GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
def init(_args) do
send(self(), :checkup)
{:ok, []}
end
def handle_cast(steps, state) do
{:noreply, state ++ steps}
end
def handle_info(:checkup, state) do
# IO.puts "[#{inspect(self())}] CommandRunner checkup"
{:noreply, [], {:continue, state}}
end
def handle_continue([{_timestamp, {pid, ref}, compiled} | rest], state) do
case step_through(compiled) do
:ok ->
send(pid, {FarmbotCeleryScript.Scheduler, ref, :ok})
{:noreply, state, {:continue, rest}}
{:error, reason} ->
send(pid, {FarmbotCeleryScript.Scheduler, ref, {:error, reason}})
{:noreply, state, {:continue, rest}}
end
end
def handle_continue([], state) do
# IO.puts "[#{inspect(self())}] CommandRunner complete"
send(self(), :checkup)
{:noreply, state}
end
defp step_through([fun | rest]) do
case step(fun) do
[fun | _] = more when is_function(fun, 0) ->
step_through(more ++ rest)
{:error, reason} ->
{:error, reason}
_ ->
step_through(rest)
end
end
defp step_through([]), do: :ok
def step(fun) when is_function(fun, 0) do
try do
IO.inspect(fun, label: "step")
fun.()
rescue
e in RuntimeError -> {:error, Exception.message(e)}
exception -> reraise(exception, __STACKTRACE__)
end
end
end

View File

@ -5,11 +5,13 @@ defmodule FarmbotCeleryScript.SchedulerTest do
setup do setup do
{:ok, shim} = TestSysCalls.checkout() {:ok, shim} = TestSysCalls.checkout()
{:ok, sch} = Scheduler.start_link([], []) table = :"celery_scheduler_#{:rand.uniform(100)}"
[shim: shim, sch: sch] table = :ets.new(table, [:duplicate_bag, :named_table, :public])
{:ok, sch} = Scheduler.start_link([table: table], [])
[shim: shim, sch: sch, table: table]
end end
test "uses default values when no parameter is found", %{sch: sch} do test "uses default values when no parameter is found", %{table: sch} do
sequence_ast = sequence_ast =
%{ %{
kind: :sequence, kind: :sequence,
@ -69,7 +71,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do
assert_receive {:move_absolute, [129, 129, 129, 921]} assert_receive {:move_absolute, [129, 129, 129, 921]}
end end
test "syscall errors", %{sch: sch} do test "syscall errors", %{table: sch} do
execute_ast = execute_ast =
%{ %{
kind: :rpc_request, kind: :rpc_request,
@ -92,7 +94,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do
end end
@tag :annoying @tag :annoying
test "regular exceptions still occur", %{sch: sch} do test "regular exceptions still occur", %{table: table, sch: sch} do
Process.flag(:trap_exit, true) Process.flag(:trap_exit, true)
execute_ast = execute_ast =
@ -116,12 +118,12 @@ defmodule FarmbotCeleryScript.SchedulerTest do
:read_pin, _ -> raise("failed to read pin!") :read_pin, _ -> raise("failed to read pin!")
end) end)
{:ok, execute_ref} = Scheduler.execute(sch, executed) {:ok, execute_ref} = Scheduler.execute(table, executed)
refute_receive {Scheduler, ^execute_ref, {:error, "failed to read pin!"}} refute_receive {Scheduler, ^execute_ref, {:error, "failed to read pin!"}}
assert_receive {:EXIT, ^sch, _}, 1000 assert_receive {:EXIT, ^sch, _}, 1000
end end
test "executing a sequence on top of a scheduled sequence", %{sch: sch} do test "executing a sequence on top of a scheduled sequence", %{table: sch} do
scheduled_ast = scheduled_ast =
%{ %{
kind: :sequence, kind: :sequence,
@ -177,7 +179,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do
assert [^time_1, ^time_3, ^time_2] = Enum.sort([time_1, time_2, time_3], &(&1 <= &2)) assert [^time_1, ^time_3, ^time_2] = Enum.sort([time_1, time_2, time_3], &(&1 <= &2))
end end
test "execute twice", %{sch: sch} do test "execute twice", %{table: sch} do
execute_ast_1 = execute_ast_1 =
%{ %{
kind: :rpc_request, kind: :rpc_request,
@ -235,7 +237,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do
assert time_2 >= time_1 + 1000 assert time_2 >= time_1 + 1000
end end
test "execute then schedule", %{sch: sch} do test "execute then schedule", %{table: sch} do
execute_ast_1 = execute_ast_1 =
%{ %{
kind: :rpc_request, kind: :rpc_request,
@ -294,7 +296,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do
assert time_2 >= time_1 + 1000 assert time_2 >= time_1 + 1000
end end
test "schedule and execute simultaneously", %{sch: sch} do test "schedule and execute simultaneously", %{table: sch} do
schedule_ast_1 = schedule_ast_1 =
%{ %{
kind: :sequence, kind: :sequence,
@ -323,8 +325,8 @@ defmodule FarmbotCeleryScript.SchedulerTest do
:ok = :ok =
TestSysCalls.handle(TestSysCalls, fn TestSysCalls.handle(TestSysCalls, fn
:wait, [millis] -> :wait, [millis] ->
send(pid, {:wait, :os.system_time()})
Process.sleep(millis) Process.sleep(millis)
send(pid, {:wait, :os.system_time()})
:read_pin, _ -> :read_pin, _ ->
send(pid, {:read_pin, :os.system_time()}) send(pid, {:read_pin, :os.system_time()})
@ -354,6 +356,6 @@ defmodule FarmbotCeleryScript.SchedulerTest do
assert_receive {:read_pin, time_2} assert_receive {:read_pin, time_2}
# Assert that the read pin executed and finished before the wait. # Assert that the read pin executed and finished before the wait.
assert time_2 <= time_1 + 2500 assert time_2 <= time_1
end end
end end