[UNSTABLE] Sequence context
This commit is contained in:
parent
65442f2060
commit
35c2f3d0c7
17
lib/farmbot/celery_script/commands/call_parent.ex
Normal file
17
lib/farmbot/celery_script/commands/call_parent.ex
Normal file
|
@ -0,0 +1,17 @@
|
|||
defmodule Farmbot.CeleryScript.Command.CallParent do
|
||||
@moduledoc """
|
||||
CallParent
|
||||
"""
|
||||
|
||||
alias Farmbot.CeleryScript.Command
|
||||
alias Farmbot.CeleryScript.Ast
|
||||
|
||||
@behaviour Command
|
||||
|
||||
@doc ~s"""
|
||||
"""
|
||||
@spec run(%{context: map}, []) :: no_return
|
||||
def run(%{context: context}, []) do
|
||||
IO.warn "HEY!!! #{inspect context}"
|
||||
end
|
||||
end
|
|
@ -14,10 +14,22 @@ defmodule Farmbot.CeleryScript.Command.Execute do
|
|||
body: []
|
||||
"""
|
||||
@spec run(%{sequence_id: integer}, []) :: no_return
|
||||
def run(%{sequence_id: id}, []) do
|
||||
def run(%{sequence_id: id} = args, []) do
|
||||
id
|
||||
|> Farmbot.Sync.get_sequence
|
||||
|> Ast.parse
|
||||
|> merge_args(args)
|
||||
|> delete_me()
|
||||
|> Command.do_command
|
||||
end
|
||||
|
||||
defp merge_args(ast, args), do: %{ast | args: Map.merge(ast.args, args)}
|
||||
|
||||
defp delete_me(ast) do
|
||||
%{ast | body: [blerp() | ast.body]}
|
||||
end
|
||||
|
||||
defp blerp do
|
||||
%Farmbot.CeleryScript.Ast{args: %{}, body: [], comment: nil, kind: "call_parent"}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -5,6 +5,7 @@ defmodule Farmbot.CeleryScript.Command.Sequence do
|
|||
|
||||
alias Farmbot.CeleryScript.Command
|
||||
alias Farmbot.CeleryScript.Ast
|
||||
require Logger
|
||||
|
||||
@behaviour Command
|
||||
|
||||
|
@ -17,7 +18,8 @@ defmodule Farmbot.CeleryScript.Command.Sequence do
|
|||
def run(args, body) do
|
||||
# rebuild the ast node
|
||||
ast = %Ast{kind: "sequence", args: args, body: body}
|
||||
{:ok, _pid} = Farmbot.SequenceRunner.start_link(ast)
|
||||
# Elixir.Sequence.Supervisor.add_child(ast, Timex.now())
|
||||
# Logger.debug "Starting sequence: #{inspect ast}"
|
||||
{:ok, pid} = Farmbot.SequenceRunner.start_link(ast)
|
||||
Farmbot.SequenceRunner.wait(pid)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -7,9 +7,8 @@ defmodule Farmbot.FarmEvent.Supervisor do
|
|||
|
||||
def init(_) do
|
||||
children = [
|
||||
supervisor(Sequence.Supervisor, [], [restart: :permanent]),
|
||||
worker(Regimen.Supervisor, [], [restart: :permanent]),
|
||||
worker(FarmEventRunner, [], [restart: :permanent])
|
||||
worker(Farmbot.Regimen.Supervisor, [], [restart: :permanent]),
|
||||
worker(Farmbot.FarmEventRunner, [], [restart: :permanent])
|
||||
]
|
||||
opts = [strategy: :one_for_one]
|
||||
supervise(children, opts)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
defmodule FarmEventRunner do
|
||||
defmodule Farmbot.FarmEventRunner do
|
||||
@moduledoc """
|
||||
Checks the database every 60 seconds for FarmEvents
|
||||
"""
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
defmodule Regimen.Supervisor do
|
||||
defmodule Farmbot.Regimen.Supervisor do
|
||||
@moduledoc """
|
||||
Supervisor for Regimens
|
||||
"""
|
||||
|
|
|
@ -18,6 +18,7 @@ defmodule Farmbot.RegimenRunner do
|
|||
use GenServer
|
||||
use Amnesia
|
||||
use Farmbot.Sync.Database
|
||||
alias Farmbot.Regimen.Supervisor, as: RegSup
|
||||
require Logger
|
||||
|
||||
def start_link(regimen, time) do
|
||||
|
@ -53,7 +54,7 @@ defmodule Farmbot.RegimenRunner do
|
|||
def handle_info(:execute, state) do
|
||||
{item, regimen} = pop_item(state.regimen)
|
||||
if item do
|
||||
Elixir.Sequence.Supervisor.add_child(item.sequence, Timex.now())
|
||||
Farmbot.CeleryScript.Command.do_command(item)
|
||||
next_item = List.first(regimen.regimen_items)
|
||||
if next_item do
|
||||
next_dt = Timex.shift(state.epoch, milliseconds: next_item.time_offset)
|
||||
|
@ -65,14 +66,14 @@ defmodule Farmbot.RegimenRunner do
|
|||
else
|
||||
Logger.info ">> #{regimen.name} is complete!"
|
||||
spawn fn() ->
|
||||
Elixir.Regimen.Supervisor.remove_child(regimen)
|
||||
RegSup.remove_child(regimen)
|
||||
end
|
||||
{:noreply, :finished}
|
||||
end
|
||||
else
|
||||
Logger.info ">> #{regimen.name} is complete!"
|
||||
spawn fn() ->
|
||||
Elixir.Regimen.Supervisor.remove_child(regimen)
|
||||
RegSup.remove_child(regimen)
|
||||
end
|
||||
{:noreply, :finished}
|
||||
end
|
||||
|
|
|
@ -1,118 +0,0 @@
|
|||
defmodule Sequence.Supervisor do
|
||||
@moduledoc """
|
||||
Supervisor for Sequences
|
||||
"""
|
||||
@behaviour Farmbot.EventSupervisor
|
||||
use GenServer
|
||||
use Farmbot.Sync.Database
|
||||
alias Farmbot.SequenceRunner
|
||||
require Logger
|
||||
|
||||
@type state :: %{
|
||||
# Queue of sequences to run
|
||||
q: :queue.queue,
|
||||
# list of pids waiting for the current sequence to finish
|
||||
blocks: [pid],
|
||||
# the running sequence
|
||||
running: {pid, Sequence.t} | nil
|
||||
}
|
||||
|
||||
@doc """
|
||||
Starts the Sequence Supervisor.
|
||||
"""
|
||||
def start_link, do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
|
||||
|
||||
# @spec init([]) :: {:ok, state}
|
||||
def init([]) do
|
||||
Process.flag(:trap_exit, true)
|
||||
{:ok, %{q: :queue.new(), blocks: [], running: nil}}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Add a child to this supervisor
|
||||
"""
|
||||
def add_child(%Sequence{} = sequence, _time) do
|
||||
GenServer.call(__MODULE__, {:add, sequence})
|
||||
end
|
||||
|
||||
def add_child(_,_), do: {:error, :not_sequence}
|
||||
|
||||
@doc """
|
||||
Remove a child
|
||||
"""
|
||||
def remove_child(%Sequence{} = sequence) do
|
||||
GenServer.call(__MODULE__, {:stop, sequence})
|
||||
end
|
||||
|
||||
def remove_child(_), do: {:error, :not_sequence}
|
||||
|
||||
@doc """
|
||||
Gets the state
|
||||
"""
|
||||
@spec get_state :: state
|
||||
def get_state do
|
||||
GenServer.call(__MODULE__, :get_state)
|
||||
end
|
||||
|
||||
@spec handle_call({:add, Sequence.t}, reference, state)
|
||||
:: {:reply, {:ok, pid}, state}
|
||||
def handle_call({:add, sequence}, _from, state) do
|
||||
# check if we are running or not
|
||||
case state.running do
|
||||
{_pid, _sequence} ->
|
||||
# queue up a sequence
|
||||
Logger.info ">> is queing up a sequence"
|
||||
q = :queue.in(sequence, state.q)
|
||||
{:reply, {:ok, :queued}, %{state | q: q}}
|
||||
_ ->
|
||||
# start it now
|
||||
Logger.info ">> is starting a sequence"
|
||||
{:ok, pid} = SequenceRunner.start_link(sequence)
|
||||
{:reply, {:ok, pid}, %{state | running: {pid, sequence}}}
|
||||
end
|
||||
end
|
||||
|
||||
@spec handle_call({:stop, Sequence.t}, reference, state)
|
||||
:: {:reply, :ok | {:error, atom}, state}
|
||||
def handle_call({:stop, sequence}, _from, state) do
|
||||
# check if the sequence is running
|
||||
case state.running do
|
||||
{pid, rsequence} ->
|
||||
# if its currently running, stop it
|
||||
if sequence.id == rsequence.id do
|
||||
GenServer.stop(pid, :normal)
|
||||
# This is actually wrong. We need to start the next sequence
|
||||
# in the queue here
|
||||
{:reply, :ok, %{state | running: nil}}
|
||||
else
|
||||
{:reply, :ok, state}
|
||||
end
|
||||
_ ->
|
||||
# if not, pop it from the queue
|
||||
q = :queue.filter(fn(qsequence) ->
|
||||
qsequence.id != sequence.id
|
||||
end, state.q)
|
||||
{:reply, :ok, %{state | q: q}}
|
||||
end
|
||||
end
|
||||
|
||||
@spec handle_call(:get_state, reference, state) :: {:reply, state, state}
|
||||
def handle_call(:get_state, _from, state), do: {:reply, state, state}
|
||||
|
||||
# When a sequence is complete
|
||||
@spec handle_info({:EXIT, pid, any}, state) :: {:noreply, state}
|
||||
def handle_info({:EXIT, pid, reason}, state) do
|
||||
unless reason == :normal do
|
||||
Logger.warn ">> sequence exited unnaturally: #{inspect reason}"
|
||||
end
|
||||
case :queue.out(state.q) do
|
||||
{:empty, q} ->
|
||||
Logger.info ">> no more sequences to run right now."
|
||||
{:noreply, %{state | q: q, running: nil}}
|
||||
{{:value, sequence}, q} ->
|
||||
Logger.info ">> is starting a sequence"
|
||||
{:ok, pid} = SequenceRunner.start_link(sequence)
|
||||
{:noreply, %{state | q: q, running: {pid, sequence}}}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -2,73 +2,140 @@ defmodule Farmbot.SequenceRunner do
|
|||
@moduledoc """
|
||||
Runs a sequence
|
||||
"""
|
||||
|
||||
use Farmbot.Sync.Database
|
||||
require Logger
|
||||
use GenServer
|
||||
alias Farmbot.CeleryScript.Ast
|
||||
alias Farmbot.SequenceRunner.Binding
|
||||
use Counter, __MODULE__
|
||||
import Farmbot.CeleryScript.Command, only: [do_command: 1]
|
||||
require Logger
|
||||
|
||||
defmodule State do
|
||||
defstruct [:binding, :body, :current]
|
||||
def start_link(ast) do
|
||||
GenServer.start_link(__MODULE__, ast, [])
|
||||
end
|
||||
|
||||
@doc """
|
||||
Starts a sequence.
|
||||
"""
|
||||
def start_link(%Ast{} = seq), do: GenServer.start_link(__MODULE__, seq)
|
||||
def wait(sequence), do: GenServer.call(sequence, :wait, :infinity)
|
||||
|
||||
def start_link(sequence) do
|
||||
IO.puts "blerp"
|
||||
sequence = Ast.parse(sequence)
|
||||
start_link(sequence)
|
||||
def init(ast) do
|
||||
|
||||
context = ast.args[:context] || %{parent: nil}
|
||||
|
||||
IO.inspect context
|
||||
|
||||
ast = traverse(ast, context)
|
||||
[first | rest] = ast.body
|
||||
spawn __MODULE__, :work, [first, self()]
|
||||
{:ok, %{blocks: [], body: rest}}
|
||||
end
|
||||
|
||||
def init(%Ast{} = sequence) do
|
||||
Logger.debug "initializing a sequence."
|
||||
{:ok, binding} = Binding.start_link()
|
||||
body = Enum.map(sequence.body, fn(ast) ->
|
||||
ast
|
||||
end)
|
||||
current = do_work(body, self())
|
||||
state = %State{binding: binding, body: body, current: current}
|
||||
{:ok, state}
|
||||
def handle_call(:wait, from, state) do
|
||||
{:noreply, %{state | blocks: [from | state.blocks]}}
|
||||
end
|
||||
|
||||
def handle_call({:next, []}, _from, state) do
|
||||
reset_count()
|
||||
{:stop, :normal, :ok, state}
|
||||
def handle_cast(:finished, %{body: []} = state) do
|
||||
for from <- state.blocks do
|
||||
GenServer.reply(from, :ok)
|
||||
end
|
||||
{:stop, :normal, []}
|
||||
end
|
||||
|
||||
def handle_call({:next, body}, _, state) do
|
||||
current = do_work(body, self())
|
||||
{:reply, :ok, %{state | body: body, current: current}}
|
||||
def handle_cast(:finished, %{body: rest} = state) do
|
||||
[first | rest] = rest
|
||||
spawn __MODULE__, :work, [first, self()]
|
||||
{:noreply, %{state | body: rest}}
|
||||
end
|
||||
|
||||
def handle_call({:stop, reason}, _, state) do
|
||||
{:stop, reason, :ok, state}
|
||||
def work(ast, sequence) do
|
||||
do_command(ast)
|
||||
GenServer.cast(sequence, :finished)
|
||||
end
|
||||
|
||||
defp do_work(body, pid) do
|
||||
spawn(__MODULE__, :work, [body, pid])
|
||||
defp traverse(ast, context)
|
||||
|
||||
defp traverse(%{args: args, body: body} = ast, context) do
|
||||
args = traverse_args(args, context)
|
||||
body = traverse_body(body, context)
|
||||
%Ast{ast | args: args, body: body} |> intercept_ast(context)
|
||||
end
|
||||
|
||||
def work([item | rest], pid) do
|
||||
inc_count()
|
||||
if get_count() > 1_000 do
|
||||
:ok = GenServer.call(pid, {:stop, :recursing})
|
||||
# traverse the k/v version of the args.
|
||||
defp traverse_args(args, context, acc \\ [])
|
||||
|
||||
defp traverse_args(%{} = args, context, []), do: traverse_args(Map.to_list(args), context, [])
|
||||
|
||||
# Turn it back into a map when we are done.
|
||||
defp traverse_args([], _context, acc), do: Map.new(acc)
|
||||
|
||||
defp traverse_args([{key, value} | rest], context, acc) do
|
||||
value = if match?(%Ast{}, value) do
|
||||
traverse(value, context)
|
||||
else
|
||||
Logger.debug "doing #{item.kind}"
|
||||
Farmbot.CeleryScript.Command.do_command(item)
|
||||
:ok = GenServer.call(pid, {:next, rest})
|
||||
value
|
||||
end
|
||||
arg = intercept_arg({key, value}, context)
|
||||
traverse_args(rest, context, [arg | acc])
|
||||
end
|
||||
|
||||
def terminate(reason, state) do
|
||||
if reason != :normal do
|
||||
Logger.error(">> Sequence died! #{inspect reason}")
|
||||
end
|
||||
:ok = Binding.stop(state.binding)
|
||||
# body traversal
|
||||
defp traverse_body(body, context, acc \\ [])
|
||||
|
||||
# When we finish, make sure to reverse the list otherwise its backwards
|
||||
defp traverse_body([], _context, acc), do: acc |> Enum.reverse
|
||||
|
||||
defp traverse_body([item | rest], context, acc) do
|
||||
item = item |> traverse(context) |> intercept_item(context)
|
||||
traverse_body(rest, context, [item | acc])
|
||||
end
|
||||
|
||||
defp intercept_arg({_key, _value} = arg, _context) do
|
||||
# IO.puts "arg: #{inspect arg}"
|
||||
arg
|
||||
end
|
||||
|
||||
defp intercept_item(item, _context) do
|
||||
# IO.puts "item: #{inspect item}"
|
||||
item
|
||||
end
|
||||
|
||||
# inject a new context into execute blocks for the next sequence to use.
|
||||
defp intercept_ast(%{kind: "execute"} = ast, _context) do
|
||||
context = new_context()
|
||||
IO.puts "replacing context: #{inspect context}"
|
||||
%{ast | args: Map.put(ast.args, :context, context)}
|
||||
end
|
||||
|
||||
defp intercept_ast(%{kind: "call_parent"} = ast, context) do
|
||||
IO.puts "replacing context: #{inspect context}"
|
||||
%{ast | args: Map.put(ast.args, :context, context)}
|
||||
end
|
||||
|
||||
defp intercept_ast(ast, _context) do
|
||||
ast
|
||||
end
|
||||
|
||||
defp new_context, do: %{parent: self()}
|
||||
|
||||
def s do
|
||||
# %Farmbot.CeleryScript.Ast{args: %{is_outdated: false, version: 4},
|
||||
# body: [%Farmbot.CeleryScript.Ast{args: %{message: "SHOULD HAPPEN FIRST!",
|
||||
# message_type: "success"}, body: [], comment: nil, kind: "send_message"},
|
||||
# %Farmbot.CeleryScript.Ast{args: %{_else: %Farmbot.CeleryScript.Ast{args: %{},
|
||||
# body: [], comment: nil, kind: "nothing"},
|
||||
# _then: %Farmbot.CeleryScript.Ast{args: %{sequence_id: 297}, body: [],
|
||||
# comment: nil, kind: "execute"}, lhs: "x", op: "is", rhs: 0}, body: [],
|
||||
# comment: nil, kind: "_if"},
|
||||
# %Farmbot.CeleryScript.Ast{args: %{message: "SHOULD HAPPEN LAST!",
|
||||
# message_type: "success"}, body: [], comment: nil, kind: "send_message"}],
|
||||
# comment: nil, kind: "sequence"}
|
||||
%Farmbot.CeleryScript.Ast{args: %{is_outdated: false, version: 4},
|
||||
body: [
|
||||
%Farmbot.CeleryScript.Ast{args: %{message: "SHOULD HAPPEN FIRST!", message_type: "success"}, body: [], comment: nil, kind: "send_message"},
|
||||
%Farmbot.CeleryScript.Ast{args: %{_else: %Farmbot.CeleryScript.Ast{args: %{}, body: [], comment: nil, kind: "nothing"},
|
||||
_then: %Farmbot.CeleryScript.Ast{args: %{sequence_id: 297}, body: [], comment: nil, kind: "execute"},
|
||||
lhs: "x", op: "is", rhs: 0}, body: [],
|
||||
comment: nil,
|
||||
kind: "_if"},
|
||||
%Farmbot.CeleryScript.Ast{args: %{message: "SHOULD HAPPEN LAST!", message_type: "success"}, body: [], comment: nil, kind: "send_message"},
|
||||
%Farmbot.CeleryScript.Ast{args: %{}, body: [], comment: nil, kind: "call_parent"}
|
||||
],
|
||||
comment: nil, kind: "sequence"}
|
||||
end
|
||||
|
||||
end
|
||||
|
|
1
nerves/.gitignore
vendored
1
nerves/.gitignore
vendored
|
@ -1,3 +1,4 @@
|
|||
*
|
||||
!.gitignore
|
||||
!nerves_system_*
|
||||
nerves_system_br
|
||||
|
|
|
@ -15,12 +15,12 @@ use Mix.Releases.Config,
|
|||
|
||||
environment :dev do
|
||||
set cookie: :"gz`tgx[zM,ueL[g{Ji62{jiawNDZHH~PGkNQLa&R>R7c0SKziff4L,*&ZNG)(qu0"
|
||||
set pre_start_hook: "rel/hooks/pre_start"
|
||||
# set pre_start_hook: "rel/hooks/pre_start"
|
||||
end
|
||||
|
||||
environment :prod do
|
||||
set cookie: :"gz`tgx[zM,ueL[g{Ji62{jiawNDZHH~PGkNQLa&R>R7c0SKziff4L,*&ZNG)(qu0"
|
||||
set pre_start_hook: "rel/hooks/pre_start"
|
||||
# set pre_start_hook: "rel/hooks/pre_start"
|
||||
end
|
||||
|
||||
# You may define one or more releases in this file.
|
||||
|
|
Loading…
Reference in a new issue