💥 autosyncing!!!!

This commit is contained in:
Connor Rigby 2017-11-07 09:52:54 -08:00
parent 412c7da4b9
commit d79d02cd54
7 changed files with 130 additions and 11 deletions

BIN
dhcp_server.o Normal file

Binary file not shown.

View file

@ -27,6 +27,11 @@ defmodule Farmbot.BotState do
user_env: %{},
process_info: %{}
@valid_sync_status [ :locked, :maintenance, :sync_error, :sync_now, :synced, :syncing, :unknown,]
def set_sync_status(cmd) when cmd in @valid_sync_status do
GenStage.call(__MODULE__, {:set_sync_status, cmd})
end
@doc "Forces a state push over all transports."
def force_state_push do
GenStage.call(__MODULE__, :force_state_push)
@ -68,6 +73,12 @@ defmodule Farmbot.BotState do
{:reply, :ok, [{:emit, ast}], state}
end
def handle_call({:set_sync_status, status}, _, state) do
new_info_settings = %{state.informational_settings | sync_status: status}
new_state = %{state | informational_settings: new_info_settings}
{:reply, :ok, [new_state], new_state}
end
defp do_handle([], state), do: state
defp do_handle([{:config, "settings", key, val} | rest], state) do

View file

@ -77,12 +77,20 @@ defmodule Farmbot.BotState.Transport.GenMQTT.Client do
{:ok, state}
end
def on_publish(["bot", _, "sync", "Log", _], _, state) do
{:ok, state}
end
def on_publish(["bot", _, "sync", kind, id], msg, state) do
mod = Module.concat(["Farmbot", "Repo", kind])
if Code.ensure_loaded?(mod) do
body = struct(mod)
sync_cmd = msg |> Poison.decode!(as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id))
Farmbot.Repo.register_sync_cmd(sync_cmd)
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
Farmbot.Repo.flip()
end
else
Logger.warn "Unknown syncable: #{mod}: #{inspect Poison.decode!(msg)}"
end

View file

@ -5,7 +5,9 @@ defmodule Farmbot.CeleryScript.AST.Node.Sync do
def execute(_, _, env) do
env = mutate_env(env)
Logger.warn "SYNC BROKE"
Farmbot.Repo.flip()
{:ok, env}
end
end

View file

@ -4,6 +4,18 @@ defmodule Farmbot.Repo do
use GenServer
require Logger
alias Farmbot.Repo.{
Device,
FarmEvent,
Peripheral,
Point,
Regimen,
Sequence,
Tool
}
@singular_resources [Device]
@doc "Fetch the current repo."
def current_repo do
GenServer.call(__MODULE__, :current_repo)
@ -29,10 +41,24 @@ defmodule Farmbot.Repo do
GenServer.start_link(__MODULE__, repos, [name: __MODULE__])
end
def init([_repo_a, _repo_b] = repos) do
def init([repo_a, repo_b]) do
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "first_sync") do
do_sync_all_resources(repo_a)
do_sync_all_resources(repo_b)
Farmbot.System.ConfigStorage.update_config_value(:bool, "settings", "first_sync", false)
end
repos = case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do
"A" -> [repo_a, repo_b]
"B" -> [repo_b, repo_a]
end
{:ok, %{repos: repos, sync_cmds: []}}
end
def terminate(_,_) do
Farmbot.BotState.set_sync_status(:sync_error)
end
def handle_call(:current_repo, _, %{repos: [repo_a, _]} = state) do
{:reply, repo_a, state}
end
@ -42,13 +68,24 @@ defmodule Farmbot.Repo do
end
def handle_call(:flip, _, %{repos: [repo_a, repo_b]} = state) do
Farmbot.BotState.set_sync_status(:syncing)
Enum.reverse(state.sync_cmds) |> Enum.map(fn(sync_cmd) ->
apply_sync_cmd(repo_a, sync_cmd)
end)
case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do
"A" ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "B")
"B" ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "A")
end
Farmbot.BotState.set_sync_status(:synced)
{:reply, repo_b, %{state | repos: [repo_b, repo_a]}}
end
def handle_call({:register_sync_cmd, sync_cmd}, _from, state) do
Farmbot.BotState.set_sync_status(:sync_now)
[_current_repo, other_repo] = state.repos
apply_sync_cmd(other_repo, sync_cmd)
sync_cmds = if sync_cmd in state.sync_cmds do
@ -64,6 +101,7 @@ defmodule Farmbot.Repo do
do_apply_sync_cmd(repo, sync_cmd)
rescue
e in Ecto.InvalidChangesetError ->
Farmbot.BotState.set_sync_status(:sync_error)
Logger.error "Failed to apply sync_cmd: (#{repo}) #{inspect sync_cmd} (#{e.action})"
fix_repo(repo, sync_cmd)
end
@ -72,7 +110,7 @@ defmodule Farmbot.Repo do
defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: nil} = sync_cmd) do
# an object was deleted.
if Code.ensure_loaded?(mod) do
Logger.warn "Applying sync_cmd: #{inspect sync_cmd} on #{repo}"
Logger.warn "Applying sync_cmd (delete) : #{inspect sync_cmd} on #{repo}"
case repo.get(mod, id) do
nil -> :ok
existing ->
@ -87,21 +125,21 @@ defmodule Farmbot.Repo do
defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: obj} = sync_cmd) do
if Code.ensure_loaded?(mod) do
Logger.warn "Applying sync_cmd: #{inspect sync_cmd} on #{repo}"
Logger.warn "Applying sync_cmd (insert_or_update): #{inspect sync_cmd} on #{repo}"
# We need to check if this object exists in the database.
case repo.get(mod, id) do
# If it does not, just return the newly created object.
nil -> obj
nil ->
mod.changeset(obj, %{})
|> repo.insert!
# if there is an existing record, copy the ecto meta from the old
# record. This allows `insert_or_update` to work properly.
existing -> %{obj | __meta__: existing.__meta__}
existing ->
mod.changeset(existing, Map.from_struct(obj))
|> repo.update!
end
# Build a changeset
|> mod.changeset()
# Apply it.
|> repo.insert_or_update!()
else
Logger.warn "Unknown module: #{mod} #{inspect sync_cmd}"
end
@ -131,6 +169,64 @@ defmodule Farmbot.Repo do
|> repo.insert_or_update!()
end
defp do_sync_all_resources(repo) do
with :ok <- sync_resource(repo, Device, "/api/device"),
:ok <- sync_resource(repo, FarmEvent, "/api/farm_events"),
:ok <- sync_resource(repo, Peripheral, "/api/peripherals"),
:ok <- sync_resource(repo, Point, "/api/points"),
:ok <- sync_resource(repo, Regimen, "/api/regimens"),
:ok <- sync_resource(repo, Sequence, "/api/sequences"),
:ok <- sync_resource(repo, Tool, "/api/tools")
do
:ok
end
end
defp sync_resource(repo, resource, slug) do
as = if resource in @singular_resources do
struct(resource)
else
[struct(resource)]
end
with {:ok, %{status_code: 200, body: body}} <- Farmbot.HTTP.get(slug),
{:ok, obj} <- Poison.decode(body, as: as)
do
do_insert_or_update(repo, obj)
else
{:error, reason} -> {:error, resource, reason}
{:ok, %{status_code: code, body: body}} ->
case Poison.decode(body) do
{:ok, %{"error" => msg}} -> {:error, resource, "HTTP ERROR: #{code} #{msg}"}
{:error, _} -> {:error, resource, "HTTP ERROR: #{code}"}
end
end
end
defp do_insert_or_update(_, []) do
:ok
end
defp do_insert_or_update(repo, [obj | rest]) do
with {:ok, _} <- do_insert_or_update(repo, obj) do
do_insert_or_update(repo, rest)
end
end
defp do_insert_or_update(repo, obj) when is_map(obj) do
res = case repo.get(obj.__struct__, obj.id) do
nil ->
obj.__struct__.changeset(obj, %{}) |> repo.insert
existing ->
obj.__struct__.changeset(existing, Map.from_struct(obj))
|> repo.update()
end
case res do
{:ok, _} -> :ok
{:error, reason} -> {:error, obj.__struct__, reason}
end
end
@doc false
defmacro __using__(_) do
quote do

View file

@ -52,7 +52,7 @@ defmodule Farmbot.System.ConfigStorage do
|> apply(:"get_#{type}_value", [group_name, key_name])
|> Ecto.Changeset.change(value: value)
|> update!()
|> Farmbot.System.ConfigStorage.Dispatcher.dispatch(key_name)
|> Farmbot.System.ConfigStorage.Dispatcher.dispatch(group_name, key_name)
end
def update_config_value(type, _, _, _) do

View file

@ -42,6 +42,8 @@ defmodule Farmbot.System.ConfigStorage.Migrations.SeedGroups do
defp populate_config_values("settings", group_id) do
create_value(BoolValue, false) |> create_config(group_id, "os_auto_update")
create_value(BoolValue, true) |> create_config(group_id, "first_boot")
create_value(BoolValue, true) |> create_config(group_id, "first_sync")
create_value(StringValue, "A") |> create_config(group_id, "current_repo")
create_value(BoolValue, true) |> create_config(group_id, "first_party_farmware")
create_value(BoolValue, false) |> create_config(group_id, "auto_sync")
create_value(StringValue, nil) |> create_config(group_id, "firmware_hardware")