Begin implementing new farmevent worker
parent
00d0897672
commit
f1eb3fc49d
|
@ -115,20 +115,17 @@ defmodule FarmbotCore.Asset do
|
|||
|
||||
## Begin RegimenInstance
|
||||
|
||||
def upsert_regimen_instance!(%Regimen{} = regimen, %FarmEvent{} = farm_event, params \\ %{}) do
|
||||
q =
|
||||
from(pr in RegimenInstance,
|
||||
where: pr.regimen_id == ^regimen.local_id and pr.farm_event_id == ^farm_event.local_id
|
||||
)
|
||||
def get_regimen_instance(%FarmEvent{} = farm_event) do
|
||||
regimen = Repo.one!(from r in Regimen, where: r.id == ^farm_event.executable_id)
|
||||
Repo.one(from ri in RegimenInstance, where: ri.regimen_id == ^regimen.local_id and ri.farm_event_id == ^farm_event.local_id)
|
||||
end
|
||||
|
||||
pr = Repo.one(q) || %RegimenInstance{}
|
||||
|
||||
pr
|
||||
|> Repo.preload([:regimen, :farm_event])
|
||||
|> RegimenInstance.changeset(params)
|
||||
def new_regimen_instance!(%FarmEvent{} = farm_event, params \\ %{}) do
|
||||
regimen = Repo.one!(from r in Regimen, where: r.id == ^farm_event.executable_id)
|
||||
RegimenInstance.changeset(%RegimenInstance{}, params)
|
||||
|> Ecto.Changeset.put_assoc(:regimen, regimen)
|
||||
|> Ecto.Changeset.put_assoc(:farm_event, farm_event)
|
||||
|> Repo.insert_or_update!()
|
||||
|> Repo.insert!()
|
||||
end
|
||||
|
||||
def update_regimen_instance!(%RegimenInstance{} = pr, params \\ %{}) do
|
||||
|
|
|
@ -1,220 +1,21 @@
|
|||
defimpl FarmbotCore.AssetWorker, for: FarmbotCore.Asset.FarmEvent do
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias FarmbotCore.{
|
||||
Asset,
|
||||
Asset.FarmEvent,
|
||||
Asset.Regimen,
|
||||
Asset.Sequence
|
||||
alias FarmbotCore.Asset.FarmEvent
|
||||
alias FarmbotCore.FarmEventWorker.{
|
||||
RegimenEvent,
|
||||
SequenceEvent
|
||||
}
|
||||
|
||||
alias FarmbotCeleryScript.{Scheduler, AST, Compiler}
|
||||
defstruct [:farm_event, :datetime, :handle_sequence, :handle_regimen]
|
||||
alias __MODULE__, as: State
|
||||
|
||||
@checkup_time_ms Application.get_env(:farmbot_core, __MODULE__)[:checkup_time_ms]
|
||||
@checkup_time_ms ||
|
||||
Mix.raise("""
|
||||
config :farmbot_core, #{__MODULE__}, checkup_time_ms: 10_000
|
||||
""")
|
||||
|
||||
def preload(%FarmEvent{}), do: []
|
||||
|
||||
def tracks_changes?(%FarmEvent{}), do: false
|
||||
|
||||
def start_link(farm_event, args) do
|
||||
GenServer.start_link(__MODULE__, [farm_event, args])
|
||||
def start_link(%{executable_type: "Regimen"} = farm_event, args) do
|
||||
GenServer.start_link(RegimenEvent, [farm_event, args])
|
||||
end
|
||||
|
||||
def init([farm_event, args]) do
|
||||
# Logger.disable(self())
|
||||
Logger.warn("FarmEvent: #{inspect(farm_event)} is initializing")
|
||||
ensure_executable!(farm_event)
|
||||
now = DateTime.utc_now()
|
||||
handle_sequence = Keyword.get(args, :handle_sequence, &handle_sequence/2)
|
||||
handle_regimen = Keyword.get(args, :handle_regimen, &handle_regimen/3)
|
||||
|
||||
unless is_function(handle_sequence, 2) do
|
||||
raise "FarmEvent Sequence handler should be a 2 arity function"
|
||||
end
|
||||
|
||||
unless is_function(handle_regimen, 3) do
|
||||
raise "FarmEvent Regimen handler should be a 3 arity function"
|
||||
end
|
||||
|
||||
state = %State{
|
||||
farm_event: farm_event,
|
||||
handle_regimen: handle_regimen,
|
||||
handle_sequence: handle_sequence,
|
||||
datetime: farm_event.last_executed || DateTime.utc_now()
|
||||
}
|
||||
|
||||
# check if now is _before_ start_time
|
||||
case DateTime.compare(now, farm_event.start_time) do
|
||||
:lt ->
|
||||
init_event_started(state, now)
|
||||
|
||||
_ ->
|
||||
# check if now is _after_ end_time
|
||||
case DateTime.compare(now, farm_event.end_time) do
|
||||
:gt -> init_event_completed(state, now)
|
||||
_ -> init_event_started(state, now)
|
||||
end
|
||||
end
|
||||
def start_link(%{executable_type: "Sequence"} = farm_event, args) do
|
||||
GenServer.start_link(SequenceEvent, [farm_event, args])
|
||||
end
|
||||
|
||||
defp init_event_completed(%{farm_event: %{executable_type: "Regimen"}} = state, _) do
|
||||
{:ok, state, 0}
|
||||
end
|
||||
|
||||
defp init_event_completed(_, _) do
|
||||
Logger.warn("No future events")
|
||||
:ignore
|
||||
end
|
||||
|
||||
def init_event_started(%State{} = state, _now) do
|
||||
{:ok, state, 0}
|
||||
end
|
||||
|
||||
def handle_info(:timeout, %State{} = state) do
|
||||
next = FarmEvent.build_calendar(state.farm_event, state.datetime)
|
||||
|
||||
if next do
|
||||
# positive if the first date/time comes after the second.
|
||||
diff = DateTime.compare(next, DateTime.utc_now())
|
||||
# if next_event is more than 0 milliseconds away, schedule that event.
|
||||
case diff do
|
||||
:gt ->
|
||||
Logger.info("Event is still in the future")
|
||||
{:noreply, state, @checkup_time_ms}
|
||||
|
||||
diff when diff in [:lt, :eq] ->
|
||||
Logger.info("Event should be executed: #{Timex.from_now(next)}")
|
||||
executable = ensure_executable!(state.farm_event)
|
||||
|
||||
event =
|
||||
ensure_executed!(
|
||||
state.farm_event,
|
||||
executable,
|
||||
next,
|
||||
state.handle_sequence,
|
||||
state.handle_regimen
|
||||
)
|
||||
|
||||
{:noreply, %{state | farm_event: event, datetime: DateTime.utc_now()}, @checkup_time_ms}
|
||||
end
|
||||
else
|
||||
Logger.warn("No more future events to execute.")
|
||||
{:stop, :normal, state}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info({Scheduler, _, _}, state) do
|
||||
{:noreply, state, @checkup_time_ms}
|
||||
end
|
||||
|
||||
defp ensure_executed!(
|
||||
%FarmEvent{last_executed: nil} = event,
|
||||
%Sequence{} = exe,
|
||||
next_dt,
|
||||
handle_sequence,
|
||||
_
|
||||
) do
|
||||
# positive if the first date/time comes after the second.
|
||||
comp = Timex.diff(DateTime.utc_now(), next_dt, :minutes)
|
||||
|
||||
cond do
|
||||
# now is more than 2 minutes past expected execution time
|
||||
comp > 2 ->
|
||||
Logger.warn("Sequence: #{inspect(exe)} too late: #{comp} minutes difference.")
|
||||
event
|
||||
|
||||
true ->
|
||||
Logger.warn("Sequence: #{inspect(exe)} has not run before: #{comp} minutes difference.")
|
||||
apply(handle_sequence, [exe, event.body])
|
||||
update_last_executed(event, next_dt)
|
||||
end
|
||||
end
|
||||
|
||||
defp ensure_executed!(%FarmEvent{} = event, %Sequence{} = exe, next_dt, handle_sequence, _) do
|
||||
# positive if the first date/time comes after the second.
|
||||
comp = Timex.diff(event.last_executed, next_dt, :seconds)
|
||||
cond do
|
||||
# last_execution is more than 2 minutes past expected execution time
|
||||
comp < -2000 ->
|
||||
Logger.warn("Sequence: #{exe.id} too late: #{comp} seconds difference.")
|
||||
update_last_executed(event, next_dt)
|
||||
# comp > 0 and comp < 2 ->
|
||||
# Logger.warn("Sequence: #{exe.id} needs executing")
|
||||
# apply(handle_sequence, [exe, event.body])
|
||||
# update_last_executed(event, next_dt)
|
||||
comp < 0 ->
|
||||
Logger.warn("Sequence: #{exe.id} needs executing #{comp} seconds difference.")
|
||||
apply(handle_sequence, [exe, event.body])
|
||||
update_last_executed(event, next_dt)
|
||||
true ->
|
||||
Logger.warn("""
|
||||
what do?:
|
||||
last_executed=#{inspect(event.last_executed)}
|
||||
next_dt=#{inspect(next_dt)}
|
||||
comp=#{comp}
|
||||
""")
|
||||
event
|
||||
end
|
||||
end
|
||||
|
||||
defp ensure_executed!(
|
||||
%FarmEvent{last_executed: nil} = event,
|
||||
%Regimen{} = exe,
|
||||
next_dt,
|
||||
_,
|
||||
handle_regimen
|
||||
) do
|
||||
Logger.warn("Regimen: #{inspect(exe)} has not run before. Executing it.")
|
||||
apply(handle_regimen, [exe, event, %{started_at: next_dt}])
|
||||
Asset.update_farm_event!(event, %{last_executed: next_dt})
|
||||
end
|
||||
|
||||
defp ensure_executed!(%FarmEvent{} = event, %Regimen{} = exe, _next_dt, _, handle_regimen) do
|
||||
apply(handle_regimen, [exe, event, %{}])
|
||||
event
|
||||
end
|
||||
|
||||
defp ensure_executable!(%FarmEvent{executable_type: "Sequence", executable_id: id}) do
|
||||
Asset.get_sequence(id) || raise("Sequence #{id} is not synced")
|
||||
end
|
||||
|
||||
defp ensure_executable!(%FarmEvent{executable_type: "Regimen", executable_id: id}) do
|
||||
Asset.get_regimen(id) || raise("Regimen #{id} is not synced")
|
||||
end
|
||||
|
||||
@doc false
|
||||
def handle_regimen(regimen, event, params) do
|
||||
Asset.upsert_regimen_instance!(regimen, event, params)
|
||||
end
|
||||
|
||||
defp handle_sequence(sequence, farm_event_body) do
|
||||
param_appls = AST.decode(farm_event_body)
|
||||
celery_ast = AST.decode(sequence)
|
||||
|
||||
celery_ast = %{
|
||||
celery_ast
|
||||
| args: %{
|
||||
celery_ast.args
|
||||
| locals: %{celery_ast.args.locals | body: celery_ast.args.locals.body ++ param_appls}
|
||||
}
|
||||
}
|
||||
|
||||
compiled_celery = Compiler.compile(celery_ast)
|
||||
Scheduler.schedule(compiled_celery)
|
||||
end
|
||||
|
||||
defp update_last_executed(event, next_dt) do
|
||||
# TODO(Connor) This causes the event to be restarted.
|
||||
# Similarly to RegimenInstance worker.
|
||||
Asset.update_farm_event!(event, %{last_executed: next_dt})
|
||||
# %{event | last_executed: next_dt}
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
defmodule FarmbotCore.FarmEventWorker.RegimenEvent do
|
||||
@moduledoc """
|
||||
Periodicly checks the current date versus the date that
|
||||
a regimen should be started.
|
||||
"""
|
||||
require Logger
|
||||
use GenServer
|
||||
alias FarmbotCore.Asset
|
||||
|
||||
@impl GenServer
|
||||
def init([event, args]) do
|
||||
send self(), :checkup
|
||||
{:ok, %{event: event, args: args}}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info(:checkup, state) do
|
||||
send self(), {:checkup, DateTime.utc_now()}
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:checkup, now}, state) do
|
||||
start_time = state.event.start_time
|
||||
|
||||
should_be_running? = Map.equal?(
|
||||
Map.take(now, [:year, :month, :day]),
|
||||
Map.take(start_time, [:year, :month, :day])
|
||||
)
|
||||
|
||||
if should_be_running? do
|
||||
Logger.debug "Ensuring RegimenInstance exists for event: #{inspect(state.event)}"
|
||||
send self(), {:ensure_started, now}
|
||||
{:noreply, state}
|
||||
else
|
||||
Process.send_after(self(), :checkup, state.args[:checkup_time_ms] || 15_000)
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info({:ensure_started, now}, state) do
|
||||
if Asset.get_regimen_instance(state.event) do
|
||||
Logger.debug "RegimenInstance already exists for event: #{inspect(state.event)}"
|
||||
{:noreply, state, :hibernate}
|
||||
else
|
||||
Logger.debug "Creating RegimenInstance for event: #{inspect(state.event)}"
|
||||
_regimen_instance = Asset.new_regimen_instance!(state.event, %{started_at: now})
|
||||
{:noreply, state, :hibernate}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,35 @@
|
|||
defmodule FarmbotCore.FarmEventWorker.SequenceEvent do
|
||||
alias FarmbotCeleryScript.AST
|
||||
alias FarmbotCore.{Asset, Asset.FarmEvent}
|
||||
use GenServer
|
||||
|
||||
@impl GenServer
|
||||
def init([event, args]) do
|
||||
send self(), :checkup
|
||||
{:ok, %{event: event, args: args}}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info(:checkup, state) do
|
||||
next_dt = FarmEvent.build_calendar(state.event, DateTime.utc_now())
|
||||
:ok = schedule(state.event, next_dt)
|
||||
{:noreply, state, state.args[:checkup_time_ms] || 15_000}
|
||||
end
|
||||
|
||||
defp schedule(farm_event, at) do
|
||||
sequence = Asset.get_sequence(farm_event.executable_id)
|
||||
sequence || raise("Sequence #{farm_event.executable_id} is not synced")
|
||||
param_appls = AST.decode(farm_event.body)
|
||||
celery_ast = AST.decode(sequence)
|
||||
celery_ast = %{
|
||||
celery_ast
|
||||
| args: %{
|
||||
celery_ast.args
|
||||
| locals: %{celery_ast.args.locals | body: celery_ast.args.locals.body ++ param_appls}
|
||||
}
|
||||
}
|
||||
IO.inspect(celery_ast)
|
||||
IO.inspect(at)
|
||||
# FarmbotCeleryScript.schedule(celery_ast, at)
|
||||
end
|
||||
end
|
|
@ -7,7 +7,7 @@ defmodule Farmbot.TestSupport.AssetFixtures do
|
|||
regimen = regimen(regimen_params)
|
||||
farm_event = regimen_event(regimen, farm_event_params)
|
||||
params = Map.merge(%{id: :rand.uniform(10000), monitor: false}, params)
|
||||
Asset.upsert_regimen_instance!(regimen, farm_event, params)
|
||||
Asset.new_regimen_instance!(farm_event, params)
|
||||
end
|
||||
|
||||
def fbos_config(params \\ %{}) do
|
||||
|
|
Loading…
Reference in New Issue