more work on implementing auto sync
This commit is contained in:
parent
12ce82654f
commit
9b4c72206e
|
@ -67,28 +67,14 @@ defmodule Farmbot.BotState.Transport.GenMQTT.Client do
|
|||
{:ok, state}
|
||||
end
|
||||
|
||||
def on_publish(["bot", _bot, "sync"], msg, state) do
|
||||
sync_cmd = msg |> Poison.decode!()
|
||||
repo = Farmbot.Repo.other_repo()
|
||||
mod = Module.concat(["Farmbot", "Repo", sync_cmd["kind"]])
|
||||
def on_publish(["bot", _, "sync", kind, id], msg, state) do
|
||||
mod = Module.concat(["Farmbot", "Repo", kind])
|
||||
if Code.ensure_loaded?(mod) do
|
||||
Logger.warn "Updating #{sync_cmd["kind"]} => #{sync_cmd["body"]["id"]}"
|
||||
obj = sync_cmd["body"] |> Poison.encode! |> Poison.decode!(as: struct(mod))
|
||||
|
||||
# require IEx; IEx.pry
|
||||
# We need to check if this object exists in the database.
|
||||
case repo.get(mod, obj.id) do
|
||||
# If it does not, just return the newly created object.
|
||||
nil -> obj
|
||||
|
||||
# if there is an existing record, copy the ecto meta from the old
|
||||
# record. This allows `insert_or_update` to work properly.
|
||||
existing -> %{obj | __meta__: existing.__meta__}
|
||||
end
|
||||
|> mod.changeset()
|
||||
|> repo.insert_or_update!()
|
||||
body = struct(mod)
|
||||
sync_cmd = msg |> Poison.decode!(as: struct(Farmbot.Repo.SyncCmd, kind: mod, body: body, id: id))
|
||||
Farmbot.Repo.register_sync_cmd(sync_cmd)
|
||||
else
|
||||
Logger.warn "Unknown module: #{mod} #{inspect sync_cmd["body"]}"
|
||||
Logger.warn "Unknown syncable: #{mod}: #{inspect Poison.decode!(msg)}"
|
||||
end
|
||||
{:ok, state}
|
||||
end
|
||||
|
@ -111,7 +97,7 @@ defmodule Farmbot.BotState.Transport.GenMQTT.Client do
|
|||
|
||||
defp frontend_topic(bot), do: "bot/#{bot}/from_device"
|
||||
defp bot_topic(bot), do: "bot/#{bot}/from_clients"
|
||||
defp sync_topic(bot), do: "bot/#{bot}/sync"
|
||||
defp sync_topic(bot), do: "bot/#{bot}/sync/#"
|
||||
defp status_topic(bot), do: "bot/#{bot}/status"
|
||||
defp log_topic(bot), do: "bot/#{bot}/logs"
|
||||
end
|
||||
|
|
|
@ -35,7 +35,9 @@ defmodule Farmbot.HTTP do
|
|||
|
||||
def request!(method, url, body, headers, opts) do
|
||||
case request(method, url, body, headers, opts) do
|
||||
{:ok, %Response{} = resp} -> resp
|
||||
{:ok, %Response{status_code: code} = resp} when code > 199 and code < 300 -> resp
|
||||
{:ok, %Response{} = resp} -> raise Error, resp
|
||||
|
||||
{:error, reason} -> raise Error, reason
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,6 +2,7 @@ defmodule Farmbot.Repo do
|
|||
@moduledoc "Wrapper between two repos."
|
||||
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
def current_repo do
|
||||
GenServer.call(__MODULE__, :current_repo)
|
||||
|
@ -37,14 +38,92 @@ defmodule Farmbot.Repo do
|
|||
|
||||
def handle_call(:flip, _, %{repos: [repo_a, repo_b]} = state) do
|
||||
Enum.reverse(state.sync_cmds) |> Enum.map(fn(sync_cmd) ->
|
||||
mod = Module.concat(["Farmbot", "Repo", sync_cmd["kind"]])
|
||||
mod.changeset(struct(mod), sync_cmd["body"]) |> repo_a.insert_or_update!()
|
||||
apply_sync_cmd(repo_a, sync_cmd)
|
||||
end)
|
||||
{:reply, repo_b, %{state | repos: [repo_b, repo_a]}}
|
||||
end
|
||||
|
||||
def handle_call({:register_sync_cmd, sync_cmd}, _from, state) do
|
||||
{:reply, :ok, %{state | sync_cmds: [sync_cmd | state.sync_cmds]}}
|
||||
[_current_repo, other_repo] = state.repos
|
||||
apply_sync_cmd(other_repo, sync_cmd)
|
||||
sync_cmds = if sync_cmd in state.sync_cmds do
|
||||
state.sync_cmds
|
||||
else
|
||||
[sync_cmd | state.sync_cmds]
|
||||
end
|
||||
{:reply, :ok, %{state | sync_cmds: sync_cmds}}
|
||||
end
|
||||
|
||||
defp apply_sync_cmd(repo, sync_cmd) do
|
||||
try do
|
||||
do_apply_sync_cmd(repo, sync_cmd)
|
||||
rescue
|
||||
e in Ecto.InvalidChangesetError ->
|
||||
Logger.error "Failed to apply sync_cmd: (#{repo}) #{inspect sync_cmd} (#{e.action})"
|
||||
fix_repo(repo, sync_cmd)
|
||||
end
|
||||
end
|
||||
|
||||
defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: nil} = sync_cmd) do
|
||||
# an object was deleted.
|
||||
if Code.ensure_loaded?(mod) do
|
||||
Logger.warn "Applying sync_cmd: #{inspect sync_cmd} on #{repo}"
|
||||
case repo.get(mod, id) do
|
||||
nil -> :ok
|
||||
existing ->
|
||||
repo.delete!(existing)
|
||||
:ok
|
||||
end
|
||||
else
|
||||
Logger.warn "Unknown module: #{mod} #{inspect sync_cmd}"
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
defp do_apply_sync_cmd(repo, %{id: id, kind: mod, body: obj} = sync_cmd) do
|
||||
if Code.ensure_loaded?(mod) do
|
||||
Logger.warn "Applying sync_cmd: #{inspect sync_cmd} on #{repo}"
|
||||
|
||||
# We need to check if this object exists in the database.
|
||||
case repo.get(mod, id) do
|
||||
# If it does not, just return the newly created object.
|
||||
nil -> obj
|
||||
|
||||
# if there is an existing record, copy the ecto meta from the old
|
||||
# record. This allows `insert_or_update` to work properly.
|
||||
existing -> %{obj | __meta__: existing.__meta__}
|
||||
end
|
||||
# Build a changeset
|
||||
|> mod.changeset()
|
||||
# Apply it.
|
||||
|> repo.insert_or_update!()
|
||||
else
|
||||
Logger.warn "Unknown module: #{mod} #{inspect sync_cmd}"
|
||||
end
|
||||
end
|
||||
|
||||
defp fix_repo(repo, %{body: nil}) do
|
||||
# The delete already failed. Nothing we can do. This object doesn't exist anymore.
|
||||
:ok
|
||||
end
|
||||
|
||||
defp fix_repo(repo, %{kind: kind, id: id, body: _body}) do
|
||||
# we failed to update with the `body`
|
||||
|
||||
# Fetch a new copy of this object and insert it.
|
||||
obj = kind.fetch(id)
|
||||
case repo.get(kind, id) do
|
||||
# If it does not, just return the newly created object.
|
||||
nil -> obj
|
||||
|
||||
# if there is an existing record, copy the ecto meta from the old
|
||||
# record. This allows `insert_or_update` to work properly.
|
||||
existing -> %{obj | __meta__: existing.__meta__}
|
||||
end
|
||||
# Build a changeset
|
||||
|> kind.changeset()
|
||||
# Apply it.
|
||||
|> repo.insert_or_update!()
|
||||
end
|
||||
|
||||
@doc false
|
||||
|
@ -52,49 +131,6 @@ defmodule Farmbot.Repo do
|
|||
quote do
|
||||
@moduledoc "Storage for Farmbot Resources."
|
||||
use Ecto.Repo, otp_app: :farmbot, adapter: Application.get_env(:farmbot, __MODULE__)[:adapter]
|
||||
|
||||
alias Farmbot.Repo.{
|
||||
FarmEvent,
|
||||
GenericPointer,
|
||||
Peripheral,
|
||||
Point,
|
||||
Regimen,
|
||||
Sequence,
|
||||
ToolSlot,
|
||||
Tool
|
||||
}
|
||||
|
||||
@default_syncables [
|
||||
FarmEvent,
|
||||
GenericPointer,
|
||||
Peripheral,
|
||||
Point,
|
||||
Regimen,
|
||||
Sequence,
|
||||
ToolSlot,
|
||||
Tool
|
||||
]
|
||||
|
||||
@doc "A list of all the resources."
|
||||
def syncables,
|
||||
do: Application.get_env(:farmbot, :repo)[:farmbot_syncables] || @default_syncables
|
||||
|
||||
@doc "Sync all the modules that export a `sync/1` function."
|
||||
def sync!(http \\ Farmbot.HTTP) do
|
||||
for syncable <- syncables() do
|
||||
if Code.ensure_loaded?(syncable) and function_exported?(syncable, :sync!, 2) do
|
||||
spawn(fn ->
|
||||
syncable.sync!(__MODULE__, http)
|
||||
end)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -21,7 +21,7 @@ defmodule Farmbot.Repo.Sequence do
|
|||
|
||||
res =
|
||||
Enum.map(data, fn data ->
|
||||
Farmbot.CeleryScript.Ast.parse(data)
|
||||
Farmbot.CeleryScript.AST.parse(data)
|
||||
end)
|
||||
|
||||
{:ok, res}
|
||||
|
@ -39,7 +39,7 @@ defmodule Farmbot.Repo.Sequence do
|
|||
|
||||
def load(text) do
|
||||
{:ok, data} = text |> JSONType.load()
|
||||
res = Farmbot.CeleryScript.Ast.parse_args(data)
|
||||
res = Farmbot.CeleryScript.AST.parse_args(data)
|
||||
{:ok, res}
|
||||
end
|
||||
end
|
||||
|
|
3
lib/farmbot/repo/sync_cmd.ex
Normal file
3
lib/farmbot/repo/sync_cmd.ex
Normal file
|
@ -0,0 +1,3 @@
|
|||
defmodule Farmbot.Repo.SyncCmd do
|
||||
defstruct [:id, :kind, :body]
|
||||
end
|
|
@ -1,10 +1,6 @@
|
|||
defmodule Farmbot.Repo.Syncable do
|
||||
@moduledoc "Behaviour for syncable modules."
|
||||
|
||||
@doc "Sync this module."
|
||||
@callback sync!(module, GenServer.server()) :: any | no_return
|
||||
@optional_callbacks [sync!: 2]
|
||||
|
||||
@doc "Changes iso8601 times to DateTime structs."
|
||||
def ensure_time(struct, []), do: struct
|
||||
|
||||
|
@ -20,52 +16,15 @@ defmodule Farmbot.Repo.Syncable do
|
|||
enable_sync = Keyword.get(opts, :sync, true)
|
||||
|
||||
quote do
|
||||
@behaviour Farmbot.Repo.Syncable
|
||||
import Farmbot.Repo.Syncable, only: [ensure_time: 2]
|
||||
require Logger
|
||||
|
||||
if unquote(enable_sync) do
|
||||
@doc """
|
||||
Syncs all #{__MODULE__ |> Module.split() |> List.last()}'s from the Farmbot Web App.
|
||||
1) Fetches JSON from the API.
|
||||
2) Parses JSON as a list of #{__MODULE__ |> Module.split() |> List.last()}'s.
|
||||
3) For each record in the list, checks if the item exists already,
|
||||
4) Inserts or Updates each item in the list into the Repo.
|
||||
"""
|
||||
def sync!(repo, http) do
|
||||
{_, source} = struct(__MODULE__).__meta__.source
|
||||
color = Farmbot.DebugLog.color(:RANDOM)
|
||||
Logger.info("#{color}[#{source}] Begin sync.")
|
||||
# |> fn(bin) -> IO.inspect(Poison.decode!(bin)); bin end.()
|
||||
# |> fn(obj) -> IO.inspect(obj); obj end.()
|
||||
http
|
||||
|> Farmbot.HTTP.get!("/api/#{source}")
|
||||
|> Map.fetch!(:body)
|
||||
|> Poison.decode!(as: [%__MODULE__{}])
|
||||
|> Enum.each(fn obj ->
|
||||
# We need to check if this object exists in the database.
|
||||
case repo.get(__MODULE__, obj.id) do
|
||||
# If it does not, just return the newly created object.
|
||||
nil ->
|
||||
obj
|
||||
|
||||
# if there is an existing record, copy the ecto meta from the old
|
||||
# record. This allows `insert_or_update` to work properly.
|
||||
existing ->
|
||||
%{obj | __meta__: existing.__meta__}
|
||||
end
|
||||
|> __MODULE__.changeset()
|
||||
|> repo.insert_or_update!()
|
||||
end)
|
||||
|
||||
Logger.info("#{color}[#{source}] Complete sync.")
|
||||
end
|
||||
def fetch(id) do
|
||||
{_, plural} = struct(__MODULE__).__meta__ |> Map.get(:source)
|
||||
Farmbot.HTTP.get!("/api/#{plural}/#{id}").body |> Poison.decode!(as: struct(__MODULE__))
|
||||
end
|
||||
|
||||
@doc "Fetch all #{__MODULE__}'s from the Repo."
|
||||
def fetch_all(repo) do
|
||||
import Ecto.Query
|
||||
from(i in __MODULE__, select: i) |> repo.all()
|
||||
if unquote(enable_sync) do
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
Enum.each(struct(Farmbot.CeleryScript.VirtualMachine.InstructionSet) |> Map.from_struct(), fn({snake, camel}) ->
|
||||
camel = Module.split(camel)
|
||||
camel = Enum.join(camel, ".")
|
||||
res = "#{:code.priv_dir(:farmbot)}/instruction.ex.eex" |> EEx.eval_file(camel_instruction: camel, snake_instruction: snake)
|
||||
File.write!("lib/farmbot/celery_script/virtual_machine/instruction/#{snake}.ex", res)
|
||||
end
|
||||
)
|
Loading…
Reference in a new issue