Partial re-write of DirtyWOrker module
parent
13fa880204
commit
e1f79aeaf5
|
@ -7,7 +7,7 @@ defmodule FarmbotCeleryScript.Compiler.UpdateResource do
|
|||
variable = unquote(Map.fetch!(args, :resource))
|
||||
update = unquote(unpair(body, %{}))
|
||||
# Go easy on the API...
|
||||
Process.sleep(1234)
|
||||
Process.sleep(250)
|
||||
case variable do
|
||||
%AST{kind: :identifier} ->
|
||||
args = Map.fetch!(variable, :args)
|
||||
|
|
|
@ -7,7 +7,7 @@ defmodule FarmbotExt.API.DirtyWorker do
|
|||
|
||||
require Logger
|
||||
use GenServer
|
||||
@timeout 4700
|
||||
@timeout 1500
|
||||
|
||||
# these resources can't be accessed by `id`.
|
||||
@singular [
|
||||
|
@ -35,29 +35,25 @@ defmodule FarmbotExt.API.DirtyWorker do
|
|||
@impl GenServer
|
||||
def init(args) do
|
||||
module = Keyword.fetch!(args, :module)
|
||||
timeout = Keyword.get(args, :timeout, @timeout)
|
||||
timer = Process.send_after(self(), :timeout, timeout)
|
||||
{:ok, %{module: module, timeout: timeout, timer: timer}}
|
||||
Process.send_after(self(), :do_work, 100)
|
||||
{:ok, %{module: module}}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info(:timeout, %{module: module} = state) do
|
||||
dirty = Private.list_dirty(module)
|
||||
local = Private.list_local(module)
|
||||
{:noreply, state, {:continue, Enum.uniq(dirty ++ local)}}
|
||||
def handle_info(:do_work, %{module: module} = state) do
|
||||
(Private.list_dirty(module) ++ Private.list_local(module))
|
||||
|> Enum.uniq()
|
||||
|> Enum.map(fn dirty -> work(dirty, module) end)
|
||||
|
||||
Process.send_after(self(), :do_work, @timeout)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_continue([], state) do
|
||||
timer = Process.send_after(self(), :timeout, state.timeout)
|
||||
{:noreply, %{state | timer: timer}}
|
||||
end
|
||||
|
||||
def handle_continue([dirty | rest], %{module: module} = state) do
|
||||
case http_request(dirty, state) do
|
||||
def work(dirty, module) do
|
||||
case http_request(dirty, module) do
|
||||
# Valid data
|
||||
{:ok, %{status: s, body: body}} when s > 199 and s < 300 ->
|
||||
dirty |> module.changeset(body) |> handle_changeset(rest, state)
|
||||
dirty |> module.changeset(body) |> handle_changeset(module)
|
||||
|
||||
# Invalid data
|
||||
{:ok, %{status: s, body: %{} = body}} when s > 399 and s < 500 ->
|
||||
|
@ -66,35 +62,33 @@ defmodule FarmbotExt.API.DirtyWorker do
|
|||
Enum.reduce(body, changeset, fn {key, val}, changeset ->
|
||||
Ecto.Changeset.add_error(changeset, key, val)
|
||||
end)
|
||||
|> handle_changeset(rest, state)
|
||||
|> handle_changeset(module)
|
||||
|
||||
# Invalid data, but the API didn't say why
|
||||
{:ok, %{status: s, body: _body}} when s > 399 and s < 500 ->
|
||||
module.changeset(dirty)
|
||||
|> Map.put(:valid?, false)
|
||||
|> handle_changeset(rest, state)
|
||||
|> handle_changeset(module)
|
||||
|
||||
# HTTP Error. (500, network error, timeout etc.)
|
||||
error ->
|
||||
Logger.error(
|
||||
"[#{module} #{dirty.local_id} #{inspect(self())}] HTTP Error: #{state.module} #{
|
||||
"[#{module} #{dirty.local_id} #{inspect(self())}] HTTP Error: #{module} #{
|
||||
inspect(error)
|
||||
}"
|
||||
)
|
||||
|
||||
{:noreply, state, @timeout}
|
||||
module
|
||||
end
|
||||
end
|
||||
|
||||
# If the changeset was valid, update the record.
|
||||
def handle_changeset(%{valid?: true} = changeset, rest, state) do
|
||||
Repo.update!(changeset)
|
||||
|> Private.mark_clean!()
|
||||
|
||||
{:noreply, state, {:continue, rest}}
|
||||
def handle_changeset(%{valid?: true} = changeset, _module) do
|
||||
Private.mark_clean!(Repo.update!(changeset))
|
||||
:ok
|
||||
end
|
||||
|
||||
def handle_changeset(%{valid?: false, data: data} = changeset, rest, state) do
|
||||
def handle_changeset(%{valid?: false, data: data} = changeset, module) do
|
||||
message =
|
||||
Enum.map(changeset.errors, fn
|
||||
{key, {msg, _meta}} when is_binary(key) -> "\t#{key}: #{msg}"
|
||||
|
@ -102,26 +96,26 @@ defmodule FarmbotExt.API.DirtyWorker do
|
|||
end)
|
||||
|> Enum.join("\n")
|
||||
|
||||
Logger.error("Failed to sync: #{state.module} \n #{message}")
|
||||
Logger.error("Failed to sync: #{module} \n #{message}")
|
||||
_ = Repo.delete!(data)
|
||||
{:noreply, state, {:continue, rest}}
|
||||
:ok
|
||||
end
|
||||
|
||||
defp http_request(%{id: nil} = dirty, state) do
|
||||
path = state.module.path()
|
||||
data = render(state.module, dirty)
|
||||
defp http_request(%{id: nil} = dirty, module) do
|
||||
path = module.path()
|
||||
data = render(module, dirty)
|
||||
API.post(API.client(), path, data)
|
||||
end
|
||||
|
||||
defp http_request(dirty, %{module: module} = state) when module in @singular do
|
||||
path = path = state.module.path()
|
||||
data = render(state.module, dirty)
|
||||
defp http_request(dirty, module) when module in @singular do
|
||||
path = path = module.path()
|
||||
data = render(module, dirty)
|
||||
API.patch(API.client(), path, data)
|
||||
end
|
||||
|
||||
defp http_request(dirty, state) do
|
||||
path = Path.join(state.module.path(), to_string(dirty.id))
|
||||
data = render(state.module, dirty)
|
||||
defp http_request(dirty, module) do
|
||||
path = Path.join(module.path(), to_string(dirty.id))
|
||||
data = render(module, dirty)
|
||||
API.patch(API.client(), path, data)
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue