had to add timezone back...

This commit is contained in:
connor rigby 2017-06-14 15:34:24 -07:00
parent 20122cdfc6
commit b9cb604155
16 changed files with 363 additions and 178 deletions

View file

@ -10,7 +10,7 @@ if Code.ensure_loaded? Farmbot do
Database,
FarmEvent,
ImageWatcher,
RegimenRunner,
Regimen.Runner,
CeleryScript,
DebugLog,
FarmEvent.Runner,

View file

@ -10,7 +10,7 @@ if Code.ensure_loaded? Farmbot do
Database,
FarmEvent,
ImageWatcher,
RegimenRunner,
Regimen.Runner,
CeleryScript,
DebugLog,
FarmEvent.Runner,

View file

@ -116,7 +116,7 @@ defmodule Farmbot.BotState do
"""
@spec update_config(context, String.t, any) :: :ok | {:error, atom}
def update_config(%Context{} = context, config_key, value)
when is_bitstring(config_key) do
when is_binary(config_key) do
GenServer.call(context.configuration, {:update_config, config_key, value})
end

View file

@ -13,6 +13,7 @@ defmodule Farmbot.BotState.Configuration do
model:
[
configuration: %{
timezone: nil,
user_env: %{},
os_auto_update: false,
steps_per_mm_x: 10,
@ -38,6 +39,7 @@ defmodule Farmbot.BotState.Configuration do
@type state ::
%State{
configuration: %{
timezone: nil | binary,
user_env: map,
os_auto_update: boolean,
steps_per_mm_x: integer,
@ -82,9 +84,11 @@ defmodule Farmbot.BotState.Configuration do
{:ok, len_x} = get_config("distance_mm_x")
{:ok, len_y} = get_config("distance_mm_y")
{:ok, len_z} = get_config("distance_mm_z")
{:ok, tz} = get_config("timezone")
new_state =
%State{initial | configuration: %{
user_env: user_env,
user_env: user_env,
timezone: tz,
os_auto_update: os_a_u,
steps_per_mm_x: spm_x,
steps_per_mm_y: spm_y,
@ -168,6 +172,14 @@ defmodule Farmbot.BotState.Configuration do
dispatch true, new_state
end
def handle_call({:update_config, "timezone", tz}, _, state) do
config = state.configuration
new_config = %{config | timezone: tz}
new_state = %{state | configuration: new_config}
put_config("timezone", tz)
dispatch true, new_state
end
def handle_call({:update_config, key, _value}, _from, %State{} = state) do
Logger.error(
">> got an invalid configuration in Configuration tracker: #{inspect key}")

View file

@ -64,13 +64,17 @@ defmodule Farmbot.Database do
:ok
end
defp broadcast_sync(%Context{database: db}, msg),
do: GenServer.cast(db, {:broadcast_sync, msg})
@doc """
Sync up with the API.
"""
# TODO(Connor) this is slow.
@spec sync(Context.t) :: :ok | no_return
def sync(%Context{} = ctx) do
set_syncing(ctx, :syncing)
set_syncing(ctx, :syncing )
broadcast_sync(ctx, :sync_start)
for module_name <- all_syncable_modules() do
if get_awaiting(ctx, module_name) do
:ok = do_sync(ctx, module_name)
@ -80,15 +84,18 @@ defmodule Farmbot.Database do
end
end
set_syncing(ctx, :synced)
set_syncing(ctx, :synced )
broadcast_sync(ctx, :sync_end)
Logger.info ">> is synced!", type: :success
:ok
end
defp do_sync(context, module_name, retries \\ 0)
defp do_sync(%Context{} = _ctx, module_name, retries) when retries > 4 do
defp do_sync(%Context{} = ctx, module_name, retries) when retries > 4 do
debug_log "#{module_name} failed to sync too many times. (#{retries})"
Logger.error ">> failed to sync #{module_name} to many times."
set_syncing(ctx, :sync_error)
broadcast_sync(ctx, :sync_error)
:ok
end
@ -102,6 +109,7 @@ defmodule Farmbot.Database do
rescue
e ->
debug_log "#{module_name} Sync error: #{inspect e}"
IO.warn "#{module_name} HEY LOOK AT ME: #{inspect e}"
do_sync(ctx, module_name, retries + 1)
end
@ -140,6 +148,27 @@ defmodule Farmbot.Database do
"""
def flush(%Context{} = ctx), do: GenServer.call(ctx.database, :flush)
@doc """
Hooks into database events.
will receive events in the form of:
* `{Farmbot.Database, {syncable, `action`, id}}`
Will also receive severl other special messages.
* {Farmbot.Database, `:sync_start`}
* {Farmbot.Database, `:sync_end` }
* {Farmbot.Database, `:sync_error`}
## Action
* `add` - an item was added
* `update` - an item was modified
* `remove` - an item was deleted
* if `action` is remove, `id` may be "*" meaning all syncables were removed.
"""
def hook(%Context{database: db}, pid), do: GenServer.call(db, {:hook, pid})
@doc "Unsubscribes from database events."
def unhook(%Context{database: db}, pid), do: GenServer.call(db, {:unhook, pid})
@doc """
Sets awaiting api resources.
"""
@ -196,6 +225,7 @@ defmodule Farmbot.Database do
:by_kind_and_id,
:awaiting,
:by_kind,
:hooks,
:refs,
:all
]
@ -204,6 +234,7 @@ defmodule Farmbot.Database do
by_kind_and_id: %{ required({DB.syncable, DB.db_id}) => DB.ref_id },
awaiting: %{ required(DB.syncable) => boolean },
by_kind: %{ required(DB.syncable) => [DB.ref_id] },
hooks: [pid | atom],
refs: %{ required(DB.ref_id) => DB.resource_map },
all: [DB.ref_id],
}
@ -223,6 +254,7 @@ defmodule Farmbot.Database do
initial_by_kind_and_id = %{}
initial_awaiting = generate_keys(all_syncable_modules(), true)
initial_by_kind = generate_keys(all_syncable_modules())
initial_hooks = []
initial_refs = %{}
initial_all = []
@ -230,6 +262,7 @@ defmodule Farmbot.Database do
by_kind_and_id: initial_by_kind_and_id,
awaiting: initial_awaiting,
by_kind: initial_by_kind,
hooks: initial_hooks,
refs: initial_refs,
all: initial_all,
}
@ -242,9 +275,25 @@ defmodule Farmbot.Database do
|> Map.new
end
def handle_call(:flush, _, _state) do
{:ok, state} = init([])
{:reply, :ok, state}
def handle_cast({:broadcast_sync, msg}, state) do
for hook <- state.hooks do
broadcast(hook, msg)
end
{:noreply, state}
end
def handle_call(:flush, _, old_state) do
{:ok, new_state} = init([])
{:reply, :ok, %{new_state | hooks: old_state}}
end
def handle_call({:hook, pid}, _, state) do
{:reply, :ok, %{state | hooks: [pid | state.hooks]}}
end
def handle_call({:unhook, pid}, _, state) do
new_hooks = List.delete(state.hooks, pid)
{:reply, :ok, %{state | hooks: new_hooks}}
end
def handle_call({:get_by, kind, id}, _, state) do
@ -272,6 +321,7 @@ defmodule Farmbot.Database do
"*" -> remove_all_syncable(state, syncable)
num -> remove_syncable(state, syncable, num)
end
broadcast_to_hooks(state.hooks, syncable, :remove, int_or_wildcard)
{
:reply,
:ok,
@ -372,6 +422,7 @@ defmodule Farmbot.Database do
# set the ref from the old one.
new = %{already_exists | body: record}
new_refs = %{state.refs | new.ref_id => new}
broadcast_to_hooks(state.hooks, kind, :update, id)
%{state | refs: new_refs}
else
debug_log("inputting new record")
@ -388,6 +439,7 @@ defmodule Farmbot.Database do
new_refs = Map.put(state.refs, rid, new_syncable)
by_kind_and_id = Map.put(state.by_kind_and_id, {kind, id}, rid)
broadcast_to_hooks(state.hooks, kind, :add, id)
%{ state |
refs: new_refs,
all: all,
@ -396,4 +448,14 @@ defmodule Farmbot.Database do
end
end
defp broadcast_to_hooks([], _syncable, _action, _id), do: :ok
defp broadcast_to_hooks([hook | rest], syncable, action, id) do
broadcast(hook, {syncable, action, id})
broadcast_to_hooks(rest, syncable, action, id)
end
defp broadcast(hook, msg) do
send(hook, {__MODULE__, msg})
end
end

View file

@ -88,8 +88,8 @@ defmodule Farmbot.Database.Syncable do
url = "/api" <> plural_url()
result = context |> HTTP.get(url) |> parse_resp(__MODULE__)
if function_exported?(__MODULE__, :on_sync, 2) do
apply __MODULE__, :on_sync, [context, result]
if function_exported?(__MODULE__, :on_fetch, 2) do
apply __MODULE__, :on_fetch, [context, result]
end
case then do
@ -105,8 +105,8 @@ defmodule Farmbot.Database.Syncable do
url = "/api" <> unquote(singular) <> "/#{id}"
result = context |> HTTP.get(url) |> parse_resp(__MODULE__)
if function_exported?(__MODULE__, :on_sync, 2) do
apply __MODULE__, :on_sync, [context, result]
if function_exported?(__MODULE__, :on_fetch, 2) do
apply __MODULE__, :on_fetch, [context, result]
end
case then do

View file

@ -3,9 +3,15 @@ defmodule Farmbot.Database.Syncable.Device do
A Device from the Farmbot API.
"""
alias Farmbot.Database
alias Farmbot.{Context, Database}
alias Database.Syncable
use Syncable, model: [
:name
:name,
:timezone
], endpoint: {"/device", "/device"}
def on_fetch(%Context{} = context, %__MODULE__{timezone: _tz}) do
tz = "America/Los_Angeles"
true = Farmbot.BotState.update_config(context, "timezone", tz)
end
end

View file

@ -14,16 +14,16 @@ defmodule Farmbot.Database.Syncable.Peripheral do
:label
], endpoint: {"/peripherals", "/peripherals"}
def on_sync(context, object_or_list)
def on_fetch(context, object_or_list)
def on_sync(%Context{} = _, []), do: :ok
def on_fetch(%Context{} = _, []), do: :ok
def on_sync(%Context{} = context, [%__MODULE__{} = first | rest]) do
on_sync(context, first)
on_sync(context, rest)
def on_fetch(%Context{} = context, [%__MODULE__{} = first | rest]) do
on_fetch(context, first)
on_fetch(context, rest)
end
def on_sync(%Context{} = context, %__MODULE__{pin: pin, mode: mode, label: label}) do
def on_fetch(%Context{} = context, %__MODULE__{pin: pin, mode: mode, label: label}) do
spawn fn ->
:ok = Farmbot.BotState.set_pin_mode(context, pin, mode)
ast = %Ast{

View file

@ -6,6 +6,7 @@ defmodule Farmbot.Database.Syncable.Regimen do
alias Farmbot.Database
alias Database.Syncable
use Syncable, model: [
:name,
:regimen_items
], endpoint: {"/regimens", "/regimens"}
end

View file

@ -13,7 +13,7 @@ defmodule Farmbot.FarmEvent.Runner do
FarmEvent
}
@checkup_time 10_000
@checkup_time 20_000
@type database :: Database.db
@type state :: {database, %{required(integer) => DateTime.t}}
@ -22,12 +22,38 @@ defmodule Farmbot.FarmEvent.Runner do
GenServer.start_link(__MODULE__, context, opts)
end
def init(context) do
def init(%{database: db} = context) when is_pid(db) do
Process.link(db)
Database.hook(context, self())
send self(), :checkup
{:ok, {context, %{} }}
{:ok, {context, nil, %{} }}
end
def handle_info(:checkup, {context, state}) do
def init(%{database: db} = context) when is_atom(db) do
db_pid = Process.whereis(db) || raise "Could not find Database pid."
init(%{context | database: db_pid})
end
def handle_info({Database, :sync_start}, {context, timer, state}) do
debug_log "Pausing FarmEvent runner until sync finishes."
if timer do
Process.cancel_timer(timer)
end
{:noreply, {context, nil, state}}
end
def handle_info({Database, :sync_end}, {context, timer, state}) do
debug_log "Resuming FarmEvent runner."
if timer do
Process.cancel_timer(timer)
end
new_timer = Process.send_after self(), :checkup, @checkup_time
{:noreply, {context, new_timer, state}}
end
def handle_info({Database, _}, state), do: {:noreply, state}
def handle_info(:checkup, {context, _, state}) do
now = get_now()
# debug_log "Doing checkup: #{inspect now}"
new_state = if now do
@ -45,8 +71,8 @@ defmodule Farmbot.FarmEvent.Runner do
else
state
end
Process.send_after self(), :checkup, @checkup_time
{:noreply, {context, new_state}}
timer = Process.send_after self(), :checkup, @checkup_time
{:noreply, {context, timer, new_state}}
end
@spec start_events(Context.t, [Sequence.t | Regimen.t], DateTime.t)

View file

@ -0,0 +1,180 @@
defmodule Farmbot.Regimen.Runner do
@moduledoc """
Runs a regimen
"""
alias Farmbot.Regimen.Supervisor, as: RegSup
alias Farmbot.{Database, Context}
alias Database.Syncable.{Regimen, Sequence}
alias Farmbot.CeleryScript.Command
use GenServer
require Logger
defmodule Error do
@moduledoc false
defexception [:epoch, :regimen, :message]
end
defmodule Item do
@moduledoc false
@type t :: %__MODULE__{
name: binary,
time_offset: integer,
sequence: Farmbot.CeleryScript.Ast.t
}
defstruct [:time_offset, :sequence, :name]
def parse(%{"time_offset" => offset, "sequence_id" => sequence_id},
%Context{} = ctx)
do
sequence = fetch_sequence(ctx, sequence_id)
%__MODULE__{
name: sequence.name,
time_offset: offset,
sequence: Farmbot.CeleryScript.Ast.parse(sequence)}
end
def fetch_sequence(%Context{} = ctx, id) do
db_obj = Database.get_by_id(ctx, Sequence, id)
unless db_obj do
raise "Could not find sequence by id: #{inspect id}"
end
db_obj.body
end
end
def start_link(%Context{} = ctx, regimen, time) do
GenServer.start_link(__MODULE__,
[ctx, regimen, time],
name: :"regimen-#{regimen.id}")
end
def init([ctx, regimen, time]) do
# parse and sort the regimen items
items = filter_items(regimen, ctx)
first_item = List.first(items)
regimen = %{regimen | regimen_items: items}
epoch = build_epoch(ctx, time) || raise Error,
message: "Could not determine EPOCH because no timezone was supplied.",
epoch: :error, regimen: regimen
initial_state = %{
next_execution: nil,
regimen: regimen,
context: ctx,
epoch: epoch,
timer: nil
}
if first_item do
state = build_next_state(regimen, first_item, ctx, self(), initial_state)
{:ok, state}
else
Logger.warn "[#{regimen.name}] has no items on regimen."
{:ok, :finished}
end
end
def handle_info(:execute, state) do
{item, regimen} = pop_item(state.regimen)
if item do
do_item(item, regimen, state)
else
Logger.info "[#{regimen.name}] is complete!"
spawn fn() ->
RegSup.remove_child(state.context, regimen)
end
{:noreply, :finished}
end
end
def handle_info(:skip, state) do
{item, regimen} = pop_item(state.regimen)
if item do
do_item(nil, regimen, state)
else
Logger.info "[#{regimen.name}] is complete!"
spawn fn() ->
RegSup.remove_child(state.context, regimen)
end
{:noreply, :finished}
end
end
defp filter_items(regimen, %Context{} = ctx) do
regimen.regimen_items
|> Enum.map(&Item.parse(&1, ctx))
|> Enum.sort(&(&1.time_offset <= &2.time_offset))
end
defp do_item(item, regimen, state) do
context =
if item do
Logger.info "[#{regimen.name}] is going to execute: #{item.name}"
Command.do_command(item.sequence, state.context)
else
state.context
end
next_item = List.first(regimen.regimen_items)
if next_item do
new_state = build_next_state(regimen, next_item, context, self(), state)
{:noreply, new_state}
else
Logger.info "[#{regimen.name}] is complete!"
spawn fn() ->
RegSup.remove_child(context, regimen)
end
{:noreply, :finished}
end
end
def build_next_state(
%Regimen{} = regimen, %Item{} = next_item, %Context{} = context, pid, state)
do
next_dt = Timex.shift(state.epoch, milliseconds: next_item.time_offset)
now = Timex.now(Farmbot.BotState.get_config(context, :timezone))
offset_from_now = Timex.diff(next_dt, now, :milliseconds)
# if the offset from now is negative, we may want to handle this differently?
timer = if (offset_from_now < 0) and (offset_from_now < -60000) do
Logger.info "[#{regimen.name}] #{[next_item.name]} has been scheduled " <>
"to happen more than one minute ago: #{offset_from_now} Skipping it."
Process.send_after(pid, :skip, 1000)
else
{msg, real_offset} = ensure_not_negative(offset_from_now)
Process.send_after(pid, msg, real_offset)
end
timestr = "#{next_dt.month}/#{next_dt.day}/#{next_dt.year} " <>
"at: #{next_dt.hour}:#{next_dt.minute} (#{offset_from_now} milliseconds)"
Logger.info "[#{regimen.name}] next item will execute on #{timestr}"
%{state | timer: timer,
context: context,
regimen: regimen,
next_execution: next_dt}
end
defp ensure_not_negative(offset) when offset < -60_000, do: {:skip, 1000}
defp ensure_not_negative(offset) when offset < 0, do: {:execute, 1000}
defp ensure_not_negative(offset), do: {:execute, offset}
@spec pop_item(Regimen.t) :: {Item.t | nil, Regimen.t}
# when there is more than one item pop the top one
defp pop_item(%Regimen{regimen_items: [do_this_one | items ]} = r) do
{do_this_one, %Regimen{r | regimen_items: items}}
end
# returns midnight of today
@spec build_epoch(Context.t, DateTime.t) :: DateTime.t
def build_epoch(%Context{} = context, time) do
tz = Farmbot.BotState.get_config(context, :timezone)
if tz do
n = Timex.Timezone.convert(time, tz)
Timex.shift(n, hours: -n.hour, seconds: -n.second, minutes: -n.minute)
end
end
end

View file

@ -25,7 +25,7 @@ defmodule Farmbot.Regimen.Supervisor do
"""
def add_child(%Context{} = context, regimen, time) do
Supervisor.start_child(context.regimen_supervisor,
worker(Farmbot.RegimenRunner, [context, regimen, time],
worker(Farmbot.Regimen.Runner, [context, regimen, time],
[restart: :permanent, id: regimen.id]))
end

View file

@ -1,114 +0,0 @@
defmodule Farmbot.RegimenRunner do
@moduledoc """
Runs a regimen
"""
use GenServer
alias Farmbot.Regimen.Supervisor, as: RegSup
require Logger
alias Farmbot.Database.Syncable.Regimen
alias Farmbot.CeleryScript.Command
alias Farmbot.Context
defmodule Item do
@moduledoc false
@type t :: %__MODULE__{time_offset: integer,
sequence: Farmbot.CeleryScript.Ast.t}
defstruct [:time_offset, :sequence]
def parse(%{"time_offset" => offset, "sequence" => sequence}) do
%__MODULE__{time_offset: offset,
sequence: Farmbot.CeleryScript.Ast.parse(sequence)}
end
end
def start_link(%Context{} = ctx, regimen, time) do
GenServer.start_link(__MODULE__,
[ctx, regimen, time],
name: :"regimen-#{regimen.id}")
end
def init([ctx, regimen, time]) do
# parse and sort the regimen items
items = filter_items(regimen)
first_item = List.first(items)
if first_item do
epoch = build_epoch(time)
first_dt = Timex.shift(epoch, milliseconds: first_item.time_offset)
timestr = "#{first_dt.month}/#{first_dt.day}/#{first_dt.year} " <>
"at: #{first_dt.hour}:#{first_dt.minute}"
Logger.info "your fist item will execute on #{timestr}"
millisecond_offset = Timex.diff(first_dt, Timex.now(), :milliseconds)
Process.send_after(self(), :execute, millisecond_offset)
next = %{
epoch: epoch,
regimen: %{regimen | regimen_items: items},
next_execution: first_dt,
context: ctx
}
{:ok, next}
else
Logger.warn ">> no items on regimen: #{regimen.name}"
{:ok, %{context: ctx}}
end
end
def handle_call(:get_state, _from, state), do: {:reply, state, state}
def handle_info(:execute, state) do
{item, regimen} = pop_item(state.regimen)
if item do
do_item(item, regimen, state)
else
Logger.info ">> #{regimen.name} is complete!"
spawn fn() ->
RegSup.remove_child(state.context, regimen)
end
{:noreply, :finished}
end
end
defp filter_items(regimen) do
regimen.regimen_items
|> Enum.map(&Item.parse(&1))
|> Enum.sort(&(&1.time_offset <= &2.time_offset))
end
defp do_item(item, regimen, state) do
Command.do_command(item, state.context)
next_item = List.first(regimen.regimen_items)
if next_item do
next_dt = Timex.shift(state.epoch, milliseconds: next_item.time_offset)
timestr = "#{next_dt.month}/#{next_dt.day}/#{next_dt.year} at: #{next_dt.hour}:#{next_dt.minute}"
Logger.info "your next item will execute on #{timestr}"
millisecond_offset = Timex.diff(next_dt, Timex.now(), :milliseconds)
Process.send_after(self(), :execute, millisecond_offset)
{:ok, %{state | regimen: regimen, next_execution: next_dt}}
else
Logger.info ">> #{regimen.name} is complete!"
spawn fn() ->
RegSup.remove_child(state.context, regimen)
end
{:noreply, :finished}
end
end
@spec pop_item(Regimen.t) :: {Item.t | nil, Regimen.t}
# when there is more than one item pop the top one
defp pop_item(%Regimen{regimen_items: [do_this_one | items ]} = r) do
{do_this_one, %Regimen{r | regimen_items: items}}
end
@doc """
Gets the state of a regimen by its id.
"""
def get_state(id), do: GenServer.call(:"regimen-#{id}", :get_state)
# returns midnight of today
@spec build_epoch(DateTime.t) :: DateTime.t
def build_epoch(n) do
Timex.shift(n, hours: -n.hour, seconds: -n.second, minutes: -n.minute)
end
end

View file

@ -4,6 +4,7 @@
"server": "http://localhost:3000"
},
"configuration": {
"timezone": null,
"user_env": {},
"os_auto_update": false,
"first_party_farmware": false,

View file

@ -1,37 +1,38 @@
{
"network": {
"interfaces": {
"wlan0": {
"type": "wireless",
"default": "hostapd",
"settings": {
"ipv4_address": "192.168.24.1"
}
},
"eth0": {
"type": "wired",
"default": false
}
},
"ntp": true,
"ssh": false
},
"authorization": {
"server": "https://my.farmbot.io"
},
"configuration": {
"user_env": {},
"os_auto_update": false,
"first_party_farmware": true,
"steps_per_mm_x": 5,
"steps_per_mm_y": 5,
"steps_per_mm_z": 25,
"distance_mm_x": 1500,
"distance_mm_y": 3000,
"distance_mm_z": 800
},
"hardware": {
"params": {},
"custom_firmware": false
"network": {
"interfaces": {
"wlan0": {
"type": "wireless",
"default": "hostapd",
"settings": {
"ipv4_address": "192.168.24.1"
}
}
},
"eth0": {
"type": "wired",
"default": false
}
},
"ntp": true,
"ssh": false
},
"authorization": {
"server": "https://my.farmbot.io"
},
"configuration": {
"timezone": null,
"user_env": {},
"os_auto_update": false,
"first_party_farmware": true,
"steps_per_mm_x": 5,
"steps_per_mm_y": 5,
"steps_per_mm_z": 25,
"distance_mm_x": 1500,
"distance_mm_y": 3000,
"distance_mm_z": 800
},
"hardware": {
"params": {},
"custom_firmware": false
}
}

View file

@ -0,0 +1,10 @@
defmodule ReEnableTZ do
def run(json) do
config = json["configuration"]
if config["timezone"] do
json
else
%{json | "configuration" => Map.put(config, "timezone", nil) }
end
end
end