farmbot_os/farmbot_celery_script/lib/farmbot_celery_script/scheduler.ex

357 lines
9.4 KiB
Elixir

defmodule FarmbotCeleryScript.Scheduler do
@moduledoc """
Handles execution of CeleryScript.
CeleryScript can be `execute`d or `schedule`d. Both have the same API but
slightly different behaviour.
A message will arrive in the callers inbox after either shaped like
{FarmbotCeleryScript.Scheduler, result}
where result will be
:ok | {:error, "some string error"}
The Scheduler makes no effort to rescue bad syscall implementations. See
the docs foro SysCalls for more details.
"""
use GenServer
require Logger
alias FarmbotCeleryScript.{AST, Compiler, Scheduler, StepRunner}
alias Scheduler, as: State
# 15 minutes
@grace_period_ms 900_000
defmodule Dispatch do
defstruct [
:scheduled_at,
:data
]
end
defstruct next: nil,
checkup_timer: nil,
scheduled_pid: nil,
compiled: [],
monitors: [],
registry_name: nil
@type compiled_ast() :: [(() -> any)]
@type state :: %State{
next: nil | {compiled_ast(), DateTime.t(), data :: map(), pid},
checkup_timer: nil | reference(),
scheduled_pid: nil | pid(),
compiled: [{compiled_ast(), DateTime.t(), data :: map(), pid}],
monitors: [GenServer.from()],
registry_name: GenServer.server()
}
@doc "Start an instance of a CeleryScript Scheduler"
def start_link(args, opts \\ [name: __MODULE__]) do
GenServer.start_link(__MODULE__, args, opts)
end
def register(sch \\ __MODULE__) do
state = :sys.get_state(sch)
{:ok, _} = Registry.register(state.registry_name, :dispatch, self())
dispatch(state)
:ok
end
@doc """
Schedule CeleryScript to execute whenever there is time for it.
Calls are executed in a first in first out buffer, with things being added
by `execute/2` taking priority.
"""
@spec schedule(
GenServer.server(),
AST.t() | [Compiler.compiled()],
DateTime.t(),
map()
) ::
{:ok, reference()}
def schedule(scheduler_pid \\ __MODULE__, celery_script, at, data)
def schedule(sch, %AST{} = ast, %DateTime{} = at, %{} = data) do
schedule(sch, Compiler.compile(ast), at, data)
end
def schedule(sch, compiled, at, %{} = data) when is_list(compiled) do
GenServer.call(sch, {:schedule, compiled, at, data})
end
def get_next(sch \\ __MODULE__) do
GenServer.call(sch, :get_next)
end
def get_next_from_now(sch \\ __MODULE__) do
case get_next_at(sch) do
nil -> nil
at -> Timex.from_now(at)
end
end
def get_next_at(sch \\ __MODULE__) do
case get_next(sch) do
nil ->
nil
{_compiled, at, _data, _pid} ->
at
end
end
@impl true
def init(args) do
registry_name = Keyword.get(args, :registry_name, Scheduler.Registry)
{:ok, _} = Registry.start_link(keys: :duplicate, name: registry_name)
send(self(), :checkup)
{:ok, %State{registry_name: registry_name}}
end
@impl true
def handle_call({:schedule, compiled, at, data}, {pid, ref} = from, state) do
state =
state
|> monitor(pid)
|> add(compiled, at, data, pid)
:ok = GenServer.reply(from, {:ok, ref})
{:noreply, state}
end
def handle_call(:get_next, _from, state) do
{:reply, state.next, state}
end
@impl true
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
Logger.debug("Scheduler monitor down: #{inspect(pid)}")
state =
state
|> demonitor({pid, ref})
|> delete(pid)
{:noreply, state}
end
def handle_info(:checkup, %{next: nil} = state) do
# Logger.debug("Scheduling next checkup with no next")
state
|> schedule_next_checkup()
|> dispatch()
end
def handle_info(:checkup, %{next: {_compiled, at, _data, _pid}} = state) do
case DateTime.diff(DateTime.utc_now(), at, :millisecond) do
# now is before the next date
diff_ms when diff_ms < 0 ->
# from_now =
# DateTime.utc_now()
# |> DateTime.add(abs(diff_ms), :millisecond)
# |> Timex.from_now()
# msg = "Next execution is still #{diff_ms}ms too early (#{from_now})"
# Logger.info(msg)
state
|> schedule_next_checkup(abs(diff_ms))
|> dispatch()
# now is more than the grace period past schedule time
diff_ms when diff_ms > @grace_period_ms ->
# from_now = Timex.from_now(at)
# Logger.info("Next execution is #{diff_ms}ms too late (#{from_now})")
state
|> pop_next()
|> index_next()
|> schedule_next_checkup()
|> dispatch()
# now is late, but less than the grace period late
diff_ms when diff_ms >= 0 when diff_ms <= @grace_period_ms ->
Logger.info(
"Next execution is ready for execution: #{Timex.from_now(at)}"
)
state
|> execute_next()
|> dispatch()
end
end
def handle_info(
{:step_complete, {scheduled_at, executed_at, pid}, result},
state
) do
send(
pid,
{FarmbotCeleryScript,
{:scheduled_execution, scheduled_at, executed_at, result}}
)
state
|> pop_next()
|> index_next()
|> schedule_next_checkup()
|> dispatch()
end
@spec execute_next(state()) :: state()
defp execute_next(%{next: {compiled, at, _data, pid}} = state) do
scheduler_pid = self()
scheduled_pid =
spawn(fn ->
StepRunner.step(scheduler_pid, {at, DateTime.utc_now(), pid}, compiled)
end)
%{state | scheduled_pid: scheduled_pid}
end
@spec schedule_next_checkup(state(), :default | integer) :: state()
defp schedule_next_checkup(state, offset_ms \\ :default)
defp schedule_next_checkup(%{checkup_timer: timer} = state, offset_ms)
when is_reference(timer) do
# Logger.debug("canceling checkup timer")
Process.cancel_timer(timer)
schedule_next_checkup(%{state | checkup_timer: nil}, offset_ms)
end
defp schedule_next_checkup(state, :default) do
# Logger.debug("Scheduling next checkup in 15 seconds")
checkup_timer = Process.send_after(self(), :checkup, 15_000)
%{state | checkup_timer: checkup_timer}
end
# If the offset is less than a minute, there will be so little skew that
# it won't be noticed. This speeds up execution and gets it to pretty
# close to millisecond accuracy
defp schedule_next_checkup(state, offset_ms) when offset_ms <= 60000 do
_ = inspect(offset_ms)
# Logger.debug("Scheduling next checkup in #{offset_ms} seconds")
checkup_timer = Process.send_after(self(), :checkup, offset_ms)
%{state | checkup_timer: checkup_timer}
end
defp schedule_next_checkup(state, offset_ms) do
_ = inspect(offset_ms)
# Logger.debug("Scheduling next checkup in 15 seconds (#{offset_ms})")
checkup_timer = Process.send_after(self(), :checkup, 15_000)
%{state | checkup_timer: checkup_timer}
end
@spec index_next(state()) :: state()
defp index_next(%{compiled: []} = state), do: %{state | next: nil}
defp index_next(state) do
[next | _] =
compiled =
Enum.sort(state.compiled, fn
{_, at, _, _}, {_, at, _, _} ->
true
{_, left, _, _}, {_, right, _, _} ->
DateTime.compare(left, right) == :lt
end)
%{state | next: next, compiled: compiled}
end
@spec pop_next(state()) :: state()
defp pop_next(%{compiled: [_ | compiled]} = state) do
%{state | compiled: compiled, scheduled_pid: nil}
end
defp pop_next(%{compiled: []} = state) do
%{state | compiled: [], scheduled_pid: nil}
end
@spec monitor(state(), pid()) :: state()
defp monitor(state, pid) do
already_monitored? =
Enum.find(state.monitors, fn
{^pid, _ref} ->
true
_ ->
false
end)
if already_monitored? do
state
else
ref = Process.monitor(pid)
%{state | monitors: [{pid, ref} | state.monitors]}
end
end
@spec demonitor(state(), GenServer.from()) :: state()
defp demonitor(state, {pid, ref}) do
monitors =
Enum.reject(state.monitors, fn
{^pid, ^ref} ->
true
{_pid, _ref} ->
false
end)
%{state | monitors: monitors}
end
@spec add(state(), compiled_ast(), DateTime.t(), data :: map(), pid()) ::
state()
defp add(state, compiled, at, data, pid) do
%{state | compiled: [{compiled, at, data, pid} | state.compiled]}
|> index_next()
end
@spec delete(state(), pid()) :: state()
defp delete(state, pid) do
compiled =
Enum.reject(state.compiled, fn
{_compiled, _at, _data, ^pid} -> true
{_compiled, _at, _data, _pid} -> false
end)
%{state | compiled: compiled}
|> index_next()
end
defp dispatch(%{registry_name: name, compiled: compiled} = state) do
calendar =
Enum.map(compiled, fn
{_compiled, scheduled_at, data, _pid} ->
%Dispatch{data: data, scheduled_at: scheduled_at}
end)
Registry.dispatch(name, :dispatch, fn entries ->
for {pid, _} <- entries do
do_dispatch(name, pid, calendar)
end
end)
{:noreply, state}
end
defp do_dispatch(name, pid, calendar) do
case Registry.meta(name, {:last_calendar, pid}) do
{:ok, ^calendar} ->
Logger.debug("calendar for #{inspect(pid)} hasn't changed")
{FarmbotCeleryScript, {:calendar, calendar}}
_old_calendar ->
Registry.put_meta(name, {:last_calendar, pid}, calendar)
send(pid, {FarmbotCeleryScript, {:calendar, calendar}})
end
end
end