If stale records are found, abort, wait, retry
parent
7431ecf800
commit
8309412f49
|
@ -16,39 +16,24 @@ defmodule FarmbotCore.Asset.Private do
|
|||
|
||||
@doc "Lists `module` objects that still need to be POSTed to the API."
|
||||
def list_local(module) do
|
||||
list = Repo.all(from(data in module, where: is_nil(data.id)))
|
||||
Enum.map(list, fn item ->
|
||||
if module == FarmbotCore.Asset.Point do
|
||||
msg = "list_local: Point#{item.id}.y = #{item.y || "nil"}"
|
||||
FarmbotCore.Logger.info(3, msg)
|
||||
end
|
||||
item
|
||||
end)
|
||||
Repo.all(from(data in module, where: is_nil(data.id)))
|
||||
end
|
||||
|
||||
@doc "Lists `module` objects that have a `local_meta` object"
|
||||
def list_dirty(module) do
|
||||
table = table(module)
|
||||
q = from(lm in LocalMeta, where: lm.table == ^table, select: lm.asset_local_id)
|
||||
list = Repo.all(from(data in module, join: lm in subquery(q)))
|
||||
Enum.map(list, fn item ->
|
||||
if module == FarmbotCore.Asset.Point do
|
||||
msg = "list_dirty: Point#{item.id}.y = #{item.y || "nil"}"
|
||||
FarmbotCore.Logger.info(3, msg)
|
||||
end
|
||||
item
|
||||
end)
|
||||
end
|
||||
|
||||
def maybe_get_local_meta(asset, table) do
|
||||
Repo.one(from(lm in LocalMeta, where: lm.asset_local_id == ^asset.local_id and lm.table == ^table))
|
||||
Repo.all(from(data in module, join: lm in subquery(q)))
|
||||
end
|
||||
|
||||
@doc "Mark a document as `dirty` by creating a `local_meta` object"
|
||||
def mark_dirty!(asset, params \\ %{}) do
|
||||
table = table(asset)
|
||||
|
||||
local_meta = maybe_get_local_meta(asset, table) || Ecto.build_assoc(asset, :local_meta)
|
||||
local_meta =
|
||||
Repo.one(
|
||||
from(lm in LocalMeta, where: lm.asset_local_id == ^asset.local_id and lm.table == ^table)
|
||||
) || Ecto.build_assoc(asset, :local_meta)
|
||||
|
||||
## NOTE(Connor): 19/11/13
|
||||
# the try/catch here seems unneeded here, but because of how sqlite/ecto works, it is 100% needed.
|
||||
|
|
|
@ -42,27 +42,30 @@ defmodule FarmbotExt.API.DirtyWorker do
|
|||
|
||||
@impl GenServer
|
||||
def handle_info(:do_work, %{module: module} = state) do
|
||||
(Private.list_dirty(module) ++ Private.list_local(module))
|
||||
|> Enum.map(fn item ->
|
||||
Process.sleep(1000)
|
||||
list = Enum.uniq((Private.list_dirty(module) ++ Private.list_local(module)))
|
||||
|
||||
stale = Enum.find(list, fn item ->
|
||||
if (item.id) do
|
||||
Process.sleep(300)
|
||||
next_item = Repo.get_by(module, id: item.id)
|
||||
if module == FarmbotCore.Asset.Point do
|
||||
old_y = item.y
|
||||
new_y = next_item.y
|
||||
msg = "Y value after DB refresh: #{old_y} => #{new_y}"
|
||||
FarmbotCore.Logger.info(2, msg)
|
||||
if item == Repo.get_by(module, id: item.id) do
|
||||
false
|
||||
else
|
||||
IO.inspect(item, label: "=== STALE RECORD DETECTED!")
|
||||
true
|
||||
end
|
||||
next_item
|
||||
else
|
||||
item
|
||||
false
|
||||
end
|
||||
end)
|
||||
|> Enum.uniq()
|
||||
|> Enum.map(fn dirty -> work(dirty, module) end)
|
||||
|
||||
Process.send_after(self(), :do_work, @timeout)
|
||||
{:noreply, state}
|
||||
if stale do
|
||||
Process.send_after(self(), :do_work, @timeout)
|
||||
{:noreply, state}
|
||||
else
|
||||
Enum.map(list, fn dirty -> work(dirty, module) end)
|
||||
Process.send_after(self(), :do_work, @timeout)
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
def work(dirty, module) do
|
||||
|
|
Loading…
Reference in New Issue