From e96a3ab1494ffec4f30b98f083cce24ac218a2f6 Mon Sep 17 00:00:00 2001 From: connor rigby Date: Tue, 14 Nov 2017 07:15:18 -0800 Subject: [PATCH] store sync commands in sqlite. --- .../bot_state/transport/gen_mqtt/client.ex | 12 +- lib/farmbot/logger/console.ex | 10 +- lib/farmbot/repo/json_type.ex | 8 +- lib/farmbot/repo/repo.ex | 118 +++++++++++------- lib/farmbot/repo/sync_cmd.ex | 3 - lib/farmbot/system/config_storage/sync_cmd.ex | 22 ++++ .../20171114135840_add_sync_cmd_table.exs | 12 ++ 7 files changed, 121 insertions(+), 64 deletions(-) delete mode 100644 lib/farmbot/repo/sync_cmd.ex create mode 100644 lib/farmbot/system/config_storage/sync_cmd.ex create mode 100644 priv/config_storage/migrations/20171114135840_add_sync_cmd_table.exs diff --git a/lib/farmbot/bot_state/transport/gen_mqtt/client.ex b/lib/farmbot/bot_state/transport/gen_mqtt/client.ex index d0a626bb..19e306c2 100644 --- a/lib/farmbot/bot_state/transport/gen_mqtt/client.ex +++ b/lib/farmbot/bot_state/transport/gen_mqtt/client.ex @@ -87,19 +87,17 @@ defmodule Farmbot.BotState.Transport.GenMQTT.Client do {:ok, state} end - def on_publish(["bot", _, "sync", kind, id], msg, state) do + def on_publish(["bot", _, "sync", kind, remote_id], payload, state) do spawn fn() -> mod = Module.concat(["Farmbot", "Repo", kind]) if Code.ensure_loaded?(mod) do - body = struct(mod) - sync_cmd = msg |> Poison.decode!(as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id)) - Farmbot.Repo.register_sync_cmd(sync_cmd) - + %{"body" => body} = Poison.decode!(payload, as: %{"body" => struct(mod)}) + Farmbot.Repo.register_sync_cmd(String.to_integer(remote_id), kind, body) if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do - Farmbot.Repo.flip() + # Farmbot.Repo.flip() end else - Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(msg)}" + Logger.warn 2, "Unknown syncable: #{mod}: #{inspect Poison.decode!(payload)}" end end {:ok, state} diff --git a/lib/farmbot/logger/console.ex b/lib/farmbot/logger/console.ex index 3dae007e..9bddcc96 100644 --- a/lib/farmbot/logger/console.ex +++ b/lib/farmbot/logger/console.ex @@ -37,16 +37,14 @@ defmodule Farmbot.Logger.Console do {:noreply, [], state} end - defp maybe_log(%Farmbot.Log{module: nil}) do + defp maybe_log(%Farmbot.Log{module: nil} = log) do + IO.inspect log :ok end defp maybe_log(%Farmbot.Log{module: module} = log) do - if List.first(Module.split(module)) == "Farmbot" do - IO.inspect log - else - :ok - end + # should_log = List.first(Module.split(module)) == "Farmbot" + IO.inspect log end def handle_call({:set_verbosity_level, num}, _from, state) do diff --git a/lib/farmbot/repo/json_type.ex b/lib/farmbot/repo/json_type.ex index 05f91d9f..9c74d80f 100644 --- a/lib/farmbot/repo/json_type.ex +++ b/lib/farmbot/repo/json_type.ex @@ -10,11 +10,17 @@ defmodule Farmbot.Repo.JSONType do {:ok, to_string(basic)} end + def cast(%{__meta__: _, __struct__: _} = struct) do + Map.from_struct(struct) + |> Map.delete(:__meta__) + |> cast() + end + # try to encode as json here. def cast(map_or_list) when is_list(map_or_list) or is_map(map_or_list) do case Poison.encode(map_or_list) do {:ok, bin} -> {:ok, bin} - _ -> :error + _reason -> :error end end diff --git a/lib/farmbot/repo/repo.ex b/lib/farmbot/repo/repo.ex index 750708f2..8664860c 100644 --- a/lib/farmbot/repo/repo.ex +++ b/lib/farmbot/repo/repo.ex @@ -14,6 +14,10 @@ defmodule Farmbot.Repo do Tool } + alias Farmbot.BotState + alias Farmbot.System.ConfigStorage + alias ConfigStorage.SyncCmd + @singular_resources [Device] @doc "Fetch the current repo." @@ -32,8 +36,8 @@ defmodule Farmbot.Repo do end @doc "Register a diff to be stored until a flip." - def register_sync_cmd(sync_cmd) do - GenServer.call(__MODULE__, {:register_sync_cmd, sync_cmd}, :infinity) + def register_sync_cmd(remote_id, kind, body) do + GenServer.call(__MODULE__, {:register_sync_cmd, remote_id, kind, body}, :infinity) end @doc false @@ -42,18 +46,21 @@ defmodule Farmbot.Repo do end def init([repo_a, repo_b]) do - needs_hard_sync = if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "first_sync") || Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do + # Delete any old sync cmds. + ConfigStorage.delete_all SyncCmd + + needs_hard_sync = if ConfigStorage.get_config_value(:bool, "settings", "first_sync") || auto_sync?() do do_sync_all_resources(repo_a) do_sync_all_resources(repo_b) - Farmbot.System.ConfigStorage.update_config_value(:bool, "settings", "first_sync", false) - Farmbot.BotState.set_sync_status(:synced) + ConfigStorage.update_config_value(:bool, "settings", "first_sync", false) + BotState.set_sync_status(:synced) false else - Farmbot.BotState.set_sync_status(:sync_now) + BotState.set_sync_status(:sync_now) true end - repos = case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do + repos = case ConfigStorage.get_config_value(:string, "settings", "current_repo") do "A" -> [repo_a, repo_b] "B" -> [repo_b, repo_a] end @@ -61,20 +68,20 @@ defmodule Farmbot.Repo do # Copy configs [current, _] = repos copy_configs(current) - {:ok, %{repos: repos, sync_cmds: [], needs_hard_sync: needs_hard_sync}} + {:ok, %{repos: repos, needs_hard_sync: needs_hard_sync}} end defp copy_configs(repo) do - case repo.one(Farmbot.Repo.Device) do + case repo.one(Device) do nil -> :ok %{timezone: tz} -> - Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "timezone", tz) + ConfigStorage.update_config_value(:string, "settings", "timezone", tz) :ok end end def terminate(_,_) do - Farmbot.BotState.set_sync_status(:sync_error) + BotState.set_sync_status(:sync_error) end def handle_call(:current_repo, _, %{repos: [repo_a, _]} = state) do @@ -86,65 +93,80 @@ defmodule Farmbot.Repo do end def handle_call(:flip, _, %{repos: [repo_a, repo_b], needs_hard_sync: true} = state) do - Farmbot.BotState.set_sync_status(:syncing) + Logger.warn 3, "Forcing full sync." + BotState.set_sync_status(:syncing) do_sync_all_resources(repo_a) do_sync_all_resources(repo_b) - Farmbot.BotState.set_sync_status(:synced) + BotState.set_sync_status(:synced) copy_configs(repo_b) - case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do - "A" -> - Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "B") - "B" -> - Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "A") - end + flip_repos_in_cs() - {:reply, :ok, %{state | repos: [repo_b, repo_a], sync_cmds: [], needs_hard_sync: false}} + {:reply, :ok, %{state | repos: [repo_b, repo_a], needs_hard_sync: false}} end def handle_call(:flip, _, %{repos: [repo_a, repo_b]} = state) do - Farmbot.BotState.set_sync_status(:syncing) - Enum.reverse(state.sync_cmds) |> Enum.map(fn(sync_cmd) -> - apply_sync_cmd(repo_a, sync_cmd) - end) + Logger.busy 3, "Syncing" + BotState.set_sync_status(:syncing) + ConfigStorage.all(SyncCmd) + |> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at)) + |> Enum.each(&apply_sync_cmd(repo_a, &1)) - case Farmbot.System.ConfigStorage.get_config_value(:string, "settings", "current_repo") do - "A" -> - Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "B") - "B" -> - Farmbot.System.ConfigStorage.update_config_value(:string, "settings", "current_repo", "A") - end - Farmbot.BotState.set_sync_status(:synced) + flip_repos_in_cs() + BotState.set_sync_status(:synced) copy_configs(repo_b) + Logger.success 3, "Sync complete." {:reply, repo_b, %{state | repos: [repo_b, repo_a]}} end - def handle_call({:register_sync_cmd, sync_cmd}, _from, state) do - case Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "auto_sync") do - false -> Farmbot.BotState.set_sync_status(:sync_now) - true -> Farmbot.BotState.set_sync_status(:syncing) - end + def handle_call({:register_sync_cmd, remote_id, kind, body}, _from, state) do [_current_repo, other_repo] = state.repos - apply_sync_cmd(other_repo, sync_cmd) - sync_cmds = if sync_cmd in state.sync_cmds do - state.sync_cmds - else - [sync_cmd | state.sync_cmds] + case SyncCmd.changeset(struct(SyncCmd, %{remote_id: remote_id, kind: kind, body: body})) |> ConfigStorage.insert do + {:ok, sync_cmd} -> + apply_sync_cmd(other_repo, sync_cmd) + case auto_sync?() do + false -> BotState.set_sync_status(:sync_now) + true -> BotState.set_sync_status(:syncing) + end + {:reply, :ok, state} + {:error, reason} -> + BotState.set_sync_status(:sync_error) + Logger.error 1, "Failed to apply sync command: #{inspect reason}" + {:reply, :error, state} end - {:reply, :ok, %{state | sync_cmds: sync_cmds}} end - defp apply_sync_cmd(repo, sync_cmd) do + defp flip_repos_in_cs do + case ConfigStorage.get_config_value(:string, "settings", "current_repo") do + "A" -> + ConfigStorage.update_config_value(:string, "settings", "current_repo", "B") + "B" -> + ConfigStorage.update_config_value(:string, "settings", "current_repo", "A") + end + end + + defp strip_struct(%{__struct__: _, __meta__: _} = struct) do + Map.from_struct(struct) |> Map.delete(:__meta__) + end + + defp strip_struct(already_map), do: already_map + + defp auto_sync? do + ConfigStorage.get_config_value(:bool, "settings", "auto_sync") + end + + defp apply_sync_cmd(repo, %SyncCmd{} = sync_cmd) do try do do_apply_sync_cmd(repo, sync_cmd) rescue e in Ecto.InvalidChangesetError -> - Farmbot.BotState.set_sync_status(:sync_error) + BotState.set_sync_status(:sync_error) Logger.error 1, "Failed to apply sync_cmd: (#{repo}) #{inspect sync_cmd} (#{e.action})" fix_repo(repo, sync_cmd) end end - defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: nil} = sync_cmd) do + defp do_apply_sync_cmd(repo, %SyncCmd{remote_id: id, kind: kind, body: nil} = sync_cmd) do + mod = Module.concat(["Farmbot", "Repo", kind]) # an object was deleted. if Code.ensure_loaded?(mod) do Logger.busy 3, "Applying sync_cmd (#{mod}: delete) on #{repo}" @@ -160,7 +182,9 @@ defmodule Farmbot.Repo do end end - defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: obj} = sync_cmd) do + defp do_apply_sync_cmd(repo, %SyncCmd{remote_id: id, kind: kind, body: obj} = sync_cmd) do + obj = strip_struct(obj) + mod = Module.concat(["Farmbot", "Repo", kind]) if Code.ensure_loaded?(mod) do Logger.busy 3, "Applying sync_cmd (#{mod}: insert_or_update on #{repo}" @@ -174,7 +198,7 @@ defmodule Farmbot.Repo do # if there is an existing record, copy the ecto meta from the old # record. This allows `insert_or_update` to work properly. existing -> - mod.changeset(existing, Map.from_struct(obj)) + mod.changeset(existing, obj) |> repo.update! end else diff --git a/lib/farmbot/repo/sync_cmd.ex b/lib/farmbot/repo/sync_cmd.ex deleted file mode 100644 index 2a22cd42..00000000 --- a/lib/farmbot/repo/sync_cmd.ex +++ /dev/null @@ -1,3 +0,0 @@ -defmodule Farmbot.Repo.SyncCmd do - defstruct [:id, :kind, :body] -end diff --git a/lib/farmbot/system/config_storage/sync_cmd.ex b/lib/farmbot/system/config_storage/sync_cmd.ex new file mode 100644 index 00000000..c2fe9a7d --- /dev/null +++ b/lib/farmbot/system/config_storage/sync_cmd.ex @@ -0,0 +1,22 @@ +defmodule Farmbot.System.ConfigStorage.SyncCmd do + @moduledoc "describes an update to a API resource." + + use Ecto.Schema + import Ecto.Changeset + alias Farmbot.Repo.JSONType + + schema "sync_cmds" do + field(:remote_id, :integer) + field(:kind, :string) + field(:body, JSONType) + timestamps() + end + + @required_fields [:remote_id, :kind, :body] + + def changeset(config, params \\ %{}) do + config + |> cast(params, @required_fields) + |> validate_required(@required_fields) + end +end diff --git a/priv/config_storage/migrations/20171114135840_add_sync_cmd_table.exs b/priv/config_storage/migrations/20171114135840_add_sync_cmd_table.exs new file mode 100644 index 00000000..c7db1d16 --- /dev/null +++ b/priv/config_storage/migrations/20171114135840_add_sync_cmd_table.exs @@ -0,0 +1,12 @@ +defmodule Farmbot.System.ConfigStorage.Migrations.AddSyncCmdTable do + use Ecto.Migration + + def change do + create table("sync_cmds") do + add(:remote_id, :integer) + add(:kind, :string) + add(:body, :string) + timestamps() + end + end +end