Implement error handling for api 429 errors
* Changes auto_sync_channel to no longer use `handle_continue` * If preloading fails, retry instead of crashing. * Make `get_changeset` return an error rather than raising an exception * Update every use of that function to handle posibility of errorpull/974/head
parent
946e4e07da
commit
9a16c79ee8
|
@ -30,6 +30,7 @@ defimpl FarmbotCore.AssetWorker, for: FarmbotCore.Asset.RegimenInstance do
|
|||
end
|
||||
|
||||
def init([regimen_instance, args]) do
|
||||
Logger.warn "RegimenInstance #{inspect(regimen_instance)} initializing"
|
||||
apply_sequence = Keyword.get(args, :apply_sequence, &apply_sequence/2)
|
||||
unless is_function(apply_sequence, 2) do
|
||||
raise "RegimenInstance Sequence handler should be a 2 arity function"
|
||||
|
|
|
@ -90,7 +90,7 @@ defmodule FarmbotCore.BotState do
|
|||
|
||||
@doc "Sets informational_settings.status"
|
||||
def set_sync_status(bot_state_server \\ __MODULE__, s)
|
||||
when s in ["sync_now", "syncing", "synced", "error"] do
|
||||
when s in ["sync_now", "syncing", "synced", "sync_error"] do
|
||||
GenServer.call(bot_state_server, {:set_sync_status, s})
|
||||
end
|
||||
|
||||
|
|
|
@ -39,7 +39,8 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
|
|||
def init(args) do
|
||||
Process.flag(:sensitive, true)
|
||||
jwt = Keyword.fetch!(args, :jwt)
|
||||
{:ok, %State{conn: nil, chan: nil, jwt: jwt, preloaded: false}, {:continue, :preload}}
|
||||
send(self(), :preload)
|
||||
{:ok, %State{conn: nil, chan: nil, jwt: jwt, preloaded: false}}
|
||||
end
|
||||
|
||||
def terminate(reason, state) do
|
||||
|
@ -48,14 +49,21 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
|
|||
if state.chan, do: ConnectionWorker.close_channel(state.chan)
|
||||
end
|
||||
|
||||
def handle_continue(:preload, state) do
|
||||
:ok = Preloader.preload_all()
|
||||
next_state = %{state | preloaded: true}
|
||||
:ok = BotState.set_sync_status("synced")
|
||||
{:noreply, next_state, {:continue, :connect}}
|
||||
def handle_info(:preload, state) do
|
||||
with :ok <- Preloader.preload_all(),
|
||||
:ok <- BotState.set_sync_status("synced") do
|
||||
send(self(), :connect)
|
||||
{:noreply, %{state | preloaded: true}}
|
||||
else
|
||||
{:error, reason} ->
|
||||
BotState.set_sync_status("sync_error")
|
||||
Logger.error("Error preloading. #{reason}")
|
||||
Process.send_after(self(), :preload, 5000)
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_continue(:connect, state) do
|
||||
def handle_info(:connect, state) do
|
||||
result = ConnectionWorker.maybe_connect_autosync(state.jwt.bot)
|
||||
compute_reply_from_amqp_state(state, result)
|
||||
end
|
||||
|
@ -76,7 +84,7 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:basic_deliver, payload, %{routing_key: key}}, state) do
|
||||
def handle_info({:basic_deliver, payload, %{routing_key: key}}, %{preloaded: true} = state) do
|
||||
# Logger.warn "AUTOSYNC PAYLOAD: #{inspect(key)} #{inspect(payload)}"
|
||||
chan = state.chan
|
||||
data = JSON.decode!(payload)
|
||||
|
@ -97,6 +105,11 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:basic_deliver, _, _}, %{preloaded: false} = state) do
|
||||
send(self(), :preload)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_call(:network_status, _, state) do
|
||||
reply = %{conn: state.conn, chan: state.chan, preloaded: state.preloaded}
|
||||
|
||||
|
|
|
@ -61,10 +61,8 @@ defmodule FarmbotExt.API do
|
|||
@progress_steps 50
|
||||
|
||||
def upload_image(image_filename, meta \\ %{}) do
|
||||
storage_auth =
|
||||
%StorageAuth{form_data: form_data} =
|
||||
API.get_changeset(StorageAuth)
|
||||
|> Ecto.Changeset.apply_changes()
|
||||
{:ok, changeset} = API.get_changeset(StorageAuth)
|
||||
storage_auth = %StorageAuth{form_data: form_data} = Ecto.Changeset.apply_changes(changeset)
|
||||
|
||||
content_length = :filelib.file_size(image_filename)
|
||||
{:ok, pid} = Agent.start_link(fn -> 0 end)
|
||||
|
@ -150,8 +148,20 @@ defmodule FarmbotExt.API do
|
|||
|
||||
@doc "helper for `GET`ing a path."
|
||||
def get_body!(path) do
|
||||
API.get!(API.client(), path)
|
||||
|> Map.fetch!(:body)
|
||||
case API.get!(API.client(), path) do
|
||||
%{body: body, status: 200} ->
|
||||
{:ok, body}
|
||||
|
||||
%{body: error, status: status} ->
|
||||
msg = """
|
||||
HTTP Error getting: #{path}
|
||||
Status Code = #{status}
|
||||
|
||||
#{error}
|
||||
"""
|
||||
|
||||
{:error, msg}
|
||||
end
|
||||
end
|
||||
|
||||
@doc "helper for `GET`ing api resources."
|
||||
|
@ -162,11 +172,14 @@ defmodule FarmbotExt.API do
|
|||
def get_changeset(%module{} = data) do
|
||||
get_body!(module.path())
|
||||
|> case do
|
||||
%{} = single ->
|
||||
module.changeset(data, single)
|
||||
{:ok, %{} = single} ->
|
||||
{:ok, module.changeset(data, single)}
|
||||
|
||||
many when is_list(many) ->
|
||||
Enum.map(many, &module.changeset(data, &1))
|
||||
{:ok, many} when is_list(many) ->
|
||||
{:ok, Enum.map(many, &module.changeset(data, &1))}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -193,11 +206,14 @@ defmodule FarmbotExt.API do
|
|||
def get_changeset(%module{} = data, path) do
|
||||
get_body!(Path.join(module.path(), to_string(path)))
|
||||
|> case do
|
||||
%{} = single ->
|
||||
module.changeset(data, single)
|
||||
{:ok, %{} = single} ->
|
||||
{:ok, module.changeset(data, single)}
|
||||
|
||||
many when is_list(many) ->
|
||||
Enum.map(many, &module.changeset(data, &1))
|
||||
{:ok, many} when is_list(many) ->
|
||||
{:ok, Enum.map(many, &module.changeset(data, &1))}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -30,7 +30,8 @@ defmodule FarmbotExt.API.EagerLoader do
|
|||
|
||||
def preload(asset_module, %{id: id}) when is_atom(asset_module) do
|
||||
local = Repo.one(from(m in asset_module, where: m.id == ^id)) || asset_module
|
||||
:ok = API.get_changeset(local, id) |> cache()
|
||||
{:ok, changeset} = API.get_changeset(local, id)
|
||||
:ok = cache(changeset)
|
||||
end
|
||||
|
||||
@doc "Get a Changeset by module and id. May return nil"
|
||||
|
|
|
@ -25,12 +25,10 @@ defmodule FarmbotExt.API.Preloader do
|
|||
"""
|
||||
@callback preload_all :: :ok | :error
|
||||
def preload_all() do
|
||||
sync_changeset = API.get_changeset(Sync)
|
||||
sync = Changeset.apply_changes(sync_changeset)
|
||||
|
||||
multi = Multi.new()
|
||||
|
||||
with {:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
|
||||
with {:ok, sync_changeset} <- API.get_changeset(Sync),
|
||||
sync <- Changeset.apply_changes(sync_changeset),
|
||||
multi <- Multi.new(),
|
||||
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
|
||||
{:ok, _} <- Repo.transaction(multi) do
|
||||
auto_sync_change =
|
||||
Enum.find_value(multi.operations, fn {{key, _id}, {:changeset, change, []}} ->
|
||||
|
|
|
@ -19,12 +19,10 @@ defmodule FarmbotExt.API.Preloader.HTTP do
|
|||
actually sync all resources. If it is not, preload all resources.
|
||||
"""
|
||||
def preload_all() do
|
||||
sync_changeset = API.get_changeset(Sync)
|
||||
sync = Changeset.apply_changes(sync_changeset)
|
||||
|
||||
multi = Multi.new()
|
||||
|
||||
with {:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
|
||||
with {:ok, sync_changeset} <- API.get_changeset(Sync),
|
||||
sync <- Changeset.apply_changes(sync_changeset),
|
||||
multi <- Multi.new(),
|
||||
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
|
||||
{:ok, _} <- Repo.transaction(multi) do
|
||||
auto_sync_change =
|
||||
Enum.find_value(multi.operations, fn {{key, _id}, {:changeset, change, []}} ->
|
||||
|
|
|
@ -23,13 +23,10 @@ defmodule FarmbotExt.API.Reconciler do
|
|||
* apply the Transaction.
|
||||
"""
|
||||
def sync do
|
||||
# Get the sync changeset
|
||||
sync_changeset = API.get_changeset(Sync)
|
||||
sync = Changeset.apply_changes(sync_changeset)
|
||||
|
||||
multi = Multi.new()
|
||||
|
||||
with {:ok, multi} <- sync_group(multi, sync, SyncGroup.group_0()),
|
||||
with {:ok, sync_changeset} <- API.get_changeset(Sync),
|
||||
sync <- Changeset.apply_changes(sync_changeset),
|
||||
multi <- Multi.new(),
|
||||
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_0()),
|
||||
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_1()),
|
||||
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_2()),
|
||||
{:ok, multi} <- sync_group(multi, sync, SyncGroup.group_3()),
|
||||
|
@ -111,7 +108,8 @@ defmodule FarmbotExt.API.Reconciler do
|
|||
# A module is passed in if there is no local copy of the data.
|
||||
defp get_changeset(module, sync_item, nil) when is_atom(module) do
|
||||
Logger.info("Local data: #{module} does not exist. Using HTTP to get data.")
|
||||
{:insert, API.get_changeset(module, "#{sync_item.id}")}
|
||||
{:ok, changeset} = API.get_changeset(module, "#{sync_item.id}")
|
||||
{:insert, changeset}
|
||||
end
|
||||
|
||||
defp get_changeset(module, sync_item, %Changeset{} = cached) when is_atom(module) do
|
||||
|
@ -135,7 +133,8 @@ defmodule FarmbotExt.API.Reconciler do
|
|||
"Local data: #{local_item.__struct__} is out of date. Using HTTP to get newer data."
|
||||
)
|
||||
|
||||
{:update, API.get_changeset(local_item, "#{sync_item.id}")}
|
||||
{:ok, changeset} = API.get_changeset(local_item, "#{sync_item.id}")
|
||||
{:update, changeset}
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -248,13 +248,12 @@ defmodule FarmbotOS.SysCalls do
|
|||
|
||||
def sync() do
|
||||
FarmbotCore.Logger.busy(3, "Syncing")
|
||||
sync_changeset = API.get_changeset(Sync)
|
||||
sync = Changeset.apply_changes(sync_changeset)
|
||||
multi = Multi.new()
|
||||
|
||||
:ok = BotState.set_sync_status("syncing")
|
||||
|
||||
with {:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
|
||||
with {:ok, sync_changeset} <- API.get_changeset(Sync),
|
||||
sync <- Changeset.apply_changes(sync_changeset),
|
||||
multi <- Multi.new(),
|
||||
:ok <- BotState.set_sync_status("syncing"),
|
||||
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_0()),
|
||||
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_1()),
|
||||
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_2()),
|
||||
{:ok, multi} <- Reconciler.sync_group(multi, sync, SyncGroup.group_3()),
|
||||
|
|
Loading…
Reference in New Issue