Fix scheduler not reindexing when an event is missed

* Add some breadcrumbs to see what the data that generated the scheduled
event looks like
* only monitor a process if it isn't monitored yet
pull/974/head
Connor Rigby 2019-07-10 12:07:47 -07:00
parent de6b20b828
commit 3d07c263ea
No known key found for this signature in database
GPG Key ID: 29A88B24B70456E0
5 changed files with 66 additions and 33 deletions

View File

@ -6,8 +6,8 @@ defmodule FarmbotCeleryScript do
alias FarmbotCeleryScript.{AST, Scheduler, StepRunner}
@doc "Schedule an AST to execute on a DateTime"
def schedule(%AST{} = ast, %DateTime{} = at) do
Scheduler.schedule(ast, at)
def schedule(%AST{} = ast, %DateTime{} = at, %{} = data) do
Scheduler.schedule(ast, at, data)
end
@doc "Execute an AST in place"

View File

@ -38,22 +38,39 @@ defmodule FarmbotCeleryScript.Scheduler do
Calls are executed in a first in first out buffer, with things being added
by `execute/2` taking priority.
"""
@spec schedule(GenServer.server(), AST.t() | [Compiler.compiled()], DateTime.t()) ::
@spec schedule(GenServer.server(), AST.t() | [Compiler.compiled()], DateTime.t(), map()) ::
{:ok, reference()}
def schedule(scheduler_pid \\ __MODULE__, celery_script, at)
def schedule(scheduler_pid \\ __MODULE__, celery_script, at, data)
def schedule(sch, %AST{} = ast, %DateTime{} = at) do
schedule(sch, Compiler.compile(ast), at)
def schedule(sch, %AST{} = ast, %DateTime{} = at, %{} = data) do
schedule(sch, Compiler.compile(ast), at, data)
end
def schedule(sch, compiled, at) when is_list(compiled) do
GenServer.call(sch, {:schedule, compiled, at})
def schedule(sch, compiled, at, %{} = data) when is_list(compiled) do
GenServer.call(sch, {:schedule, compiled, at, data})
end
def get_next(sch \\ __MODULE__) do
GenServer.call(sch, :get_next)
end
def get_next_from_now(sch \\ __MODULE__) do
case get_next_at(sch) do
nil -> nil
at -> Timex.from_now(at)
end
end
def get_next_at(sch \\ __MODULE__) do
case get_next(sch) do
nil ->
nil
{_compiled, at, _data, _pid} ->
at
end
end
@impl true
def init(_args) do
send(self(), :checkup)
@ -61,11 +78,11 @@ defmodule FarmbotCeleryScript.Scheduler do
end
@impl true
def handle_call({:schedule, compiled, at}, {pid, ref} = from, state) do
def handle_call({:schedule, compiled, at, data}, {pid, ref} = from, state) do
state =
state
|> monitor(pid)
|> add(compiled, at, pid)
|> add(compiled, at, data, pid)
:ok = GenServer.reply(from, {:ok, ref})
schedule_next_checkup(state, 0)
@ -91,7 +108,7 @@ defmodule FarmbotCeleryScript.Scheduler do
schedule_next_checkup(state)
end
def handle_info(:checkup, %{next: {_compiled, at, _pid}} = state) do
def handle_info(:checkup, %{next: {_compiled, at, _data, _pid}} = state) do
case DateTime.diff(DateTime.utc_now(), at, :millisecond) do
# now is before the next date
diff when diff < 0 ->
@ -100,22 +117,22 @@ defmodule FarmbotCeleryScript.Scheduler do
|> DateTime.add(abs(diff), :millisecond)
|> Timex.from_now()
Logger.info("Next execution is still #{abs(diff)}ms too early (#{from_now})")
Logger.info("Next execution is still #{diff}ms too early (#{from_now})")
schedule_next_checkup(state, abs(diff))
# now is more than 2 minutes (120 seconds) past schedule time
diff when diff > 120_000 ->
from_now =
DateTime.utc_now()
|> DateTime.add(diff, :millisecond)
|> Timex.from_now()
from_now = Timex.from_now(at)
Logger.info("Next execution is #{diff}ms too late (#{from_now})")
Logger.info("Next execution is #{abs(diff)}ms too late (#{from_now})")
schedule_next_checkup(state)
state
|> pop_next()
|> index_next()
|> schedule_next_checkup()
# now is late, but less than 2 minutes late
diff when diff >= 0 when diff <= 120_000 ->
Logger.info("Next execution is ready for execution")
Logger.info("Next execution is ready for execution: #{Timex.from_now(at)}")
execute_next(state)
end
end
@ -129,7 +146,7 @@ defmodule FarmbotCeleryScript.Scheduler do
|> schedule_next_checkup()
end
defp execute_next(%{next: {compiled, at, pid}} = state) do
defp execute_next(%{next: {compiled, at, _data, pid}} = state) do
scheduler_pid = self()
scheduled_pid =
@ -174,10 +191,10 @@ defmodule FarmbotCeleryScript.Scheduler do
[next | _] =
compiled =
Enum.sort(state.compiled, fn
{_, at, _}, {_, at, _} ->
{_, at, _, _}, {_, at, _, _} ->
true
{_, left, _}, {_, right, _} ->
{_, left, _, _}, {_, right, _, _} ->
DateTime.compare(left, right) == :lt
end)
@ -193,30 +210,46 @@ defmodule FarmbotCeleryScript.Scheduler do
end
defp monitor(state, pid) do
ref = Process.monitor(pid)
%{state | monitors: [{pid, ref} | state.monitors]}
already_monitored? =
Enum.find(state.monitors, fn
{^pid, _ref} ->
true
_ ->
false
end)
if already_monitored? do
state
else
ref = Process.monitor(pid)
%{state | monitors: [{pid, ref} | state.monitors]}
end
end
defp demonitor(state, {pid, ref}) do
monitors =
Enum.reject(state.monitors, fn
{^pid, ^ref} -> true
{_pid, _ref} -> false
{^pid, ^ref} ->
true
{_pid, _ref} ->
false
end)
%{state | monitors: monitors}
end
defp add(state, compiled, at, pid) do
%{state | compiled: [{compiled, at, pid} | state.compiled]}
defp add(state, compiled, at, data, pid) do
%{state | compiled: [{compiled, at, data, pid} | state.compiled]}
|> index_next()
end
defp delete(state, pid) do
compiled =
Enum.reject(state.compiled, fn
{_compiled, _at, ^pid} -> true
{_compiled, _at, _pid} -> false
{_compiled, _at, _data, ^pid} -> true
{_compiled, _at, _data, _pid} -> false
end)
%{state | compiled: compiled}

View File

@ -25,7 +25,7 @@ defmodule FarmbotCeleryScript.SchedulerTest do
end)
scheduled_time = DateTime.utc_now() |> DateTime.add(100, :millisecond)
{:ok, _} = Scheduler.schedule(sch, ast, scheduled_time)
{:ok, _} = Scheduler.schedule(sch, ast, scheduled_time, %{})
assert_receive {:read_pin, [9, 0]}, 1000
end
end

View File

@ -54,6 +54,6 @@ defmodule FarmbotCore.FarmEventWorker.SequenceEvent do
| locals: %{celery_ast.args.locals | body: celery_ast.args.locals.body ++ param_appls}
}
}
FarmbotCeleryScript.schedule(celery_ast, at)
FarmbotCeleryScript.schedule(celery_ast, at, farm_event)
end
end

View File

@ -88,6 +88,6 @@ defimpl FarmbotCore.AssetWorker, for: FarmbotCore.Asset.RegimenInstance do
celery_ast.args.locals | body: celery_ast.args.locals.body ++ regimen_params ++ farm_event_params}
}
}
FarmbotCeleryScript.schedule(celery_ast, at)
FarmbotCeleryScript.schedule(celery_ast, at, regimen_instance)
end
end