Refactor Farmware to need less manual pid cleanup

pull/1101/head
connor rigby 2019-12-13 08:53:00 -08:00 committed by Connor Rigby
parent 3cf1f686fd
commit 60e1594d26
2 changed files with 66 additions and 166 deletions

View File

@ -32,10 +32,12 @@ defmodule FarmbotCore.FarmwareRuntime do
alias __MODULE__, as: State
defstruct [
:caller,
:cmd,
:mon,
:context,
:rpc,
:scheduler_ref,
:request_pipe,
:request_pipe_handle,
:response_pipe,
@ -45,36 +47,22 @@ defmodule FarmbotCore.FarmwareRuntime do
@opaque pipe_handle :: pid()
@type t :: %State{
caller: pid(),
request_pipe: Path.t(),
request_pipe_handle: pipe_handle,
response_pipe: Path.t(),
response_pipe_handle: pipe_handle,
scheduler_ref: reference() | nil,
cmd: pid(),
mon: pid() | nil,
rpc: map(),
context: :get_header | :get_payload | :process_payload | :send_response
context: :get_header | :get_payload | :process_payload | :send_response | :error
}
@doc """
Calls the Farmware Runtime asking for any RPCs that need to be
processed. If an RPC was ready, the Farmware will not process
any more RPCs until the current one is done.
"""
def process_rpc(pid) do
GenServer.call(pid, :process_rpc)
end
@doc """
Calls the Farmware Runtime telling it that an RPC has been processed.
"""
def rpc_processed(pid, response) do
GenServer.call(pid, {:rpc_processed, response})
end
@doc "Start a Farmware"
def start_link(%Manifest{} = manifest, env \\ %{}) do
package = manifest.package
GenServer.start_link(__MODULE__, [manifest, env], name: String.to_atom(package))
GenServer.start_link(__MODULE__, [manifest, env, self()], name: String.to_atom(package))
end
@doc "Stop a farmware"
@ -85,7 +73,7 @@ defmodule FarmbotCore.FarmwareRuntime do
end
end
def init([manifest, env]) do
def init([manifest, env, caller]) do
package = manifest.package
<<clause1 :: binary-size(8), _::binary>> = Ecto.UUID.generate()
@ -125,10 +113,12 @@ defmodule FarmbotCore.FarmwareRuntime do
{cmd, _} = spawn_monitor(MuonTrap, :cmd, ["sh", ["-c", "#{exec} #{manifest.args}"], opts])
state = %State{
caller: caller,
cmd: cmd,
mon: nil,
context: :get_header,
rpc: nil,
scheduler_ref: nil,
request_pipe: request_pipe,
request_pipe_handle: req,
response_pipe: response_pipe,
@ -151,29 +141,26 @@ defmodule FarmbotCore.FarmwareRuntime do
end
end
# If we are in the `process_request` state, send the RPC out to be buffered.
# This moves us to the `send_response` state. (which has _no_ timeout)
def handle_call(:process_rpc, {pid, _} = _from, %{context: :process_request, rpc: rpc} = state) do
# Link the calling process
# so the Farmware can exit if the rpc never gets processed.
_ = Process.link(pid)
{:reply, {:ok, rpc}, %{state | rpc: nil, context: :send_response}}
def handle_info(msg, %{context: :error} = state) do
Logger.warn "unhandled message in error state: #{inspect(msg)}"
{:noreply, state}
end
# If not in the `process_request` state, noop
def handle_call(:process_rpc, _from, state) do
{:reply, {:error, :no_rpc}, state}
def handle_info({:step_complete, ref, {:error, reason}}, %{scheduler_ref: ref} = state) do
send state.caller, {:error, reason}
{:noreply, %{state | ref: nil, context: :error}}
end
def handle_call({:rpc_processed, result}, {pid, _} = _from, %{context: :send_response} = state) do
# Unlink the calling process
_ = Process.unlink(pid)
def handle_info({:step_complete, ref, :ok}, %{scheduler_ref: ref} = state) do
label = UUID.uuid4()
result = %AST{kind: :rpc_ok, args: %{label: label}, body: []}
ipc = add_header(result)
reply = PipeWorker.write(state.response_pipe_handle, ipc)
_reply = PipeWorker.write(state.response_pipe_handle, ipc)
# Make sure to `timeout` after this one to go back to the
# get_header context. This will cause another rpc to be processed.
send self(), :timeout
{:reply, reply, %{state | rpc: nil, context: :get_header}}
{:noreply, %{state | rpc: nil, context: :get_header}}
end
# get_request does two reads. One to get the header,
@ -187,13 +174,15 @@ defmodule FarmbotCore.FarmwareRuntime do
# didn't pick up the scheduled AST in a reasonable amount of time.
def handle_info(:timeout, %{context: :process_request} = state) do
Logger.error("Timeout waiting for #{inspect(state.rpc)} to be processed")
{:stop, {:error, :rpc_timeout}, state}
send state.caller, {:error, :rpc_timeout}
{:noreply, %{state | context: :error}}
end
# farmware exit
def handle_info({:DOWN, _ref, :process, _pid, _reason}, %{cmd: _cmd_pid} = state) do
Logger.debug("Farmware exit")
{:stop, :normal, %{state | cmd: nil}}
send state.caller, {:error, :farmware_exit}
{:noreply, %{state | context: :error}}
end
# successful result of an io:read/2 in :get_header context
@ -211,13 +200,15 @@ defmodule FarmbotCore.FarmwareRuntime do
# error result of an io:read/2 in :get_header context
def handle_info({PipeWorker, _ref, {:ok, data}}, %{context: :get_header} = state) do
Logger.error("Bad header: #{inspect(data, base: :hex, limit: :infinity)}")
{:stop, {:unhandled_packet, data}, state}
send state.caller, {:error, {:unhandled_packet, data}}
{:noreply, %{state | context: :error}}
end
# error result of an io:read/2 in :get_header context
def handle_info({PipeWorker, _ref, error}, %{context: :get_header} = state) do
Logger.error("Bad header: #{inspect(error)}")
{:stop, error, state}
send state.caller, {:error, :bad_packet_header}
{:noreply, %{state | context: :error}}
end
# successful result of an io:read/2 in :get_payload context
@ -228,7 +219,8 @@ defmodule FarmbotCore.FarmwareRuntime do
# error result of an io:read/2 in :get_header context
def handle_info({PipeWorker, _ref, error}, %{context: :get_payload} = state) do
Logger.error("Bad payload: #{inspect(error)}")
{:stop, error, state}
send state.caller, {:error, :bad_packet_payload}
{:noreply, %{state | context: :error}}
end
# Pipe reads are done async because reading will block the entire
@ -251,9 +243,15 @@ defmodule FarmbotCore.FarmwareRuntime do
def handle_packet(packet, state) do
with {:ok, data} <- JSON.decode(packet),
{:ok, rpc} <- decode_ast(data) do
{:noreply, %{state | rpc: rpc, context: :process_request}, @error_timeout_ms}
ref = make_ref()
Logger.debug("executing rpc from farmware: #{inspect(rpc)}")
# todo(connor) replace this with StepRunner?
FarmbotCeleryScript.execute(rpc, ref)
{:noreply, %{state | rpc: rpc, scheduler_ref: ref, context: :process_request}, @error_timeout_ms}
else
error -> {:stop, error, state}
{:error, reason} ->
send state.caller, {:error, reason}
{:noreply, %{state | context: :error}}
end
end

View File

@ -1,6 +1,6 @@
defmodule FarmbotOS.SysCalls.Farmware do
require Logger
alias FarmbotCeleryScript.AST
# alias FarmbotCeleryScript.AST
alias FarmbotCore.{Asset, AssetSupervisor, FarmwareRuntime}
alias FarmbotExt.API.ImageUploader
@ -29,131 +29,33 @@ defmodule FarmbotOS.SysCalls.Farmware do
# Entry point to starting a farmware
def execute_script(farmware_name, env) do
# NOTE Connor:
# this is a really big hammer to fix a bug i don't fully understand.
# for some reason even tho we call `if Process.alive?....` before every call,
# there is still a possibility of the genserver being down for some reason.
# catching the genserver call failure "fixes" it.
try do
with {:ok, manifest} <- lookup_manifest(farmware_name),
{:ok, runtime} <- FarmwareRuntime.start_link(manifest, env),
monitor <- Process.monitor(runtime),
:ok <- loop(farmware_name, runtime, monitor, {nil, nil}),
_ <- Process.demonitor(monitor),
_ <- empty_mailbox(),
:ok <- ImageUploader.force_checkup() do
:ok
else
{:error, {:already_started, pid}} ->
Logger.warn("Farmware #{farmware_name} is already running")
_ = FarmwareRuntime.stop(pid)
execute_script(farmware_name, env)
{:error, reason} when is_binary(reason) ->
_ = ImageUploader.force_checkup()
{:error, reason}
end
catch
error, reason ->
Logger.debug("farmware catchall #{inspect(error)}: #{inspect(reason)}")
:ok
end
end
defp empty_mailbox() do
receive do
msg ->
Logger.debug("dropping message after farmware: #{inspect(msg)}")
empty_mailbox()
after
100 -> :ok
end
end
defp loop(farmware_name, runtime, monitor, {ref, label}) do
receive do
{:EXIT, ^runtime, :normal} ->
Logger.debug("Farmware monitor down: :normal state: #{inspect(label)}")
:ok
{:DOWN, ^monitor, :process, ^runtime, :normal} ->
Logger.debug("Farmware monitor down: :normal state: #{inspect(label)}")
:ok
{:EXIT, ^runtime, error} ->
Logger.debug("Farmware monitor down: #{inspect(error)} state: #{inspect(label)}")
{:error, inspect(error)}
{:DOWN, ^monitor, :process, ^runtime, error} ->
Logger.debug("Farmware monitor down: #{inspect(error)} state: #{inspect(label)}")
{:error, inspect(error)}
{:step_complete, ^ref, :ok} ->
Logger.debug("ok for #{label}")
response = %AST{kind: :rpc_ok, args: %{label: label}, body: []}
if Process.alive?(runtime) do
true = FarmwareRuntime.rpc_processed(runtime, response)
loop(farmware_name, runtime, monitor, {nil, nil})
else
:ok
end
{:step_complete, ^ref, {:error, reason}} ->
Logger.debug("error for #{label}")
explanation = %AST{kind: :explanation, args: %{message: reason}}
response = %AST{kind: :rpc_error, args: %{label: label}, body: [explanation]}
if Process.alive?(runtime) do
true = FarmwareRuntime.rpc_processed(runtime, response)
loop(farmware_name, runtime, monitor, {nil, nil})
else
:ok
end
msg ->
if Process.alive?(runtime) do
_ = FarmwareRuntime.stop(runtime)
end
{:error, "unhandled message: #{inspect(msg)} in state: #{inspect({ref, label})}"}
after
500 ->
cond do
# already have a request processing
is_reference(ref) ->
Logger.info("Already processing a celeryscript request: #{label}")
loop(farmware_name, runtime, monitor, {ref, label})
# check to see if it's alive just in case?
Process.alive?(runtime) ->
process(farmware_name, runtime, monitor, {ref, label})
# No other conditions: Process stopped, but missed the message?
true ->
if Process.alive?(runtime) do
_ = FarmwareRuntime.stop(runtime)
end
:ok
end
end
end
defp process(farmware_name, runtime, monitor, {ref, label}) do
if Process.alive?(runtime) do
case FarmwareRuntime.process_rpc(runtime) do
{:ok, %{args: %{label: label}} = rpc} ->
ref = make_ref()
Logger.debug("executing rpc: #{inspect(rpc)}")
FarmbotCeleryScript.execute(rpc, ref)
loop(farmware_name, runtime, monitor, {ref, label})
{:error, :no_rpc} ->
loop(farmware_name, runtime, monitor, {ref, label})
end
else
with {:ok, manifest} <- lookup_manifest(farmware_name),
{:ok, runtime} <- FarmwareRuntime.start_link(manifest, env),
:ok <- loop(runtime),
:ok <- ImageUploader.force_checkup() do
:ok
else
{:error, {:already_started, pid}} ->
FarmwareRuntime.stop(pid)
execute_script(farmware_name, env)
{:error, reason} ->
{:error, reason}
end
end
def loop(farmware_runtime) do
receive do
{:error, :farmware_exit} ->
:ok
{:error, reason} ->
{:error, inspect(reason)}
after
30_000 ->
Logger.debug("Force stopping farmware: #{inspect(farmware_runtime)}")
FarmwareRuntime.stop(farmware_runtime)
:ok
end
end
end