farmbot_os/farmbot_core/lib/farmware_runtime.ex

320 lines
9.8 KiB
Elixir

defmodule Farmbot.FarmwareRuntime do
@moduledoc """
Handles execution of Farmware plugins.
"""
alias Farmbot.FarmwareRuntime.PipeWorker
alias Farmbot.CeleryScript.AST
alias Farmbot.AssetWorker.Farmbot.Asset.FarmwareInstallation
alias Farmbot.Asset.FarmwareInstallation.Manifest
import FarmwareInstallation, only: [install_dir: 1]
alias Farmbot.{Asset, JSON}
import Farmbot.Config, only: [get_config_value: 3]
require Logger
@error_timeout_ms 5000
@runtime_dir Application.get_env(:farmbot_core, __MODULE__)[:runtime_dir]
@runtime_dir ||
Mix.raise("""
config :farmbot_core, Farmbot.FarmwareRuntime,
runtime_dir: "/tmp/farmware_runtime"
""")
@muontrap_opts Application.get_env(:farmbot_core, __MODULE__)[:muontrap_opts]
@muontrap_opts || []
@packet_header_token 0xFBFB
@packet_header_byte_size 10
alias __MODULE__, as: State
defstruct [
:cmd,
:mon,
:context,
:rpc,
:request_pipe,
:request_pipe_handle,
:response_pipe,
:response_pipe_handle
]
@opaque pipe_handle :: pid()
@type t :: %State{
request_pipe: Path.t(),
request_pipe_handle: pipe_handle,
response_pipe: Path.t(),
response_pipe_handle: pipe_handle,
cmd: pid(),
mon: pid() | nil,
rpc: map(),
context: :get_header | :get_payload | :process_payload | :send_response
}
def stub(farmware_name) do
manifest = Asset.get_farmware_manifest() || raise("not found")
{:ok, pid} = Farmbot.FarmwareRuntime.start_link(manifest)
Process.flag(:trap_exit, true)
stub_loop(pid)
end
def stub_loop(pid) do
receive do
{:EXIT, ^pid, reason} -> reason
after
100 ->
case Farmbot.FarmwareRuntime.process_rpc(pid) do
{:ok, %{args: %{label: label}} = rpc} ->
IO.puts("Stup processing #{inspect(rpc)}")
response = %AST{kind: :rpc_ok, args: %{label: label}, body: []}
true = Farmbot.FarmwareRuntime.rpc_processed(pid, response)
stub_loop(pid)
{:error, :no_rpc} ->
stub_loop(pid)
end
end
end
@doc """
Calls the Farmware Runtime asking for any RPCs that need to be
processed. If an RPC was ready, the Farmware will not process
any more RPCs until the current one is done.
"""
def process_rpc(pid) do
GenServer.call(pid, :process_rpc)
end
@doc """
Calls the Farmware Runtime telling it that an RPC has been processed.
"""
def rpc_processed(pid, response) do
GenServer.call(pid, {:rpc_processed, response})
end
@doc "Start a Farmware"
def start_link(%Manifest{} = manifest) do
package = manifest.package
GenServer.start_link(__MODULE__, [manifest], name: String.to_atom(package))
end
def init([manifest]) do
package = manifest.package
request_pipe =
Path.join([
@runtime_dir,
package <> "-" <> Ecto.UUID.generate() <> "-farmware-request-pipe"
])
response_pipe =
Path.join([
@runtime_dir,
package <> "-" <> Ecto.UUID.generate() <> "-farmware-response-pipe"
])
env = build_env(manifest, request_pipe, response_pipe)
# Create pipe dir if it doesn't exist
_ = File.mkdir_p(@runtime_dir)
# Open a pipe
{:ok, req} = PipeWorker.start_link(request_pipe)
{:ok, resp} = PipeWorker.start_link(response_pipe)
exec = System.find_executable(manifest.executable)
installation_path = install_dir(manifest)
opts =
Keyword.merge(@muontrap_opts,
env: env,
cd: installation_path,
into: IO.stream(:stdio, :line)
)
# Start the plugin.
{cmd, _} = spawn_monitor(MuonTrap, :cmd, [exec, manifest.args, opts])
state = %State{
cmd: cmd,
mon: nil,
context: :get_header,
rpc: nil,
request_pipe: request_pipe,
request_pipe_handle: req,
response_pipe: response_pipe,
response_pipe_handle: resp
}
{:ok, state, 0}
end
def terminate(_reason, state) do
if state.cmd && Process.alive?(state.cmd), do: Process.exit(state.cmd, :kill)
if state.request_pipe_handle do
PipeWorker.close(state.request_pipe_handle)
end
if state.response_pipe_handle do
PipeWorker.close(state.response_pipe_handle)
end
end
# If we are in the `process_request` state, send the RPC out to be buffered.
# This moves us to the `send_response` state. (which has _no_ timeout)
def handle_call(:process_rpc, {pid, _} = _from, %{context: :process_request, rpc: rpc} = state) do
# Link the calling process
# so the Farmware can exit if the rpc never gets processed.
_ = Process.link(pid)
{:reply, {:ok, rpc}, %{state | rpc: nil, context: :send_response}}
end
# If not in the `process_request` state, noop
def handle_call(:process_rpc, _from, state) do
{:reply, {:error, :no_rpc}, state}
end
def handle_call({:rpc_processed, result}, {pid, _} = _from, %{context: :send_response} = state) do
# Unlink the calling process
_ = Process.unlink(pid)
ipc = add_header(result)
reply = PipeWorker.write(state.response_pipe_handle, ipc)
# Make sure to `timeout` after this one to go back to the
# get_header context. This will cause another rpc to be processed.
{:reply, reply, %{state | rpc: nil, context: :get_header}, 0}
end
# get_request does two reads. One to get the header,
# and a second to get the entire binary payload.
def handle_info(:timeout, %{context: :get_header} = state) do
state = async_request_pipe_read(state, @packet_header_byte_size)
{:noreply, state}
end
# Timeout set by `handle_packet/2`. This will mean the CSVM
# didn't pick up the scheduled AST in a reasonable amount of time.
def handle_info(:timeout, %{context: :process_request} = state) do
Logger.error("Timeout waiting for #{inspect(state.rpc)} to be processed")
{:stop, {:error, :rpc_timeout}, state}
end
# farmware exit
def handle_info({:DOWN, _ref, :process, cmd, _reason}, %{cmd: cmd} = state) do
Logger.debug("Farmware exit")
{:stop, :normal, state}
end
# successful result of an io:read/2 in :get_header context
def handle_info(
{PipeWorker, _ref,
{:ok,
<<@packet_header_token::size(16), _reserved::size(32),
payload_size::integer-big-size(32)>>}},
%{context: :get_header} = state
) do
state = async_request_pipe_read(state, payload_size)
{:noreply, %{state | context: :get_payload}}
end
# error result of an io:read/2 in :get_header context
def handle_info({PipeWorker, _ref, {:ok, data}}, %{context: :get_header} = state) do
Logger.error("Bad header: #{inspect(data, base: :hex, limit: :infinity)}")
{:stop, {:unhandled_packet, data}, state}
end
# error result of an io:read/2 in :get_header context
def handle_info({PipeWorker, _ref, error}, %{context: :get_header} = state) do
Logger.error("Bad header: #{inspect(error)}")
{:stop, error, state}
end
# successful result of an io:read/2 in :get_payload context
def handle_info({PipeWorker, _ref, {:ok, packet}}, %{context: :get_payload} = state) do
handle_packet(packet, state)
end
# error result of an io:read/2 in :get_header context
def handle_info({PipeWorker, _ref, error}, %{context: :get_payload} = state) do
Logger.error("Bad payload: #{inspect(error)}")
{:stop, error, state}
end
# Pipe reads are done async because reading will block the entire
# process from receiving more messages as well as
# prevent the processes from terminating.
# this means if a Farmware never opens the pipe
# (a valid use case), When the Farmware completes
# the pipe will still be waiting for information
# and prevent the pipes from closing.
defp async_request_pipe_read(state, size) do
mon = PipeWorker.read(state.request_pipe_handle, size)
%{state | mon: mon}
end
# When a packet arives, buffer it until
# the controlling process (the CSVM) picks it up.
# there is a timeout for how long a packet will wait to be collected,
# but no time limit to how long it will take to
# process the packet.
def handle_packet(packet, state) do
with {:ok, data} <- JSON.decode(packet),
{:ok, rpc} <- decode_ast(data) do
IO.inspect(rpc, label: "processing RPC")
{:noreply, %{state | rpc: rpc, context: :process_request}, @error_timeout_ms}
else
error -> {:stop, error, state}
end
end
defp decode_ast(data) do
try do
case AST.decode(data) do
%{kind: :rpc_request} = ast ->
{:ok, ast}
%{} = ast ->
Logger.error("Got bad ast: #{inspect(ast)}")
{:error, :bad_ast}
end
rescue
_ -> {:error, :bad_ast}
end
end
defp build_env(manifest, request_pipe, response_pipe) do
token = get_config_value(:string, "authorization", "token")
images_dir = "/tmp/images"
installation_path = install_dir(manifest)
state_root_dir = Application.get_env(:farmbot_core, Farmbot.BotState.FileSystem)[:root_dir]
base =
Map.new()
|> Map.put("FARMWARE_API_V2_REQUEST_PIPE", request_pipe)
|> Map.put("FARMWARE_API_V2_RESPONSE_PIPE", response_pipe)
|> Map.put("FARMBOT_API_TOKEN", token)
|> Map.put("FARMBOT_OS_IMAGES_DIR", images_dir)
|> Map.put("FARMBOT_OS_VERSION", Farmbot.Project.version())
|> Map.put("FARMBOT_OS_STATE_DIR", state_root_dir)
|> Map.put("PYTHONPATH", installation_path)
Asset.list_farmware_env()
|> Map.new(fn %{key: key, value: val} -> {key, val} end)
|> Map.merge(base)
end
defp add_header(%AST{} = rpc) do
payload = rpc |> Map.from_struct() |> JSON.encode!()
header =
<<@packet_header_token::size(16)>> <>
:binary.copy(<<0x00>>, 4) <> <<byte_size(payload)::big-size(32)>>
IO.puts("header size: #{byte_size(header)}")
IO.inspect(header, label: "Header", base: :hex)
header <> payload
end
end