Read peripherals after a sync.

This commit is contained in:
Connor Rigby 2017-11-14 10:29:01 -08:00
parent e96a3ab149
commit 47dcf07314

View file

@ -20,6 +20,12 @@ defmodule Farmbot.Repo do
@singular_resources [Device]
# 45 minutes.
@timeout 2.7e+6 |> round()
# fifteen seconds.
# @timeout 15000
@doc "Fetch the current repo."
def current_repo do
GenServer.call(__MODULE__, :current_repo)
@ -40,6 +46,11 @@ defmodule Farmbot.Repo do
GenServer.call(__MODULE__, {:register_sync_cmd, remote_id, kind, body}, :infinity)
end
@doc false
def force_hard_sync do
GenServer.call(__MODULE__, :force_hard_sync)
end
@doc false
def start_link(repos) do
GenServer.start_link(__MODULE__, repos, [name: __MODULE__])
@ -47,11 +58,10 @@ defmodule Farmbot.Repo do
def init([repo_a, repo_b]) do
# Delete any old sync cmds.
ConfigStorage.delete_all SyncCmd
destroy_all_sync_cmds()
needs_hard_sync = if ConfigStorage.get_config_value(:bool, "settings", "first_sync") || auto_sync?() do
do_sync_all_resources(repo_a)
do_sync_all_resources(repo_b)
do_sync_both(repo_a, repo_b)
ConfigStorage.update_config_value(:bool, "settings", "first_sync", false)
BotState.set_sync_status(:synced)
false
@ -68,20 +78,19 @@ defmodule Farmbot.Repo do
# Copy configs
[current, _] = repos
copy_configs(current)
{:ok, %{repos: repos, needs_hard_sync: needs_hard_sync}}
{:ok, %{repos: repos, needs_hard_sync: needs_hard_sync, timer: start_timer()}}
end
defp copy_configs(repo) do
case repo.one(Device) do
nil -> :ok
%{timezone: tz} ->
ConfigStorage.update_config_value(:string, "settings", "timezone", tz)
:ok
def terminate(reason,_) do
if reason not in [:normal, :shutdown] do
BotState.set_sync_status(:sync_error)
end
end
def terminate(_,_) do
BotState.set_sync_status(:sync_error)
def handle_call(:force_hard_sync, _, state) do
maybe_cancel_timer(state.timer)
BotState.set_sync_status(:sync_now)
{:reply, :ok, %{state | timer: nil, needs_hard_sync: true}}
end
def handle_call(:current_repo, _, %{repos: [repo_a, _]} = state) do
@ -93,32 +102,37 @@ defmodule Farmbot.Repo do
end
def handle_call(:flip, _, %{repos: [repo_a, repo_b], needs_hard_sync: true} = state) do
maybe_cancel_timer(state.timer)
destroy_all_sync_cmds()
Logger.warn 3, "Forcing full sync."
BotState.set_sync_status(:syncing)
do_sync_all_resources(repo_a)
do_sync_all_resources(repo_b)
do_sync_both(repo_a, repo_b)
BotState.set_sync_status(:synced)
copy_configs(repo_b)
flip_repos_in_cs()
{:reply, :ok, %{state | repos: [repo_b, repo_a], needs_hard_sync: false}}
{:reply, :ok, %{state | repos: [repo_b, repo_a], needs_hard_sync: false, timer: start_timer()}}
end
def handle_call(:flip, _, %{repos: [repo_a, repo_b]} = state) do
maybe_cancel_timer(state.timer)
Logger.busy 3, "Syncing"
BotState.set_sync_status(:syncing)
# Fetch all sync_cmds and apply them in order they were received.
ConfigStorage.all(SyncCmd)
|> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at))
|> Enum.each(&apply_sync_cmd(repo_a, &1))
|> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at))
|> Enum.each(&apply_sync_cmd(repo_a, &1))
flip_repos_in_cs()
BotState.set_sync_status(:synced)
copy_configs(repo_b)
destroy_all_sync_cmds()
Logger.success 3, "Sync complete."
{:reply, repo_b, %{state | repos: [repo_b, repo_a]}}
{:reply, repo_b, %{state | repos: [repo_b, repo_a], timer: start_timer()}}
end
def handle_call({:register_sync_cmd, remote_id, kind, body}, _from, state) do
maybe_cancel_timer(state.timer)
[_current_repo, other_repo] = state.repos
case SyncCmd.changeset(struct(SyncCmd, %{remote_id: remote_id, kind: kind, body: body})) |> ConfigStorage.insert do
{:ok, sync_cmd} ->
@ -127,14 +141,55 @@ defmodule Farmbot.Repo do
false -> BotState.set_sync_status(:sync_now)
true -> BotState.set_sync_status(:syncing)
end
{:reply, :ok, state}
{:reply, :ok, %{state | timer: start_timer()}}
{:error, reason} ->
BotState.set_sync_status(:sync_error)
Logger.error 1, "Failed to apply sync command: #{inspect reason}"
{:reply, :error, state}
{:reply, :error, %{state | needs_hard_sync: true}}
end
end
def handle_info(:timeout, state) do
Logger.warn 3, "Haven't received any auto sync messages in a while. Forcing hard sync."
BotState.set_sync_status(:sync_now)
destroy_all_sync_cmds()
{:noreply, %{state | timer: start_timer(), needs_hard_sync: true}}
end
defp copy_configs(repo) do
case repo.one(Device) do
nil -> :ok
%{timezone: tz} ->
ConfigStorage.update_config_value(:string, "settings", "timezone", tz)
:ok
end
repo.all(Peripheral) |> Enum.all?(fn(%{mode: mode, pin: pin}) ->
mode = if mode == 0, do: :digital, else: :analog
# Logger.busy 3, "Reading peripheral (#{pin} - #{mode})"
Farmbot.Firmware.read_pin(pin, mode)
end)
end
defp destroy_all_sync_cmds do
ConfigStorage.delete_all(SyncCmd)
end
defp start_timer do
if auto_sync?() do
nil
else
# Logger.debug 3, "Starting sync timer."
Process.send_after(self(), :timeout, @timeout)
end
end
defp maybe_cancel_timer(nil), do: :ok
defp maybe_cancel_timer(timer) do
# Logger.debug 3, "Canceling sync timer."
Process.cancel_timer(timer)
end
defp flip_repos_in_cs do
case ConfigStorage.get_config_value(:string, "settings", "current_repo") do
"A" ->
@ -183,7 +238,7 @@ defmodule Farmbot.Repo do
end
defp do_apply_sync_cmd(repo, %SyncCmd{remote_id: id, kind: kind, body: obj} = sync_cmd) do
obj = strip_struct(obj)
not_struct = strip_struct(obj)
mod = Module.concat(["Farmbot", "Repo", kind])
if Code.ensure_loaded?(mod) do
Logger.busy 3, "Applying sync_cmd (#{mod}: insert_or_update on #{repo}"
@ -198,7 +253,7 @@ defmodule Farmbot.Repo do
# if there is an existing record, copy the ecto meta from the old
# record. This allows `insert_or_update` to work properly.
existing ->
mod.changeset(existing, obj)
mod.changeset(existing, not_struct)
|> repo.update!
end
else
@ -230,8 +285,16 @@ defmodule Farmbot.Repo do
|> repo.insert_or_update!()
end
defp do_sync_both(repo_a, repo_b) do
case do_sync_all_resources(repo_a) do
:ok ->
do_sync_all_resources(repo_b)
err -> err
end
end
defp do_sync_all_resources(repo) do
with :ok <- sync_resource(repo, Device, "/api/device"),
with :ok <- sync_resource(repo, Device, "/api/device"),
:ok <- sync_resource(repo, FarmEvent, "/api/farm_events"),
:ok <- sync_resource(repo, Peripheral, "/api/peripherals"),
:ok <- sync_resource(repo, Point, "/api/points"),
@ -240,26 +303,33 @@ defmodule Farmbot.Repo do
:ok <- sync_resource(repo, Tool, "/api/tools")
do
:ok
else
err ->
Logger.error 1, "sync on #{repo} failed: #{inspect err}"
err
end
end
defp sync_resource(repo, resource, slug) do
Logger.debug 3, "syncing: #{resource} (#{slug})"
as = if resource in @singular_resources do
struct(resource)
else
[struct(resource)]
end
as = if resource in @singular_resources, do: struct(resource), else: [struct(resource)]
with {:ok, %{status_code: 200, body: body}} <- Farmbot.HTTP.get(slug),
{:ok, obj} <- Poison.decode(body, as: as)
{:ok, obj_or_list} <- Poison.decode(body, as: as)
do
do_insert_or_update(repo, obj)
case do_insert_or_update(repo, obj_or_list) do
{:ok, _} when resource in @singular_resources -> :ok
:ok -> :ok
err -> err
end
else
{:error, reason} -> {:error, resource, reason}
{:error, resource, reason} -> {:error, resource, reason}
{:ok, %{status_code: code, body: body}} ->
case Poison.decode(body) do
{:ok, %{"error" => msg}} -> {:error, resource, "HTTP ERROR: #{code} #{msg}"}
{:error, _} -> {:error, resource, "HTTP ERROR: #{code}"}
{:error, _,} -> {:error, resource, "HTTP ERROR: #{code}"}
{:error, _, _} -> {:error, resource, "JSON ERROR: #{code}"}
end
end
end
@ -282,10 +352,11 @@ defmodule Farmbot.Repo do
obj.__struct__.changeset(existing, Map.from_struct(obj))
|> repo.update()
end
case res do
{:ok, _} -> :ok
{:error, reason} -> {:error, obj.__struct__, reason}
{:ok, _} -> res
{:error, reason} ->
Logger.error 2, "failed to sync #{obj.__struct__}: #{inspect reason}"
{:error, obj.__struct__, reason}
end
end