store sync commands in sqlite.

This commit is contained in:
connor rigby 2017-11-14 07:15:18 -08:00
parent b07ae9c2fb
commit e96a3ab149
7 changed files with 121 additions and 64 deletions

View file

@ -87,19 +87,17 @@ defmodule Farmbot.BotState.Transport.GenMQTT.Client do
{:ok, state}
end
def on_publish(["bot", _, "sync", kind, id], msg, state) do
def on_publish(["bot", _, "sync", kind, remote_id], payload, state) do
spawn fn() ->
mod = Module.concat(["Farmbot", "Repo", kind])
if Code.ensure_loaded?(mod) do
body = struct(mod)
sync_cmd = msg |> Poison.decode!(as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id))
Farmbot.Repo.register_sync_cmd(sync_cmd)
%{"body" => body} = Poison.decode!(payload, as: %{"body" => struct(mod)})
Farmbot.Repo.register_sync_cmd(String.to_integer(remote_id), kind, body)
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
Farmbot.Repo.flip()
# Farmbot.Repo.flip()
end
else
Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(msg)}"
Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(payload)}"
end
end
{:ok, state}

View file

@ -37,16 +37,14 @@ defmodule Farmbot.Logger.Console do
{:noreply, [], state}
end
defp maybe_log(%Farmbot.Log{module: nil}) do
defp maybe_log(%Farmbot.Log{module: nil} = log) do
IO.inspect log
:ok
end
defp maybe_log(%Farmbot.Log{module: module} = log) do
if List.first(Module.split(module)) == "Farmbot" do
IO.inspect log
else
:ok
end
# should_log = List.first(Module.split(module)) == "Farmbot"
IO.inspect log
end
def handle_call({:set_verbosity_level, num}, _from, state) do

View file

@ -10,11 +10,17 @@ defmodule Farmbot.Repo.JSONType do
{:ok, to_string(basic)}
end
def cast(%{__meta__: _, __struct__: _} = struct) do
Map.from_struct(struct)
|> Map.delete(:__meta__)
|> cast()
end
# try to encode as json here.
def cast(map_or_list) when is_list(map_or_list) or is_map(map_or_list) do
case Poison.encode(map_or_list) do
{:ok, bin} -> {:ok, bin}
_ -> :error
_reason -> :error
end
end

View file

@ -14,6 +14,10 @@ defmodule Farmbot.Repo do
Tool
}
alias Farmbot.BotState
alias Farmbot.System.ConfigStorage
alias ConfigStorage.SyncCmd
@singular_resources [Device]
@doc "Fetch the current repo."
@ -32,8 +36,8 @@ defmodule Farmbot.Repo do
end
@doc "Register a diff to be stored until a flip."
def register_sync_cmd(sync_cmd) do
GenServer.call(__MODULE__, {:register_sync_cmd, sync_cmd}, :infinity)
def register_sync_cmd(remote_id, kind, body) do
GenServer.call(__MODULE__, {:register_sync_cmd, remote_id, kind, body}, :infinity)
end
@doc false
@ -42,18 +46,21 @@ defmodule Farmbot.Repo do
end
def init([repo_a, repo_b]) do
needs_hard_sync = if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "first_sync") || Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
# Delete any old sync cmds.
ConfigStorage.delete_all SyncCmd
needs_hard_sync = if ConfigStorage.get_config_value(:bool, "settings", "first_sync") || auto_sync?() do
do_sync_all_resources(repo_a)
do_sync_all_resources(repo_b)
Farmbot.System.ConfigStorage.update_config_value(:bool, "settings", "first_sync", false)
Farmbot.BotState.set_sync_status(:synced)
ConfigStorage.update_config_value(:bool, "settings", "first_sync", false)
BotState.set_sync_status(:synced)
false
else
Farmbot.BotState.set_sync_status(:sync_now)
BotState.set_sync_status(:sync_now)
true
end
repos = case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do
repos = case ConfigStorage.get_config_value(:string, "settings", "current_repo") do
"A" -> [repo_a, repo_b]
"B" -> [repo_b, repo_a]
end
@ -61,20 +68,20 @@ defmodule Farmbot.Repo do
# Copy configs
[current, _] = repos
copy_configs(current)
{:ok, %{repos: repos, sync_cmds: [], needs_hard_sync: needs_hard_sync}}
{:ok, %{repos: repos, needs_hard_sync: needs_hard_sync}}
end
defp copy_configs(repo) do
case repo.one(Farmbot.Repo.Device) do
case repo.one(Device) do
nil -> :ok
%{timezone: tz} ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "timezone", tz)
ConfigStorage.update_config_value(:string, "settings", "timezone", tz)
:ok
end
end
def terminate(_,_) do
Farmbot.BotState.set_sync_status(:sync_error)
BotState.set_sync_status(:sync_error)
end
def handle_call(:current_repo, _, %{repos: [repo_a, _]} = state) do
@ -86,65 +93,80 @@ defmodule Farmbot.Repo do
end
def handle_call(:flip, _, %{repos: [repo_a, repo_b], needs_hard_sync: true} = state) do
Farmbot.BotState.set_sync_status(:syncing)
Logger.warn 3, "Forcing full sync."
BotState.set_sync_status(:syncing)
do_sync_all_resources(repo_a)
do_sync_all_resources(repo_b)
Farmbot.BotState.set_sync_status(:synced)
BotState.set_sync_status(:synced)
copy_configs(repo_b)
case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do
"A" ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "B")
"B" ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "A")
end
flip_repos_in_cs()
{:reply, :ok, %{state | repos: [repo_b, repo_a], sync_cmds: [], needs_hard_sync: false}}
{:reply, :ok, %{state | repos: [repo_b, repo_a], needs_hard_sync: false}}
end
def handle_call(:flip, _, %{repos: [repo_a, repo_b]} = state) do
Farmbot.BotState.set_sync_status(:syncing)
Enum.reverse(state.sync_cmds) |> Enum.map(fn(sync_cmd) ->
apply_sync_cmd(repo_a, sync_cmd)
end)
Logger.busy 3, "Syncing"
BotState.set_sync_status(:syncing)
ConfigStorage.all(SyncCmd)
|> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at))
|> Enum.each(&apply_sync_cmd(repo_a, &1))
case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do
"A" ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "B")
"B" ->
Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "A")
end
Farmbot.BotState.set_sync_status(:synced)
flip_repos_in_cs()
BotState.set_sync_status(:synced)
copy_configs(repo_b)
Logger.success 3, "Sync complete."
{:reply, repo_b, %{state | repos: [repo_b, repo_a]}}
end
def handle_call({:register_sync_cmd, sync_cmd}, _from, state) do
case Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do
false -> Farmbot.BotState.set_sync_status(:sync_now)
true -> Farmbot.BotState.set_sync_status(:syncing)
end
def handle_call({:register_sync_cmd, remote_id, kind, body}, _from, state) do
[_current_repo, other_repo] = state.repos
apply_sync_cmd(other_repo, sync_cmd)
sync_cmds = if sync_cmd in state.sync_cmds do
state.sync_cmds
else
[sync_cmd | state.sync_cmds]
case SyncCmd.changeset(struct(SyncCmd, %{remote_id: remote_id, kind: kind, body: body})) |> ConfigStorage.insert do
{:ok, sync_cmd} ->
apply_sync_cmd(other_repo, sync_cmd)
case auto_sync?() do
false -> BotState.set_sync_status(:sync_now)
true -> BotState.set_sync_status(:syncing)
end
{:reply, :ok, state}
{:error, reason} ->
BotState.set_sync_status(:sync_error)
Logger.error 1, "Failed to apply sync command: #{inspect reason}"
{:reply, :error, state}
end
{:reply, :ok, %{state | sync_cmds: sync_cmds}}
end
defp apply_sync_cmd(repo, sync_cmd) do
defp flip_repos_in_cs do
case ConfigStorage.get_config_value(:string, "settings", "current_repo") do
"A" ->
ConfigStorage.update_config_value(:string, "settings", "current_repo", "B")
"B" ->
ConfigStorage.update_config_value(:string, "settings", "current_repo", "A")
end
end
defp strip_struct(%{__struct__: _, __meta__: _} = struct) do
Map.from_struct(struct) |> Map.delete(:__meta__)
end
defp strip_struct(already_map), do: already_map
defp auto_sync? do
ConfigStorage.get_config_value(:bool, "settings", "auto_sync")
end
defp apply_sync_cmd(repo, %SyncCmd{} = sync_cmd) do
try do
do_apply_sync_cmd(repo, sync_cmd)
rescue
e in Ecto.InvalidChangesetError ->
Farmbot.BotState.set_sync_status(:sync_error)
BotState.set_sync_status(:sync_error)
Logger.error 1, "Failed to apply sync_cmd: (#{repo}) #{inspect sync_cmd} (#{e.action})"
fix_repo(repo, sync_cmd)
end
end
defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: nil} = sync_cmd) do
defp do_apply_sync_cmd(repo, %SyncCmd{remote_id: id, kind: kind, body: nil} = sync_cmd) do
mod = Module.concat(["Farmbot", "Repo", kind])
# an object was deleted.
if Code.ensure_loaded?(mod) do
Logger.busy 3, "Applying sync_cmd (#{mod}: delete) on #{repo}"
@ -160,7 +182,9 @@ defmodule Farmbot.Repo do
end
end
defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: obj} = sync_cmd) do
defp do_apply_sync_cmd(repo, %SyncCmd{remote_id: id, kind: kind, body: obj} = sync_cmd) do
obj = strip_struct(obj)
mod = Module.concat(["Farmbot", "Repo", kind])
if Code.ensure_loaded?(mod) do
Logger.busy 3, "Applying sync_cmd (#{mod}: insert_or_update on #{repo}"
@ -174,7 +198,7 @@ defmodule Farmbot.Repo do
# if there is an existing record, copy the ecto meta from the old
# record. This allows `insert_or_update` to work properly.
existing ->
mod.changeset(existing, Map.from_struct(obj))
mod.changeset(existing, obj)
|> repo.update!
end
else

View file

@ -1,3 +0,0 @@
defmodule Farmbot.Repo.SyncCmd do
defstruct [:id, :kind, :body]
end

View file

@ -0,0 +1,22 @@
defmodule Farmbot.System.ConfigStorage.SyncCmd do
@moduledoc "describes an update to a API resource."
use Ecto.Schema
import Ecto.Changeset
alias Farmbot.Repo.JSONType
schema "sync_cmds" do
field(:remote_id, :integer)
field(:kind, :string)
field(:body, JSONType)
timestamps()
end
@required_fields [:remote_id, :kind, :body]
def changeset(config, params \\ %{}) do
config
|> cast(params, @required_fields)
|> validate_required(@required_fields)
end
end

View file

@ -0,0 +1,12 @@
defmodule Farmbot.System.ConfigStorage.Migrations.AddSyncCmdTable do
use Ecto.Migration
def change do
create table("sync_cmds") do
add(:remote_id, :integer)
add(:kind, :string)
add(:body, :string)
timestamps()
end
end
end