diff --git a/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex b/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex index 91f652c6..817cfe17 100644 --- a/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex +++ b/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex @@ -19,10 +19,17 @@ defmodule FarmbotCeleryScript.Scheduler do use GenServer alias __MODULE__, as: State - alias FarmbotCeleryScript.{AST, RuntimeError, Compiler} + alias FarmbotCeleryScript.Scheduler.CommandRunner + alias FarmbotCeleryScript.{AST, Compiler} defstruct steps: [], - execute: false + table: nil, + execute: nil, + schedule: nil, + execute_spec: nil, + schedule_spec: nil + + @table_name :celery_scheduler @doc "Start an instance of a CeleryScript Scheduler" def start_link(args, opts \\ [name: __MODULE__]) do @@ -42,15 +49,17 @@ defmodule FarmbotCeleryScript.Scheduler do `move_absolute` at the same time, the `execute`d call will have somewhat undefined behaviour depending on the `move_absolute` implementation. """ - @spec execute(GenServer.server(), AST.t() | [Compiler.compiled()]) :: {:ok, reference()} - def execute(scheduler_pid \\ __MODULE__, celery_script) + @spec execute(atom, AST.t() | [Compiler.compiled()]) :: {:ok, reference()} + def execute(table \\ @table_name, celery_script) - def execute(sch, %AST{} = ast) do - execute(sch, Compiler.compile(ast)) + def execute(table, %AST{} = ast) do + execute(table, Compiler.compile(ast)) end - def execute(sch, compiled) when is_list(compiled) do - GenServer.call(sch, {:execute, compiled}) + def execute(table, compiled) when is_list(compiled) do + ref = make_ref() + :ets.insert(table, {:os.system_time(), {self(), ref}, compiled}) + {:ok, ref} end @doc """ @@ -58,84 +67,71 @@ defmodule FarmbotCeleryScript.Scheduler do Calls are executed in a first in first out buffer, with things being added by `execute/2` taking priority. """ - @spec schedule(GenServer.server(), AST.t() | [Compiler.compiled()]) :: {:ok, reference()} - def schedule(scheduler_pid \\ __MODULE__, celery_script) + @spec schedule(atom, AST.t() | [Compiler.compiled()]) :: {:ok, reference()} + def schedule(table \\ @table_name, celery_script) - def schedule(sch, %AST{} = ast) do - schedule(sch, Compiler.compile(ast)) + def schedule(table, %AST{} = ast) do + schedule(table, Compiler.compile(ast)) end - def schedule(sch, compiled) when is_list(compiled) do - GenServer.call(sch, {:schedule, compiled}) - end - - @impl true - def init(_args) do - {:ok, %State{}} - end - - @impl true - def handle_call({:execute, compiled}, {_pid, ref} = from, state) do + def schedule(table, compiled) when is_list(compiled) do # Warning, timestamps may be unstable in offline situations. - send(self(), :timeout) - {:reply, {:ok, ref}, %{state | steps: [{from, :os.system_time(), compiled} | state.steps]}} - end - - def handle_call({:schedule, compiled}, {_pid, ref} = from, state) do - send(self(), :timeout) - {:reply, {:ok, ref}, %{state | steps: state.steps ++ [{from, nil, compiled}]}} + ref = make_ref() + :ets.insert(table, {nil, {self(), ref}, compiled}) + {:ok, ref} end @impl true - def handle_info(:timeout, %{steps: steps} = state) when length(steps) >= 1 do - [{{_pid, _ref} = from, timestamp, compiled} | rest] = - Enum.sort(steps, fn - {_, first_ts, _}, {_, second_ts, _} when first_ts <= second_ts -> true - {_, _, _}, {_, _, _} -> false - end) + def init(args) do + table = Keyword.get(args, :table, @table_name) + {:ok, execute} = CommandRunner.start_link(args) + {:ok, schedule} = CommandRunner.start_link(args) + send(self(), :checkup) + execute_spec = nil + schedule_spec = nil - case state.execute do - true -> - {:noreply, state} - - false -> - {:noreply, %{state | execute: is_number(timestamp), steps: rest}, - {:continue, {from, compiled}}} - end + {:ok, + %State{ + table: table, + execute: execute, + schedule: schedule, + execute_spec: execute_spec, + schedule_spec: schedule_spec + }} end - def handle_info(:timeout, %{steps: []} = state) do + @impl true + def handle_info(:checkup, state) do + # execute_steps = :ets.select(state.table, fn + # {head, from, compiled} when is_number(head) -> {head, from, compiled} + # end) + execute_steps = + :ets.select(state.table, [ + {{:"$1", :"$2", :"$3"}, [is_number: :"$1"], [{{:"$1", :"$2", :"$3"}}]} + ]) + + # schedule_steps = :ets.select(state.table, fn + # {head, from, compiled} when is_nil(head) -> {head, from, compiled} + # end) + + schedule_steps = + :ets.select(state.table, [ + {{:"$1", :"$2", :"$3"}, [{:==, :"$1", nil}], [{{:"$1", :"$2", :"$3"}}]} + ]) + + # all = :ets.match_object(state.table, {:_, :_, :_}) + # length(all) > 0 && IO.inspect(all, label: "ALL") + + length(execute_steps) > 0 && IO.inspect(execute_steps, label: "EXECUTE") + length(schedule_steps) > 0 && IO.inspect(schedule_steps, label: "SCHEDULE") + :ok = GenServer.cast(state.execute, execute_steps) + :ok = GenServer.cast(state.schedule, schedule_steps) + + for step <- execute_steps ++ schedule_steps do + true = :ets.delete_object(state.table, step) + end + + send(self(), :checkup) {:noreply, state} end - - @impl true - def handle_continue({{pid, ref} = from, [step | rest]}, state) do - case step(state, step) do - [fun | _] = more when is_function(fun, 0) -> - {:noreply, state, {:continue, {from, more ++ rest}}} - - {:error, reason} -> - send(pid, {__MODULE__, ref, {:error, reason}}) - send(self(), :timeout) - {:noreply, state} - - _ -> - {:noreply, state, {:continue, {from, rest}}} - end - end - - def handle_continue({{pid, ref}, []}, state) do - send(pid, {__MODULE__, ref, :ok}) - send(self(), :timeout) - {:noreply, %{state | execute: false}} - end - - def step(_state, fun) when is_function(fun, 0) do - try do - fun.() - rescue - e in RuntimeError -> {:error, Exception.message(e)} - exception -> reraise(exception, __STACKTRACE__) - end - end end diff --git a/farmbot_celery_script/lib/farmbot_celery_script/scheduler/command_runner.ex b/farmbot_celery_script/lib/farmbot_celery_script/scheduler/command_runner.ex new file mode 100644 index 00000000..11973d52 --- /dev/null +++ b/farmbot_celery_script/lib/farmbot_celery_script/scheduler/command_runner.ex @@ -0,0 +1,66 @@ +defmodule FarmbotCeleryScript.Scheduler.CommandRunner do + @moduledoc false + alias FarmbotCeleryScript.RuntimeError + use GenServer + + def start_link(args) do + GenServer.start_link(__MODULE__, args) + end + + def init(_args) do + send(self(), :checkup) + {:ok, []} + end + + def handle_cast(steps, state) do + {:noreply, state ++ steps} + end + + def handle_info(:checkup, state) do + # IO.puts "[#{inspect(self())}] CommandRunner checkup" + {:noreply, [], {:continue, state}} + end + + def handle_continue([{_timestamp, {pid, ref}, compiled} | rest], state) do + case step_through(compiled) do + :ok -> + send(pid, {FarmbotCeleryScript.Scheduler, ref, :ok}) + {:noreply, state, {:continue, rest}} + + {:error, reason} -> + send(pid, {FarmbotCeleryScript.Scheduler, ref, {:error, reason}}) + {:noreply, state, {:continue, rest}} + end + end + + def handle_continue([], state) do + # IO.puts "[#{inspect(self())}] CommandRunner complete" + send(self(), :checkup) + {:noreply, state} + end + + defp step_through([fun | rest]) do + case step(fun) do + [fun | _] = more when is_function(fun, 0) -> + step_through(more ++ rest) + + {:error, reason} -> + {:error, reason} + + _ -> + step_through(rest) + end + end + + defp step_through([]), do: :ok + + def step(fun) when is_function(fun, 0) do + try do + IO.inspect(fun, label: "step") + fun.() + rescue + e in RuntimeError -> {:error, Exception.message(e)} + exception -> reraise(exception, __STACKTRACE__) + end + end +end diff --git a/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs b/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs index c363f8b0..bfccda70 100644 --- a/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs +++ b/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs @@ -5,11 +5,13 @@ defmodule FarmbotCeleryScript.SchedulerTest do setup do {:ok, shim} = TestSysCalls.checkout() - {:ok, sch} = Scheduler.start_link([], []) - [shim: shim, sch: sch] + table = :"celery_scheduler_#{:rand.uniform(100)}" + table = :ets.new(table, [:duplicate_bag, :named_table, :public]) + {:ok, sch} = Scheduler.start_link([table: table], []) + [shim: shim, sch: sch, table: table] end - test "uses default values when no parameter is found", %{sch: sch} do + test "uses default values when no parameter is found", %{table: sch} do sequence_ast = %{ kind: :sequence, @@ -69,7 +71,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do assert_receive {:move_absolute, [129, 129, 129, 921]} end - test "syscall errors", %{sch: sch} do + test "syscall errors", %{table: sch} do execute_ast = %{ kind: :rpc_request, @@ -92,7 +94,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do end @tag :annoying - test "regular exceptions still occur", %{sch: sch} do + test "regular exceptions still occur", %{table: table, sch: sch} do Process.flag(:trap_exit, true) execute_ast = @@ -116,12 +118,12 @@ defmodule FarmbotCeleryScript.SchedulerTest do :read_pin, _ -> raise("failed to read pin!") end) - {:ok, execute_ref} = Scheduler.execute(sch, executed) + {:ok, execute_ref} = Scheduler.execute(table, executed) refute_receive {Scheduler, ^execute_ref, {:error, "failed to read pin!"}} assert_receive {:EXIT, ^sch, _}, 1000 end - test "executing a sequence on top of a scheduled sequence", %{sch: sch} do + test "executing a sequence on top of a scheduled sequence", %{table: sch} do scheduled_ast = %{ kind: :sequence, @@ -177,7 +179,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do assert [^time_1, ^time_3, ^time_2] = Enum.sort([time_1, time_2, time_3], &(&1 <= &2)) end - test "execute twice", %{sch: sch} do + test "execute twice", %{table: sch} do execute_ast_1 = %{ kind: :rpc_request, @@ -235,7 +237,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do assert time_2 >= time_1 + 1000 end - test "execute then schedule", %{sch: sch} do + test "execute then schedule", %{table: sch} do execute_ast_1 = %{ kind: :rpc_request, @@ -294,7 +296,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do assert time_2 >= time_1 + 1000 end - test "schedule and execute simultaneously", %{sch: sch} do + test "schedule and execute simultaneously", %{table: sch} do schedule_ast_1 = %{ kind: :sequence, @@ -323,8 +325,8 @@ defmodule FarmbotCeleryScript.SchedulerTest do :ok = TestSysCalls.handle(TestSysCalls, fn :wait, [millis] -> - send(pid, {:wait, :os.system_time()}) Process.sleep(millis) + send(pid, {:wait, :os.system_time()}) :read_pin, _ -> send(pid, {:read_pin, :os.system_time()}) @@ -354,6 +356,6 @@ defmodule FarmbotCeleryScript.SchedulerTest do assert_receive {:read_pin, time_2} # Assert that the read pin executed and finished before the wait. - assert time_2 <= time_1 + 2500 + assert time_2 <= time_1 end end