Change Preloading and Caching to use the same system as auto_sync

* preloader and reconciler no longer use a transaction
   * This will prevent rolling back a failed sync, but allows farmbot to
   continue operating if a sync does fail
* usage of the preloader updated to reflect this
pull/974/head
Connor Rigby 2019-06-19 11:58:05 -07:00
parent d02194f822
commit e7ea334ab2
No known key found for this signature in database
GPG Key ID: 29A88B24B70456E0
7 changed files with 87 additions and 82 deletions

View File

@ -152,6 +152,11 @@ defmodule FarmbotCore.Asset.Command do
defp as_module!("SensorReading"), do: Asset.SensorReading
defp as_module!("Sequence"), do: Asset.Sequence
defp as_module!("Tool"), do: Asset.Tool
defp as_module!(module) when is_atom(module) do
as_module!(List.last(Module.split(module)))
end
defp as_module!(kind) when is_binary(kind) do
raise("""
Unknown kind: #{kind}

View File

@ -26,7 +26,7 @@ defmodule FarmbotCore.Asset.Regimen do
%{
id: regimen.id,
name: regimen.name,
regimen_items: Enum.map(regimen.items, &Item.render(&1)),
regimen_items: Enum.map(regimen.regimen_items, &Item.render(&1)),
body: Enum.map(regimen.body, &BodyNode.render(&1))
}
end

View File

@ -72,8 +72,13 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
end
def handle_info(:preload, state) do
with :ok <- Preloader.preload_all(),
:ok <- BotState.set_sync_status("synced") do
with :ok <- Preloader.preload_all() do
if Asset.Query.auto_sync?() do
BotState.set_sync_status("synced")
else
BotState.set_sync_status("sync_now")
end
send(self(), :connect)
{:noreply, %{state | preloaded: true}}
else

View File

@ -4,7 +4,7 @@ defmodule FarmbotExt.API.Preloader do
all resources stored in the API.
"""
alias Ecto.{Changeset, Multi}
alias Ecto.Changeset
require FarmbotCore.Logger
alias FarmbotExt.API
@ -12,7 +12,6 @@ defmodule FarmbotExt.API.Preloader do
alias FarmbotCore.{
Asset.Query,
Asset.Repo,
Asset.Sync
}
@ -23,55 +22,44 @@ defmodule FarmbotExt.API.Preloader do
"""
def preload_all() do
with {:ok, sync_changeset} <- API.get_changeset(Sync),
sync <- Changeset.apply_changes(sync_changeset),
multi <- Multi.new(),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
{:ok, _} <- Repo.transaction(multi) do
auto_sync_change =
Enum.find_value(multi.operations, fn {{key, _id}, {:changeset, change, []}} ->
key == :fbos_configs && Changeset.get_change(change, :auto_sync)
end)
FarmbotCore.Logger.success(3, "Successfully synced bootup resources.")
maybe_auto_sync(sync_changeset, auto_sync_change || Query.auto_sync?())
sync_changeset <- Reconciler.sync_group(sync_changeset, SyncGroup.group_0()) do
FarmbotCore.Logger.success(3, "Successfully preloaded resources.")
maybe_auto_sync(sync_changeset, Query.auto_sync?())
end
end
# When auto_sync is enabled, do the full sync.
defp maybe_auto_sync(sync_changeset, true) do
FarmbotCore.Logger.busy(3, "bootup auto sync")
sync = Changeset.apply_changes(sync_changeset)
multi = Multi.new()
defp maybe_auto_sync(%Changeset{} = sync_changeset, true) do
FarmbotCore.Logger.busy(3, "Starting auto sync")
with {:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_1()),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_2()),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_3()),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_4()) do
Multi.insert(multi, :syncs, sync_changeset)
|> Repo.transaction()
FarmbotCore.Logger.success(3, "bootup auto sync complete")
with %Changeset{valid?: true} = sync_changeset <-
Reconciler.sync_group(sync_changeset, SyncGroup.group_1()),
%Changeset{valid?: true} = sync_changeset <-
Reconciler.sync_group(sync_changeset, SyncGroup.group_2()),
%Changeset{valid?: true} = sync_changeset <-
Reconciler.sync_group(sync_changeset, SyncGroup.group_3()),
%Changeset{valid?: true} <- Reconciler.sync_group(sync_changeset, SyncGroup.group_4()) do
FarmbotCore.Logger.success(3, "Auto sync complete")
:ok
else
error ->
FarmbotCore.Logger.error(3, "bootup auto sync failed #{inspect(error)}")
FarmbotCore.Logger.error(3, "Auto sync failed #{inspect(error)}")
error
end
end
# When auto_sync is disabled preload the sync.
defp maybe_auto_sync(sync_changeset, false) do
FarmbotCore.Logger.busy(3, "preloading sync")
defp maybe_auto_sync(%Changeset{} = sync_changeset, false) do
FarmbotCore.Logger.busy(3, "Preloading auto sync")
sync = Changeset.apply_changes(sync_changeset)
case EagerLoader.preload(sync) do
:ok ->
FarmbotCore.Logger.success(3, "preloaded sync ok")
FarmbotCore.Logger.success(3, "Preloaded auto sync complete")
:ok
error ->
FarmbotCore.Logger.error(3, "Failed ot preload sync")
FarmbotCore.Logger.error(3, "Preloading auto sync failed #{inspect(error)}")
error
end
end

View File

@ -3,13 +3,13 @@ defmodule FarmbotExt.API.Reconciler do
Handles remote additions and changes.
"""
require Logger
alias Ecto.{Changeset, Multi}
alias Ecto.Changeset
import Ecto.Query
alias FarmbotExt.API
alias API.{SyncGroup, EagerLoader}
alias FarmbotCore.Asset.{Repo, Sync}
alias FarmbotCore.Asset.{Command, Repo, Sync, Sync.Item}
import FarmbotCore.TimeUtils, only: [compare_datetimes: 2]
@doc """
@ -24,15 +24,16 @@ defmodule FarmbotExt.API.Reconciler do
"""
def sync do
with {:ok, sync_changeset} <- API.get_changeset(Sync),
sync <- Changeset.apply_changes(sync_changeset),
multi <- Multi.new(),
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_0()),
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_1()),
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_2()),
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_3()),
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_4()) do
Multi.insert(multi, :syncs, sync_changeset)
|> Repo.transaction()
%Changeset{valid?: true} = sync_changeset <-
sync_group(sync_changeset, SyncGroup.group_0()),
%Changeset{valid?: true} = sync_changeset <-
sync_group(sync_changeset, SyncGroup.group_1()),
%Changeset{valid?: true} = sync_changeset <-
sync_group(sync_changeset, SyncGroup.group_2()),
%Changeset{valid?: true} = sync_changeset <-
sync_group(sync_changeset, SyncGroup.group_3()),
%Changeset{valid?: true} <- sync_group(sync_changeset, SyncGroup.group_4()) do
:ok
end
end
@ -50,18 +51,21 @@ defmodule FarmbotExt.API.Reconciler do
* applies changeset if there was any changes from cache or http
"""
def sync_group(multi, sync, [module | rest]) do
multi
|> do_sync_group(sync, module)
|> sync_group(sync, rest)
def sync_group(%Changeset{} = sync_changeset, [module | rest]) do
with sync_changeset <- do_sync_group(sync_changeset, module) do
sync_group(sync_changeset, rest)
end
end
def sync_group(multi, _sync, []), do: {:ok, multi}
def sync_group(%Changeset{valid?: true} = ok, []), do: ok
def sync_group(%Changeset{valid?: false} = error, []), do: {:error, error}
defp do_sync_group(multi, sync, module) do
defp do_sync_group(%Changeset{} = sync_changeset, module) when is_atom(module) do
table = module.__schema__(:source) |> String.to_atom()
items = Map.fetch!(sync, table)
# items is a list of changesets
items = Changeset.get_field(sync_changeset, table)
# TODO(Connor) maybe move this into Asset.Query
ids_fbos_knows_about =
Repo.all(from(d in module, where: not is_nil(d.id), select: d.id))
|> Enum.sort()
@ -72,50 +76,55 @@ defmodule FarmbotExt.API.Reconciler do
ids_that_were_deleted = ids_fbos_knows_about -- ids_the_api_knows_about
multi =
Enum.reduce(ids_that_were_deleted, multi, fn id, multi ->
sync_changeset =
Enum.reduce(ids_that_were_deleted, sync_changeset, fn id, sync_changeset ->
Logger.info("delete: #{module} #{inspect(id)}")
local_item = Repo.one!(from(d in module, where: d.id == ^id))
Multi.delete(multi, {table, id}, local_item)
Command.update(module, id, nil)
sync_changeset
end)
# TODO(Connor) make this reduce async with Task/Agent
Enum.reduce(items, multi, &multi_reduce(module, table, &1, &2))
Enum.reduce(items, sync_changeset, &sync_reduce(module, &1, &2))
end
@doc false
def multi_reduce(module, table, item, multi) do
def sync_reduce(module, %Item{} = item, %Changeset{} = sync_changeset) when is_atom(module) do
cached_cs = EagerLoader.get_cache(module, item.id)
local_item = Repo.one(from(d in module, where: d.id == ^item.id))
case get_changeset(local_item || module, item, cached_cs) do
{:insert, %Changeset{} = cs} ->
Logger.info("insert: #{inspect(cs)}")
Multi.insert(multi, {table, item.id}, cs)
item = module.render(Changeset.apply_changes(cs))
:ok = Command.update(module, item.id, item)
sync_changeset
{:update, %Changeset{} = cs} ->
Logger.info("update: #{inspect(cs)}")
Multi.update(multi, {table, item.id}, cs)
item = module.render(Changeset.apply_changes(cs))
:ok = Command.update(module, item.id, item)
sync_changeset
nil ->
Logger.info("Local data: #{local_item.__struct__} is current.")
multi
sync_changeset
end
end
defp get_changeset(local_item, sync_item, cached_cs)
defp get_changeset(local_item, sync_item, cached_changeset)
# A module is passed in if there is no local copy of the data.
defp get_changeset(module, sync_item, nil) when is_atom(module) do
defp get_changeset(module, %Item{} = sync_item, nil) when is_atom(module) do
Logger.info("Local data: #{module} does not exist. Using HTTP to get data.")
{:ok, changeset} = API.get_changeset(module, "#{sync_item.id}")
{:insert, changeset}
end
defp get_changeset(module, sync_item, %Changeset{} = cached) when is_atom(module) do
defp get_changeset(module, %Item{} = sync_item, %Changeset{} = cached) when is_atom(module) do
cached_updated_at = Changeset.get_field(cached, :updated_at)
sync_item_updated_at = sync_item.updated_at
if compare_datetimes(sync_item.updated_at, cached_updated_at) == :eq do
if compare_datetimes(sync_item_updated_at, cached_updated_at) == :eq do
{:insert, cached}
else
Logger.info("Cached item is out of date")
@ -126,14 +135,17 @@ defmodule FarmbotExt.API.Reconciler do
# no cache available
# If the `sync_item.updated_at` is newer than `local_item.updated_at`
# HTTP get the data.
defp get_changeset(%{} = local_item, sync_item, nil) do
defp get_changeset(%{} = local_item, %Item{} = sync_item, nil) do
sync_item_updated_at = sync_item.updated_at
sync_item_id = sync_item.id
# Check if remote data is newer
if compare_datetimes(sync_item.updated_at, local_item.updated_at) == :gt do
if compare_datetimes(sync_item_updated_at, local_item.updated_at) == :gt do
Logger.info(
"Local data: #{local_item.__struct__} is out of date. Using HTTP to get newer data."
)
{:ok, changeset} = API.get_changeset(local_item, "#{sync_item.id}")
{:ok, changeset} = API.get_changeset(local_item, "#{sync_item_id}")
{:update, changeset}
end
end
@ -143,10 +155,11 @@ defmodule FarmbotExt.API.Reconciler do
# If the cache is the same `updated_at` as the API, check if the cache
# is newer than `local_item.updated_at`
# if the cache is not the same `updated_at` as the API, fallback to HTTP.
defp get_changeset(%{} = local_item, sync_item, %Changeset{} = cached) do
defp get_changeset(%{} = local_item, %Item{} = sync_item, %Changeset{} = cached) do
cached_updated_at = Changeset.get_field(cached, :updated_at)
sync_item_updated_at = sync_item.updated_at
if compare_datetimes(sync_item.updated_at, cached_updated_at) == :eq do
if compare_datetimes(sync_item_updated_at, cached_updated_at) == :eq do
if compare_datetimes(cached_updated_at, local_item.updated_at) == :gt do
Logger.info(
"Local data: #{local_item.__struct__} is out of date. Using cache do get newer data."

View File

@ -35,7 +35,7 @@ defmodule AutoSyncChannelTest do
test_pid = self()
expect(Query, :auto_sync?, fn -> false end)
expect(Query, :auto_sync?, 2, fn -> false end)
expect(API, :get_changeset, fn _module ->
send(test_pid, :preload_all_called)

View File

@ -16,7 +16,6 @@ defmodule FarmbotOS.SysCalls do
alias FarmbotCore.{Asset, Asset.Repo, Asset.Private, Asset.Sync, BotState, Leds}
alias FarmbotExt.{API, API.Reconciler, API.SyncGroup}
alias Ecto.{Changeset, Multi}
@behaviour FarmbotCeleryScript.SysCalls
@ -366,17 +365,12 @@ defmodule FarmbotOS.SysCalls do
FarmbotCore.Logger.busy(3, "Syncing")
with {:ok, sync_changeset} <- API.get_changeset(Sync),
sync <- Changeset.apply_changes(sync_changeset),
multi <- Multi.new(),
:ok <- BotState.set_sync_status("syncing"),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_1()),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_2()),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_3()),
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_4()) do
Multi.insert(multi, :syncs, sync_changeset)
|> Repo.transaction()
sync_changeset <- Reconciler.sync_group(sync_changeset, SyncGroup.group_0()),
sync_changeset <- Reconciler.sync_group(sync_changeset, SyncGroup.group_1()),
sync_changeset <- Reconciler.sync_group(sync_changeset, SyncGroup.group_2()),
sync_changeset <- Reconciler.sync_group(sync_changeset, SyncGroup.group_3()),
_sync_changeset <- Reconciler.sync_group(sync_changeset, SyncGroup.group_4()) do
FarmbotCore.Logger.success(3, "Synced")
:ok = BotState.set_sync_status("synced")
:ok