diff --git a/lib/farmbot/repo/repo.ex b/lib/farmbot/repo/repo.ex index 8664860c..4181e3cf 100644 --- a/lib/farmbot/repo/repo.ex +++ b/lib/farmbot/repo/repo.ex @@ -20,6 +20,12 @@ defmodule Farmbot.Repo do @singular_resources [Device] + # 45 minutes. + @timeout 2.7e+6 |> round() + + # fifteen seconds. + # @timeout 15000 + @doc "Fetch the current repo." def current_repo do GenServer.call(__MODULE__, :current_repo) @@ -40,6 +46,11 @@ defmodule Farmbot.Repo do GenServer.call(__MODULE__, {:register_sync_cmd, remote_id, kind, body}, :infinity) end + @doc false + def force_hard_sync do + GenServer.call(__MODULE__, :force_hard_sync) + end + @doc false def start_link(repos) do GenServer.start_link(__MODULE__, repos, [name: __MODULE__]) @@ -47,11 +58,10 @@ defmodule Farmbot.Repo do def init([repo_a, repo_b]) do # Delete any old sync cmds. - ConfigStorage.delete_all SyncCmd + destroy_all_sync_cmds() needs_hard_sync = if ConfigStorage.get_config_value(:bool, "settings", "first_sync") || auto_sync?() do - do_sync_all_resources(repo_a) - do_sync_all_resources(repo_b) + do_sync_both(repo_a, repo_b) ConfigStorage.update_config_value(:bool, "settings", "first_sync", false) BotState.set_sync_status(:synced) false @@ -68,20 +78,19 @@ defmodule Farmbot.Repo do # Copy configs [current, _] = repos copy_configs(current) - {:ok, %{repos: repos, needs_hard_sync: needs_hard_sync}} + {:ok, %{repos: repos, needs_hard_sync: needs_hard_sync, timer: start_timer()}} end - defp copy_configs(repo) do - case repo.one(Device) do - nil -> :ok - %{timezone: tz} -> - ConfigStorage.update_config_value(:string, "settings", "timezone", tz) - :ok + def terminate(reason,_) do + if reason not in [:normal, :shutdown] do + BotState.set_sync_status(:sync_error) end end - def terminate(_,_) do - BotState.set_sync_status(:sync_error) + def handle_call(:force_hard_sync, _, state) do + maybe_cancel_timer(state.timer) + BotState.set_sync_status(:sync_now) + {:reply, :ok, %{state | timer: nil, needs_hard_sync: true}} end def handle_call(:current_repo, _, %{repos: [repo_a, _]} = state) do @@ -93,32 +102,37 @@ defmodule Farmbot.Repo do end def handle_call(:flip, _, %{repos: [repo_a, repo_b], needs_hard_sync: true} = state) do + maybe_cancel_timer(state.timer) + destroy_all_sync_cmds() Logger.warn 3, "Forcing full sync." BotState.set_sync_status(:syncing) - do_sync_all_resources(repo_a) - do_sync_all_resources(repo_b) + do_sync_both(repo_a, repo_b) BotState.set_sync_status(:synced) copy_configs(repo_b) flip_repos_in_cs() - - {:reply, :ok, %{state | repos: [repo_b, repo_a], needs_hard_sync: false}} + {:reply, :ok, %{state | repos: [repo_b, repo_a], needs_hard_sync: false, timer: start_timer()}} end def handle_call(:flip, _, %{repos: [repo_a, repo_b]} = state) do + maybe_cancel_timer(state.timer) Logger.busy 3, "Syncing" BotState.set_sync_status(:syncing) + + # Fetch all sync_cmds and apply them in order they were received. ConfigStorage.all(SyncCmd) - |> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at)) - |> Enum.each(&apply_sync_cmd(repo_a, &1)) + |> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at)) + |> Enum.each(&apply_sync_cmd(repo_a, &1)) flip_repos_in_cs() BotState.set_sync_status(:synced) copy_configs(repo_b) + destroy_all_sync_cmds() Logger.success 3, "Sync complete." - {:reply, repo_b, %{state | repos: [repo_b, repo_a]}} + {:reply, repo_b, %{state | repos: [repo_b, repo_a], timer: start_timer()}} end def handle_call({:register_sync_cmd, remote_id, kind, body}, _from, state) do + maybe_cancel_timer(state.timer) [_current_repo, other_repo] = state.repos case SyncCmd.changeset(struct(SyncCmd, %{remote_id: remote_id, kind: kind, body: body})) |> ConfigStorage.insert do {:ok, sync_cmd} -> @@ -127,14 +141,55 @@ defmodule Farmbot.Repo do false -> BotState.set_sync_status(:sync_now) true -> BotState.set_sync_status(:syncing) end - {:reply, :ok, state} + {:reply, :ok, %{state | timer: start_timer()}} {:error, reason} -> BotState.set_sync_status(:sync_error) Logger.error 1, "Failed to apply sync command: #{inspect reason}" - {:reply, :error, state} + {:reply, :error, %{state | needs_hard_sync: true}} end end + def handle_info(:timeout, state) do + Logger.warn 3, "Haven't received any auto sync messages in a while. Forcing hard sync." + BotState.set_sync_status(:sync_now) + destroy_all_sync_cmds() + {:noreply, %{state | timer: start_timer(), needs_hard_sync: true}} + end + + defp copy_configs(repo) do + case repo.one(Device) do + nil -> :ok + %{timezone: tz} -> + ConfigStorage.update_config_value(:string, "settings", "timezone", tz) + :ok + end + + repo.all(Peripheral) |> Enum.all?(fn(%{mode: mode, pin: pin}) -> + mode = if mode == 0, do: :digital, else: :analog + # Logger.busy 3, "Reading peripheral (#{pin} - #{mode})" + Farmbot.Firmware.read_pin(pin, mode) + end) + end + + defp destroy_all_sync_cmds do + ConfigStorage.delete_all(SyncCmd) + end + + defp start_timer do + if auto_sync?() do + nil + else + # Logger.debug 3, "Starting sync timer." + Process.send_after(self(), :timeout, @timeout) + end + end + + defp maybe_cancel_timer(nil), do: :ok + defp maybe_cancel_timer(timer) do + # Logger.debug 3, "Canceling sync timer." + Process.cancel_timer(timer) + end + defp flip_repos_in_cs do case ConfigStorage.get_config_value(:string, "settings", "current_repo") do "A" -> @@ -183,7 +238,7 @@ defmodule Farmbot.Repo do end defp do_apply_sync_cmd(repo, %SyncCmd{remote_id: id, kind: kind, body: obj} = sync_cmd) do - obj = strip_struct(obj) + not_struct = strip_struct(obj) mod = Module.concat(["Farmbot", "Repo", kind]) if Code.ensure_loaded?(mod) do Logger.busy 3, "Applying sync_cmd (#{mod}: insert_or_update on #{repo}" @@ -198,7 +253,7 @@ defmodule Farmbot.Repo do # if there is an existing record, copy the ecto meta from the old # record. This allows `insert_or_update` to work properly. existing -> - mod.changeset(existing, obj) + mod.changeset(existing, not_struct) |> repo.update! end else @@ -230,8 +285,16 @@ defmodule Farmbot.Repo do |> repo.insert_or_update!() end + defp do_sync_both(repo_a, repo_b) do + case do_sync_all_resources(repo_a) do + :ok -> + do_sync_all_resources(repo_b) + err -> err + end + end + defp do_sync_all_resources(repo) do - with :ok <- sync_resource(repo, Device, "/api/device"), + with :ok <- sync_resource(repo, Device, "/api/device"), :ok <- sync_resource(repo, FarmEvent, "/api/farm_events"), :ok <- sync_resource(repo, Peripheral, "/api/peripherals"), :ok <- sync_resource(repo, Point, "/api/points"), @@ -240,26 +303,33 @@ defmodule Farmbot.Repo do :ok <- sync_resource(repo, Tool, "/api/tools") do :ok + else + err -> + Logger.error 1, "sync on #{repo} failed: #{inspect err}" + err end end defp sync_resource(repo, resource, slug) do Logger.debug 3, "syncing: #{resource} (#{slug})" - as = if resource in @singular_resources do - struct(resource) - else - [struct(resource)] - end + as = if resource in @singular_resources, do: struct(resource), else: [struct(resource)] + with {:ok, %{status_code: 200, body: body}} <- Farmbot.HTTP.get(slug), - {:ok, obj} <- Poison.decode(body, as: as) + {:ok, obj_or_list} <- Poison.decode(body, as: as) do - do_insert_or_update(repo, obj) + case do_insert_or_update(repo, obj_or_list) do + {:ok, _} when resource in @singular_resources -> :ok + :ok -> :ok + err -> err + end else {:error, reason} -> {:error, resource, reason} + {:error, resource, reason} -> {:error, resource, reason} {:ok, %{status_code: code, body: body}} -> case Poison.decode(body) do {:ok, %{"error" => msg}} -> {:error, resource, "HTTP ERROR: #{code} #{msg}"} - {:error, _} -> {:error, resource, "HTTP ERROR: #{code}"} + {:error, _,} -> {:error, resource, "HTTP ERROR: #{code}"} + {:error, _, _} -> {:error, resource, "JSON ERROR: #{code}"} end end end @@ -282,10 +352,11 @@ defmodule Farmbot.Repo do obj.__struct__.changeset(existing, Map.from_struct(obj)) |> repo.update() end - case res do - {:ok, _} -> :ok - {:error, reason} -> {:error, obj.__struct__, reason} + {:ok, _} -> res + {:error, reason} -> + Logger.error 2, "failed to sync #{obj.__struct__}: #{inspect reason}" + {:error, obj.__struct__, reason} end end