Don't only block other sync during sync
This commit is contained in:
parent
536c0058bb
commit
f076e4cf66
|
@ -8,7 +8,6 @@ defmodule Farmbot.Repo.FarmEvent do
|
|||
"""
|
||||
|
||||
@on_load :load_nif
|
||||
|
||||
def load_nif do
|
||||
require Logger
|
||||
nif_file = '#{:code.priv_dir(:farmbot)}/build_calendar'
|
||||
|
|
|
@ -27,7 +27,7 @@ defmodule Farmbot.Repo do
|
|||
# @timeout 15000
|
||||
|
||||
# 1.5 minutes.
|
||||
@call_timeout 90000
|
||||
@call_timeout_ms 90_000
|
||||
|
||||
@doc "Fetch the current repo."
|
||||
def current_repo do
|
||||
|
@ -41,7 +41,7 @@ defmodule Farmbot.Repo do
|
|||
|
||||
@doc "Flip the repos."
|
||||
def flip do
|
||||
GenServer.call(__MODULE__, :flip, @call_timeout)
|
||||
GenServer.call(__MODULE__, :flip, @call_timeout_ms)
|
||||
end
|
||||
|
||||
@doc "Register a diff to be stored until a flip."
|
||||
|
@ -59,6 +59,17 @@ defmodule Farmbot.Repo do
|
|||
GenServer.start_link(__MODULE__, repos, name: __MODULE__)
|
||||
end
|
||||
|
||||
defmodule State do
|
||||
@moduledoc false
|
||||
defstruct [
|
||||
:repos,
|
||||
:needs_hard_sync,
|
||||
:timer,
|
||||
:sync_pid,
|
||||
:from
|
||||
]
|
||||
end
|
||||
|
||||
def init([repo_a, repo_b]) do
|
||||
# Delete any old sync cmds.
|
||||
destroy_all_sync_cmds()
|
||||
|
@ -92,61 +103,95 @@ defmodule Farmbot.Repo do
|
|||
# Copy configs
|
||||
[current, _] = repos
|
||||
:ok = copy_configs(current)
|
||||
{:ok, %{repos: repos, needs_hard_sync: needs_hard_sync, timer: start_timer()}}
|
||||
{:ok, %State{repos: repos, needs_hard_sync: needs_hard_sync, timer: start_timer(), sync_pid: nil}}
|
||||
end
|
||||
|
||||
def terminate(reason, _) do
|
||||
def terminate(reason, state) do
|
||||
if reason not in [:normal, :shutdown] do
|
||||
Logger.error 1, "Repo died: #{inspect reason}"
|
||||
BotState.set_sync_status(:sync_error)
|
||||
end
|
||||
|
||||
if state.from do
|
||||
GenServer.reply(state.from, reason)
|
||||
end
|
||||
Farmbot.FarmEvent.Manager.register_events([])
|
||||
end
|
||||
|
||||
def handle_info({:DOWN, _, :process, _, %State{} = new_state}, %State{} = state) do
|
||||
IO.puts "Sync done"
|
||||
GenServer.reply(state.from, :ok)
|
||||
{:noreply, new_state}
|
||||
end
|
||||
|
||||
def handle_info({:DOWN, _, :process, _, reason}, state) do
|
||||
IO.puts "sync error: #{inspect reason}"
|
||||
GenServer.reply(state.from, reason)
|
||||
BotState.set_sync_status(:sync_now)
|
||||
destroy_all_sync_cmds()
|
||||
{:noreply, %State{state | sync_pid: nil, from: nil}}
|
||||
end
|
||||
|
||||
def handle_info(:timeout, state) do
|
||||
BotState.set_sync_status(:sync_now)
|
||||
destroy_all_sync_cmds()
|
||||
{:noreply, %State{state | timer: start_timer(), needs_hard_sync: true}}
|
||||
end
|
||||
|
||||
def handle_call(:force_hard_sync, _, state) do
|
||||
maybe_cancel_timer(state.timer)
|
||||
BotState.set_sync_status(:sync_now)
|
||||
Farmbot.FarmEvent.Manager.register_events([])
|
||||
{:reply, :ok, %{state | timer: nil, needs_hard_sync: true}}
|
||||
{:reply, :ok, %State{state | timer: nil, needs_hard_sync: true}}
|
||||
end
|
||||
|
||||
def handle_call(:current_repo, _, %{repos: [repo_a, _]} = state) do
|
||||
def handle_call(:current_repo, _, %State{repos: [repo_a, _]} = state) do
|
||||
{:reply, repo_a, state}
|
||||
end
|
||||
|
||||
def handle_call(:other_repo, _, %{repos: [_, repo_b]} = state) do
|
||||
def handle_call(:other_repo, _, %State{repos: [_, repo_b]} = state) do
|
||||
{:reply, repo_b, state}
|
||||
end
|
||||
|
||||
def handle_call(:flip, _, %{repos: [repo_a, repo_b], needs_hard_sync: true} = state) do
|
||||
maybe_cancel_timer(state.timer)
|
||||
destroy_all_sync_cmds()
|
||||
Logger.busy(1, "Syncing.")
|
||||
BotState.set_sync_status(:syncing)
|
||||
do_sync_both(repo_a, repo_b)
|
||||
BotState.set_sync_status(:synced)
|
||||
:ok = copy_configs(repo_b)
|
||||
flip_repos_in_cs()
|
||||
Logger.success(1, "Sync complete.")
|
||||
{:reply, :ok, %{state | repos: [repo_b, repo_a], needs_hard_sync: false, timer: start_timer()}}
|
||||
def handle_call(:flip, from, %State{repos: [repo_a, repo_b], needs_hard_sync: true} = state) do
|
||||
fun = fn() ->
|
||||
maybe_cancel_timer(state.timer)
|
||||
destroy_all_sync_cmds()
|
||||
Logger.busy(1, "Syncing.")
|
||||
BotState.set_sync_status(:syncing)
|
||||
do_sync_both(repo_a, repo_b)
|
||||
BotState.set_sync_status(:synced)
|
||||
:ok = copy_configs(repo_b)
|
||||
flip_repos_in_cs()
|
||||
Logger.success(1, "Sync complete.")
|
||||
exit(%State{state | repos: [repo_b, repo_a], needs_hard_sync: false, timer: start_timer(), sync_pid: nil})
|
||||
end
|
||||
pid = spawn(fun)
|
||||
Process.monitor(pid)
|
||||
{:noreply, %State{sync_pid: pid, from: from}}
|
||||
end
|
||||
|
||||
def handle_call(:flip, _, %{repos: [repo_a, repo_b]} = state) do
|
||||
maybe_cancel_timer(state.timer)
|
||||
Logger.busy(1, "Syncing.")
|
||||
BotState.set_sync_status(:syncing)
|
||||
def handle_call(:flip, from, %State{repos: [repo_a, repo_b]} = state) do
|
||||
fun = fn() ->
|
||||
maybe_cancel_timer(state.timer)
|
||||
Logger.busy(1, "Syncing.")
|
||||
BotState.set_sync_status(:syncing)
|
||||
|
||||
# Fetch all sync_cmds and apply them in order they were received.
|
||||
ConfigStorage.all(SyncCmd)
|
||||
|> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at))
|
||||
|> Enum.each(&apply_sync_cmd(repo_a, &1))
|
||||
# Fetch all sync_cmds and apply them in order they were received.
|
||||
ConfigStorage.all(SyncCmd)
|
||||
|> Enum.sort(&Timex.before?(&1.inserted_at, &2.inserted_at))
|
||||
|> Enum.each(&apply_sync_cmd(repo_a, &1))
|
||||
|
||||
flip_repos_in_cs()
|
||||
BotState.set_sync_status(:synced)
|
||||
:ok = copy_configs(repo_b)
|
||||
destroy_all_sync_cmds()
|
||||
Logger.success(1, "Sync complete.")
|
||||
{:reply, repo_b, %{state | repos: [repo_b, repo_a], timer: start_timer()}}
|
||||
flip_repos_in_cs()
|
||||
BotState.set_sync_status(:synced)
|
||||
:ok = copy_configs(repo_b)
|
||||
destroy_all_sync_cmds()
|
||||
Logger.success(1, "Sync complete.")
|
||||
exit(%State{state | repos: [repo_b, repo_a], timer: start_timer(), sync_pid: nil})
|
||||
end
|
||||
pid = spawn(fun)
|
||||
Process.monitor(pid)
|
||||
{:noreply, %State{sync_pid: pid, from: from}}
|
||||
end
|
||||
|
||||
def handle_call({:register_sync_cmd, remote_id, kind, body}, _from, state) do
|
||||
|
@ -162,21 +207,15 @@ defmodule Farmbot.Repo do
|
|||
false -> :ok = BotState.set_sync_status(:sync_now)
|
||||
true -> :ok = BotState.set_sync_status(:syncing)
|
||||
end
|
||||
{:reply, :ok, %{state | timer: start_timer()}}
|
||||
{:reply, :ok, %State{state | timer: start_timer()}}
|
||||
|
||||
{:error, reason} ->
|
||||
BotState.set_sync_status(:sync_error)
|
||||
Logger.error(1, "Failed to apply sync command: #{inspect(reason)}")
|
||||
{:reply, :error, %{state | needs_hard_sync: true}}
|
||||
{:reply, :error, %State{state | needs_hard_sync: true}}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info(:timeout, state) do
|
||||
BotState.set_sync_status(:sync_now)
|
||||
destroy_all_sync_cmds()
|
||||
{:noreply, %{state | timer: start_timer(), needs_hard_sync: true}}
|
||||
end
|
||||
|
||||
defp copy_configs(repo) do
|
||||
case repo.one(Device) do
|
||||
nil ->
|
||||
|
|
Loading…
Reference in a new issue