bring back farmevents and regimens

This commit is contained in:
connor rigby 2017-11-10 12:10:02 -08:00
parent 3db2a97271
commit b0658efbc2
19 changed files with 522 additions and 25 deletions

View file

@ -30,6 +30,7 @@ config :farmbot, ecto_repos: repos
for repo <- [Farmbot.Repo.A, Farmbot.Repo.B] do
config :farmbot, repo,
adapter: Sqlite.Ecto2,
loggers: [],
database: "tmp/#{repo}_dev.sqlite3"
end

View file

@ -121,13 +121,15 @@ defmodule Farmbot.Bootstrap.Supervisor do
ConfigStorage.update_config_value(:string, "authorization", "last_shutdown_reason", nil)
children = [
worker(Farmbot.Bootstrap.AuthTask, []),
supervisor(Farmbot.Firmware.Supervisor, []),
supervisor(Farmbot.BotState.Supervisor, []),
worker(Farmbot.Bootstrap.AuthTask, []),
supervisor(Farmbot.Firmware.Supervisor, []),
supervisor(Farmbot.BotState.Supervisor, []),
supervisor(Farmbot.BotState.Transport.Supervisor, []),
supervisor(Farmbot.HTTP.Supervisor, []),
supervisor(Farmbot.Repo.Supervisor, []),
supervisor(Farmbot.Farmware.Supervisor, [])
supervisor(Farmbot.HTTP.Supervisor, []),
supervisor(Farmbot.Repo.Supervisor, []),
supervisor(Farmbot.Farmware.Supervisor, []),
supervisor(Farmbot.Regimen.Supervisor, []),
supervisor(Farmbot.FarmEvent.Supervisor, [])
]
opts = [strategy: :one_for_one]

View file

@ -88,17 +88,19 @@ defmodule Farmbot.BotState.Transport.GenMQTT.Client do
end
def on_publish(["bot", _, "sync", kind, id], msg, state) do
mod = Module.concat(["Farmbot", "Repo", kind])
if Code.ensure_loaded?(mod) do
body = struct(mod)
sync_cmd = msg |> Poison.decode!(as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id))
Farmbot.Repo.register_sync_cmd(sync_cmd)
spawn fn() ->
mod = Module.concat(["Farmbot", "Repo", kind])
if Code.ensure_loaded?(mod) do
body = struct(mod)
sync_cmd = msg |> Poison.decode!(as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id))
Farmbot.Repo.register_sync_cmd(sync_cmd)
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
Farmbot.Repo.flip()
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
Farmbot.Repo.flip()
end
else
Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(msg)}"
end
else
Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(msg)}"
end
{:ok, state}
end

View file

@ -0,0 +1,17 @@
defprotocol Farmbot.FarmEvent.Execution do
@moduledoc """
Protocol to be implemented by any struct that can be executed by
Farmbot.FarmEvent.Manager.
"""
@typedoc "Data to be executed."
@type data :: map
@doc """
Execute an item.
* `data` - A `Farmbot.Database.Syncable` struct that is implemented.
* `now` - `DateTime` of execution.
"""
@spec execute_event(data, DateTime.t) :: any
def execute_event(data, now)
end

View file

@ -0,0 +1,10 @@
defimpl Farmbot.FarmEvent.Execution, for: Farmbot.Repo.Regimen do
def execute_event(regimen, now) do
case Process.whereis(:"regimen-#{regimen.id}") do
nil -> {:ok, _pid} = Farmbot.Regimen.Supervisor.add_child(regimen, now)
pid -> {:ok, pid}
end
end
end

View file

@ -0,0 +1,15 @@
defimpl Farmbot.FarmEvent.Execution, for: Farmbot.Repo.Sequence do
def execute_event(sequence, _now) do
with {:ok, ast} <- Farmbot.CeleryScript.AST.decode(sequence) do
case Farmbot.CeleryScript.execute(ast) do
{:ok, _} -> :ok
{:error, reason, _} ->
{:error, reason}
end
else
{:error, reason} -> {:error, reason}
end
end
end

View file

@ -0,0 +1,223 @@
defmodule Farmbot.FarmEvent.Manager do
@moduledoc """
Manages execution of FarmEvents.
## Rules for FarmEvent execution.
* Regimen
* ignore `end_time`.
* ignore calendar.
* if start_time is more than 60 seconds passed due, assume it already started, and don't start it again.
* Sequence
* if `start_time` is late, check the calendar.
* for each item in the calendar, check if it's event is more than 60 seconds in the past. if not, execute it.
* if there is only one event in the calendar, ignore the `end_time`
"""
use GenServer
use Farmbot.Logger
alias Farmbot.FarmEvent.Execution
alias Farmbot.Repo.FarmEvent
@checkup_time 20_000
## GenServer
defmodule State do
@moduledoc false
defstruct [timer: nil, last_time_index: %{}]
end
@doc false
def start_link do
GenServer.start_link(__MODULE__, [], [name: __MODULE__])
end
def init([]) do
send self(), :checkup
{:ok, struct(State)}
end
def handle_info(:checkup, state) do
now = get_now()
all_events = Farmbot.Repo.current_repo().all(Farmbot.Repo.FarmEvent)
# do checkup is the bulk of the work.
{late_events, new} = do_checkup(all_events, now, state)
#TODO(Connor) Conditionally start events based on some state info.
unless Enum.empty?(late_events) do
Logger.info 3, "Time for event to run at: #{now.hour}:#{now.minute}"
start_events(late_events, now)
end
# Start a new timer.
timer = Process.send_after self(), :checkup, @checkup_time
{:noreply, %{new | timer: timer}}
end
defp do_checkup(list, time, late_events \\ [], state)
defp do_checkup([], _now, late_events, state), do: {late_events, state}
defp do_checkup([farm_event | rest], now, late_events, state) do
# new_late will be a executable event (Regimen or Sequence.)
{new_late_event, last_time} = check_event(farm_event, now, state.last_time_index[farm_event.id])
# update state.
new_state = %{state | last_time_index: Map.put(state.last_time_index, farm_event.id, last_time)}
case new_late_event do
# if `new_late_event` is nil, don't accumulate it.
nil -> do_checkup(rest, now, late_events, new_state)
# if there is a new event, accumulate it.
event -> do_checkup(rest, now, [event | late_events], new_state)
end
end
defp check_event(%FarmEvent{} = f, now, last_time) do
# Get the executable out of the database this may fail.
# mod_list = ["Farmbot", "Repo", f.executable_type]
mod = Module.safe_concat([f.executable_type])
event = lookup(mod, f.executable_id)
# build a local start time and end time
start_time = Timex.parse! f.start_time, "{ISO:Extended}"
end_time = Timex.parse! f.end_time, "{ISO:Extended}"
# start_time = f.start_time
# end_time = f.end_time
# get local bool of if the event is started and finished.
started? = Timex.after? now, start_time
finished? = Timex.after? now, end_time
case f.executable_type do
"Elixir.Farmbot.Repo.Regimen" -> maybe_start_regimen(started?, start_time, last_time, event, now)
"Elixir.Farmbot.Repo.Sequence" -> maybe_start_sequence(started?, finished?, f, last_time, event, now)
end
end
defp maybe_start_regimen(started?, start_time, last_time, event, now)
defp maybe_start_regimen(_started? = true, start_time, last_time, event, now) do
case is_too_old?(now, start_time) do
true ->
Logger.debug 3, "regimen #{event.name} (#{event.id}) is too old to start."
{nil, last_time}
false ->
Logger.debug 3, "regimen #{event.name} (#{event.id}) not to old; starting."
{event, now}
end
end
defp maybe_start_regimen(_started? = false, start_time, last_time, event, _) do
Logger.debug 3, "regimen #{event.name} (#{event.id}) is not started yet. (#{inspect start_time}) (#{inspect Timex.now()})"
{nil, last_time}
end
defp lookup(module, sr_id) do
case Farmbot.Repo.current_repo().get(module, sr_id) do
nil -> raise "Could not find #{module} by id: #{sr_id}"
item -> item
end
end
# signals the start of a sequence based on the described logic.
defp maybe_start_sequence(started?, finished?, farm_event, last_time, event, now)
# We only want to check if the sequence is started, and not finished.
defp maybe_start_sequence(_started? = true, _finished? = false, farm_event, last_time, event, now) do
{run?, next_time} = should_run_sequence?(farm_event.calendar, last_time, now)
case run? do
true -> {event, next_time}
false -> {nil, last_time}
end
end
# if `farm_event.time_unit` is "never" we can't use the `end_time`.
# if we have no `last_time`, time to execute.
defp maybe_start_sequence(true, _, %{time_unit: "never"} = f, _last_time = nil, event, now) do
Logger.debug 3, "Ignoring end_time."
case should_run_sequence?(f.calendar, nil, now) do
{true, next} -> {event, next}
{false, _} -> {nil, nil }
end
end
# if started is false, the event isn't ready to be executed.
defp maybe_start_sequence(_started? = false, _fin, _farm_event, last_time, event, _now) do
Logger.debug 3, "sequence #{event.name} (#{event.id}) is not started yet."
{nil, last_time}
end
# if the event is finished (but not a "never" time_unit), we don't execute.
defp maybe_start_sequence(_started?, _finished? = true, _farm_event, last_time, event, _now) do
Logger.success 3, "sequence #{event.name} (#{event.id}) is finished."
{nil, last_time}
end
# Checks if we shoudl run a sequence or not. returns {event | nil, time | nil}
defp should_run_sequence?(calendar, last_time, now)
# if there is no last time, check if time is passed now within 60 seconds.
defp should_run_sequence?([first_time | _], nil, now) do;
Logger.debug 3, "Checking sequence event that hasn't run before #{first_time}"
# convert the first_time to a DateTime
dt = Timex.parse! first_time, "{ISO:Extended}"
# if now is after the time, we are in fact late
if Timex.after?(now, dt) do
{true, now}
else
# make sure to return nil as the last time because it stil hasnt executed yet.
Logger.debug 3, "Sequence Event not ready yet."
{false, nil}
end
end
defp should_run_sequence?(calendar, last_time, now) do
# get rid of all the items that happened before last_time
filtered_calendar = Enum.filter(calendar, fn(iso_time) ->
dt = Timex.parse! iso_time, "{ISO:Extended}"
# we only want this time if it happened after the last_time
Timex.after?(dt, last_time)
end)
# if after filtering, there are events that need to be run
# check if they are older than a minute ago,
case filtered_calendar do
[iso_time | _] ->
dt = Timex.parse! iso_time, "{ISO:Extended}"
if Timex.after?(now, dt) do
{true, dt}
# too_old? = is_too_old?(now, dt)
# if too_old?, do: {false, last_time}, else: {true, dt}
else
Logger.debug 3, "Sequence Event not ready yet."
{false, dt}
end
[] ->
Logger.debug 3, "No items in calendar."
{false, last_time}
end
end
# Enumeration is complete.
defp start_events([], _now), do: :ok
# Enumerate the events to be started.
defp start_events([event | rest], now) do
# Spawn to be non blocking here. Maybe link to this process?
spawn fn() -> Execution.execute_event(event, now) end
# Continue enumeration.
start_events(rest, now)
end
# is then more than 1 minute in the past?
defp is_too_old?(now, then) do
time_str_fun = fn(dt) -> "#{dt.hour}:#{dt.minute}:#{dt.second}" end
seconds = DateTime.to_unix(now, :second) - DateTime.to_unix(then, :second)
c = seconds > 60 # not in MS here
Logger.debug 3, "is checking #{time_str_fun.(now)} - #{time_str_fun.(then)} = #{seconds} seconds ago. is_too_old? => #{c}"
c
end
defp get_now(), do: Timex.now()
end

View file

@ -0,0 +1,16 @@
defmodule Farmbot.FarmEvent.Supervisor do
@moduledoc false
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, [], [name: __MODULE__])
end
def init([]) do
children = [
worker(Farmbot.FarmEvent.Manager, [])
]
supervise(children, strategy: :one_for_one)
end
end

View file

@ -140,7 +140,7 @@ defmodule Farmbot.Firmware do
end
defp do_begin_cmd(%Current{fun: fun, args: args, from: _from} = current, state, dispatch) do
Logger.debug 3, "Firmware command: #{fun}#{inspect(args)}"
# Logger.debug 3, "Firmware command: #{fun}#{inspect(args)}"
if fun == :emergency_unlock, do: Farmbot.BotState.set_sync_status(:sync_now)
case apply(state.handler_mod, fun, [state.handler | args]) do

View file

@ -158,7 +158,7 @@ defmodule Farmbot.Firmware.UartHandler do
end
defp do_write(bin, state, dispatch \\ []) do
Logger.debug 3, "writing: #{bin}"
# Logger.debug 3, "writing: #{bin}"
case UART.write(state.nerves, bin) do
:ok -> {:reply, :ok, dispatch, %{state | current_cmd: bin}}
err -> {:reply, err, [], %{state | current_cmd: nil}}

View file

@ -0,0 +1,164 @@
defmodule Farmbot.Regimen.Manager do
@moduledoc "Manages a Regimen"
use Farmbot.Logger
use GenServer
alias Farmbot.Repo.Regimen
defmodule Error do
@moduledoc false
defexception [:epoch, :regimen, :message]
end
defmodule Item do
@moduledoc false
@type t :: %__MODULE__{
name: binary,
time_offset: integer,
sequence: Farmbot.CeleryScript.Ast.t
}
defstruct [:time_offset, :sequence, :name]
def parse(%{time_offset: offset, sequence_id: sequence_id})
do
sequence = fetch_sequence(sequence_id)
%__MODULE__{
name: sequence.name,
time_offset: offset,
sequence: Farmbot.CeleryScript.AST.decode(sequence) |> elem(1)}
end
def fetch_sequence(id) do
case Farmbot.Repo.current_repo().get(Farmbot.Repo.Sequence, id) do
nil -> raise "Could not find sequence by id: #{inspect id}"
obj -> obj
end
end
end
@doc false
def start_link(regimen, time) do
GenServer.start_link(__MODULE__, [regimen, time], name: :"regimen-#{regimen.id}")
end
def init([regimen, time]) do
# parse and sort the regimen items
items = filter_items(regimen)
first_item = List.first(items)
regimen = %{regimen | regimen_items: items}
epoch = build_epoch(time) || raise Error,
message: "Could not determine EPOCH because no timezone was supplied.",
epoch: :error, regimen: regimen
initial_state = %{
next_execution: nil,
regimen: regimen,
epoch: epoch,
timer: nil
}
if first_item do
state = build_next_state(regimen, first_item, self(), initial_state)
{:ok, state}
else
Logger.warn 2, "[#{regimen.name}] has no items on regimen."
:ignore
end
end
def handle_info(:execute, state) do
{item, regimen} = pop_item(state.regimen)
if item do
do_item(item, regimen, state)
else
complete(regimen, state)
end
end
def handle_info(:skip, state) do
{item, regimen} = pop_item(state.regimen)
if item do
do_item(nil, regimen, state)
else
complete(regimen, state)
end
end
defp complete(regimen, state) do
Logger.success 2, "[#{regimen.name}] is complete!"
# spawn fn() ->
# RegSup.remove_child(regimen)
# end
{:stop, :normal, state}
# {:noreply, :finished}
end
defp filter_items(regimen) do
regimen.regimen_items
|> Enum.map(&Item.parse(&1))
|> Enum.sort(&(&1.time_offset <= &2.time_offset))
end
defp do_item(item, regimen, state) do
if item do
Logger.busy 2, "[#{regimen.name}] is going to execute: #{item.name}"
Farmbot.CeleryScript.execute(item.sequence)
end
next_item = List.first(regimen.regimen_items)
if next_item do
new_state = build_next_state(regimen, next_item, self(), state)
{:noreply, new_state}
else
complete(regimen, state)
end
end
def build_next_state(
%Regimen{} = regimen,
%Item{} = nx_itm,
pid, state)
do
next_dt = Timex.shift(state.epoch, milliseconds: nx_itm.time_offset)
timezone = Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "timezone")
now = Timex.now(timezone)
offset_from_now = Timex.diff(next_dt, now, :milliseconds)
timer = if (offset_from_now < 0) and (offset_from_now < -60_000) do
Logger.info 3, "[#{regimen.name}] #{[nx_itm.name]} has been scheduled " <>
"to happen more than one minute ago: #{offset_from_now} Skipping it."
Process.send_after(pid, :skip, 1000)
else
{msg, real_offset} = ensure_not_negative(offset_from_now)
Process.send_after(pid, msg, real_offset)
end
timestr = "#{next_dt.month}/#{next_dt.day}/#{next_dt.year} " <>
"at: #{next_dt.hour}:#{next_dt.minute} (#{offset_from_now} milliseconds)"
Logger.info 3, "[#{regimen.name}] next item will execute on #{timestr}"
%{state | timer: timer,
regimen: regimen,
next_execution: next_dt}
end
defp ensure_not_negative(offset) when offset < -60_000, do: {:skip, 1000}
defp ensure_not_negative(offset) when offset < 0, do: {:execute, 1000}
defp ensure_not_negative(offset), do: {:execute, offset}
@spec pop_item(Regimen.t) :: {Item.t | nil, Regimen.t}
# when there is more than one item pop the top one
defp pop_item(%Regimen{regimen_items: [do_this_one | items ]} = r) do
{do_this_one, %Regimen{r | regimen_items: items}}
end
# returns midnight of today
@spec build_epoch(DateTime.t) :: DateTime.t
def build_epoch(time) do
tz = Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "timezone")
n = Timex.Timezone.convert(time, tz)
Timex.shift(n, hours: -n.hour, seconds: -n.second, minutes: -n.minute)
end
end

View file

@ -0,0 +1,25 @@
defmodule Farmbot.Regimen.Supervisor do
@moduledoc false
use Supervisor
@doc false
def start_link do
Supervisor.start_link(__MODULE__, [], [name: __MODULE__])
end
def init([]) do
children = []
opts = [strategy: :one_for_one]
supervise(children, opts)
end
def add_child(regimen, time) do
spec = worker(Farmbot.Regimen.Manager, [regimen, time], [restart: :transient, id: regimen.id])
Supervisor.start_child(__MODULE__, spec)
end
def remove_child(regimen) do
Supervisor.terminate_child(__MODULE__, regimen.id)
Supervisor.delete_child(__MODULE__, regimen.id)
end
end

View file

@ -7,6 +7,7 @@ defmodule Farmbot.Repo.Device do
schema "devices" do
field(:name, :string)
field(:timezone, :string)
end
use Farmbot.Repo.Syncable

View file

@ -7,16 +7,19 @@ defmodule Farmbot.Repo.FarmEvent do
* A Sequence will execute.
"""
alias Farmbot.Repo.JSONType
use Ecto.Schema
import Ecto.Changeset
schema "farm_events" do
field(:start_time, :utc_datetime)
field(:end_time, :utc_datetime)
field(:start_time, :string)
field(:end_time, :string)
field(:repeat, :integer)
field(:time_unit, :string)
field(:executable_type, Farmbot.Repo.ModuleType.FarmEvent)
field(:executable_id, :integer)
field(:calendar, JSONType)
end
use Farmbot.Repo.Syncable
@ -33,7 +36,6 @@ defmodule Farmbot.Repo.FarmEvent do
def changeset(farm_event, params \\ %{}) do
farm_event
|> ensure_time([:start_time, :end_time])
|> cast(params, @required_fields)
|> validate_required(@required_fields)
|> unique_constraint(:id)

View file

@ -3,15 +3,18 @@ defmodule Farmbot.Repo.Regimen do
A Regimen is a schedule to run sequences on.
"""
alias Farmbot.Repo.JSONType
use Ecto.Schema
import Ecto.Changeset
schema "regimens" do
field(:name, :string)
field(:regimen_items, JSONType)
end
use Farmbot.Repo.Syncable
@required_fields [:id, :name]
@required_fields [:id, :name, :regimen_items]
def changeset(farm_event, params \\ %{}) do
farm_event

View file

@ -33,7 +33,7 @@ defmodule Farmbot.Repo do
@doc "Register a diff to be stored until a flip."
def register_sync_cmd(sync_cmd) do
GenServer.call(__MODULE__, {:register_sync_cmd, sync_cmd})
GenServer.call(__MODULE__, {:register_sync_cmd, sync_cmd}, :infinity)
end
@doc false
@ -52,9 +52,21 @@ defmodule Farmbot.Repo do
"A" -> [repo_a, repo_b]
"B" -> [repo_b, repo_a]
end
# Copy configs
[current, _] = repos
copy_configs(current)
{:ok, %{repos: repos, sync_cmds: []}}
end
defp copy_configs(repo) do
case repo.one(Farmbot.Repo.Device) do
nil -> :ok
%{timezone: tz} ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "timezone", tz)
:ok
end
end
def terminate(_,_) do
Farmbot.BotState.set_sync_status(:sync_error)
end
@ -80,7 +92,7 @@ defmodule Farmbot.Repo do
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "A")
end
Farmbot.BotState.set_sync_status(:synced)
copy_configs(repo_b)
{:reply, repo_b, %{state | repos: [repo_b, repo_a]}}
end
@ -183,6 +195,7 @@ defmodule Farmbot.Repo do
end
defp sync_resource(repo, resource, slug) do
Logger.debug 3, "syncing: #{resource} (#{slug})"
as = if resource in @singular_resources do
struct(resource)
else

View file

@ -4,12 +4,13 @@ defmodule Farmbot.Repo.Migrations.AddFarmEventsTable do
def change do
create table("farm_events", primary_key: false) do
add(:id, :integer)
add(:start_time, :utc_datetime)
add(:end_time, :utc_datetime)
add(:start_time, :string)
add(:end_time, :string)
add(:repeat, :integer)
add(:time_unit, :string)
add(:executable_type, :string)
add(:executable_id, :integer)
add(:calendar, :string)
end
create(unique_index("farm_events", [:id]))

View file

@ -5,6 +5,7 @@ defmodule Farmbot.Repo.Migrations.AddRegimensTable do
create table("regimens", primary_key: false) do
add(:id, :integer)
add(:name, :string)
add(:regimen_items, :string)
end
create(unique_index("regimens", [:id]))

View file

@ -5,6 +5,7 @@ defmodule Farmbot.Repo.A.Migrations.AddDevicesTable do
create table("devices", primary_key: false) do
add(:id, :integer)
add(:name, :string)
add(:timezone, :string)
end
create(unique_index("devices", [:id]))