diff --git a/farmbot_celery_script/lib/farmbot_celery_script.ex b/farmbot_celery_script/lib/farmbot_celery_script.ex index f459306f..4f724a5e 100644 --- a/farmbot_celery_script/lib/farmbot_celery_script.ex +++ b/farmbot_celery_script/lib/farmbot_celery_script.ex @@ -6,8 +6,8 @@ defmodule FarmbotCeleryScript do alias FarmbotCeleryScript.{AST, Scheduler, StepRunner} @doc "Schedule an AST to execute on a DateTime" - def schedule(%AST{} = ast, %DateTime{} = at) do - Scheduler.schedule(ast, at) + def schedule(%AST{} = ast, %DateTime{} = at, %{} = data) do + Scheduler.schedule(ast, at, data) end @doc "Execute an AST in place" diff --git a/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex b/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex index 6310201c..1a72a11e 100644 --- a/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex +++ b/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex @@ -38,22 +38,39 @@ defmodule FarmbotCeleryScript.Scheduler do Calls are executed in a first in first out buffer, with things being added by `execute/2` taking priority. """ - @spec schedule(GenServer.server(), AST.t() | [Compiler.compiled()], DateTime.t()) :: + @spec schedule(GenServer.server(), AST.t() | [Compiler.compiled()], DateTime.t(), map()) :: {:ok, reference()} - def schedule(scheduler_pid \\ __MODULE__, celery_script, at) + def schedule(scheduler_pid \\ __MODULE__, celery_script, at, data) - def schedule(sch, %AST{} = ast, %DateTime{} = at) do - schedule(sch, Compiler.compile(ast), at) + def schedule(sch, %AST{} = ast, %DateTime{} = at, %{} = data) do + schedule(sch, Compiler.compile(ast), at, data) end - def schedule(sch, compiled, at) when is_list(compiled) do - GenServer.call(sch, {:schedule, compiled, at}) + def schedule(sch, compiled, at, %{} = data) when is_list(compiled) do + GenServer.call(sch, {:schedule, compiled, at, data}) end def get_next(sch \\ __MODULE__) do GenServer.call(sch, :get_next) end + def get_next_from_now(sch \\ __MODULE__) do + case get_next_at(sch) do + nil -> nil + at -> Timex.from_now(at) + end + end + + def get_next_at(sch \\ __MODULE__) do + case get_next(sch) do + nil -> + nil + + {_compiled, at, _data, _pid} -> + at + end + end + @impl true def init(_args) do send(self(), :checkup) @@ -61,11 +78,11 @@ defmodule FarmbotCeleryScript.Scheduler do end @impl true - def handle_call({:schedule, compiled, at}, {pid, ref} = from, state) do + def handle_call({:schedule, compiled, at, data}, {pid, ref} = from, state) do state = state |> monitor(pid) - |> add(compiled, at, pid) + |> add(compiled, at, data, pid) :ok = GenServer.reply(from, {:ok, ref}) schedule_next_checkup(state, 0) @@ -91,7 +108,7 @@ defmodule FarmbotCeleryScript.Scheduler do schedule_next_checkup(state) end - def handle_info(:checkup, %{next: {_compiled, at, _pid}} = state) do + def handle_info(:checkup, %{next: {_compiled, at, _data, _pid}} = state) do case DateTime.diff(DateTime.utc_now(), at, :millisecond) do # now is before the next date diff when diff < 0 -> @@ -100,22 +117,22 @@ defmodule FarmbotCeleryScript.Scheduler do |> DateTime.add(abs(diff), :millisecond) |> Timex.from_now() - Logger.info("Next execution is still #{abs(diff)}ms too early (#{from_now})") + Logger.info("Next execution is still #{diff}ms too early (#{from_now})") schedule_next_checkup(state, abs(diff)) # now is more than 2 minutes (120 seconds) past schedule time diff when diff > 120_000 -> - from_now = - DateTime.utc_now() - |> DateTime.add(diff, :millisecond) - |> Timex.from_now() + from_now = Timex.from_now(at) + Logger.info("Next execution is #{diff}ms too late (#{from_now})") - Logger.info("Next execution is #{abs(diff)}ms too late (#{from_now})") - schedule_next_checkup(state) + state + |> pop_next() + |> index_next() + |> schedule_next_checkup() # now is late, but less than 2 minutes late diff when diff >= 0 when diff <= 120_000 -> - Logger.info("Next execution is ready for execution") + Logger.info("Next execution is ready for execution: #{Timex.from_now(at)}") execute_next(state) end end @@ -129,7 +146,7 @@ defmodule FarmbotCeleryScript.Scheduler do |> schedule_next_checkup() end - defp execute_next(%{next: {compiled, at, pid}} = state) do + defp execute_next(%{next: {compiled, at, _data, pid}} = state) do scheduler_pid = self() scheduled_pid = @@ -174,10 +191,10 @@ defmodule FarmbotCeleryScript.Scheduler do [next | _] = compiled = Enum.sort(state.compiled, fn - {_, at, _}, {_, at, _} -> + {_, at, _, _}, {_, at, _, _} -> true - {_, left, _}, {_, right, _} -> + {_, left, _, _}, {_, right, _, _} -> DateTime.compare(left, right) == :lt end) @@ -193,30 +210,46 @@ defmodule FarmbotCeleryScript.Scheduler do end defp monitor(state, pid) do - ref = Process.monitor(pid) - %{state | monitors: [{pid, ref} | state.monitors]} + already_monitored? = + Enum.find(state.monitors, fn + {^pid, _ref} -> + true + + _ -> + false + end) + + if already_monitored? do + state + else + ref = Process.monitor(pid) + %{state | monitors: [{pid, ref} | state.monitors]} + end end defp demonitor(state, {pid, ref}) do monitors = Enum.reject(state.monitors, fn - {^pid, ^ref} -> true - {_pid, _ref} -> false + {^pid, ^ref} -> + true + + {_pid, _ref} -> + false end) %{state | monitors: monitors} end - defp add(state, compiled, at, pid) do - %{state | compiled: [{compiled, at, pid} | state.compiled]} + defp add(state, compiled, at, data, pid) do + %{state | compiled: [{compiled, at, data, pid} | state.compiled]} |> index_next() end defp delete(state, pid) do compiled = Enum.reject(state.compiled, fn - {_compiled, _at, ^pid} -> true - {_compiled, _at, _pid} -> false + {_compiled, _at, _data, ^pid} -> true + {_compiled, _at, _data, _pid} -> false end) %{state | compiled: compiled} diff --git a/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs b/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs index dc24e64c..99dc8eab 100644 --- a/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs +++ b/farmbot_celery_script/test/farmbot_celery_script/scheduler_test.exs @@ -25,7 +25,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do end) scheduled_time = DateTime.utc_now() |> DateTime.add(100, :millisecond) - {:ok, _} = Scheduler.schedule(sch, ast, scheduled_time) + {:ok, _} = Scheduler.schedule(sch, ast, scheduled_time, %{}) assert_receive {:read_pin, [9, 0]}, 1000 end end diff --git a/farmbot_core/lib/farmbot_core/asset_workers/farm_event_worker/sequence_event.ex b/farmbot_core/lib/farmbot_core/asset_workers/farm_event_worker/sequence_event.ex index 275d3bdf..38be5454 100644 --- a/farmbot_core/lib/farmbot_core/asset_workers/farm_event_worker/sequence_event.ex +++ b/farmbot_core/lib/farmbot_core/asset_workers/farm_event_worker/sequence_event.ex @@ -54,6 +54,6 @@ defmodule FarmbotCore.FarmEventWorker.SequenceEvent do | locals: %{celery_ast.args.locals | body: celery_ast.args.locals.body ++ param_appls} } } - FarmbotCeleryScript.schedule(celery_ast, at) + FarmbotCeleryScript.schedule(celery_ast, at, farm_event) end end \ No newline at end of file diff --git a/farmbot_core/lib/farmbot_core/asset_workers/regimen_instance_worker.ex b/farmbot_core/lib/farmbot_core/asset_workers/regimen_instance_worker.ex index 8c107307..2ef3e88e 100644 --- a/farmbot_core/lib/farmbot_core/asset_workers/regimen_instance_worker.ex +++ b/farmbot_core/lib/farmbot_core/asset_workers/regimen_instance_worker.ex @@ -88,6 +88,6 @@ defimpl FarmbotCore.AssetWorker, for: FarmbotCore.Asset.RegimenInstance do celery_ast.args.locals | body: celery_ast.args.locals.body ++ regimen_params ++ farm_event_params} } } - FarmbotCeleryScript.schedule(celery_ast, at) + FarmbotCeleryScript.schedule(celery_ast, at, regimen_instance) end end