Refactor farmware pipes to use domain sockets

This fixes issues with Erlang blocking IO operations
when using port to communicate with a named pipe
pull/974/head
Connor Rigby 2019-08-20 13:37:51 -07:00
parent fcadeebe6e
commit ca3356a09a
No known key found for this signature in database
GPG Key ID: 29A88B24B70456E0
3 changed files with 66 additions and 31 deletions

View File

@ -97,9 +97,9 @@ defmodule FarmbotCore.FarmwareRuntime do
# Create pipe dir if it doesn't exist
_ = File.mkdir_p(@runtime_dir)
# Open a pipe
{:ok, req} = PipeWorker.start_link(request_pipe)
{:ok, resp} = PipeWorker.start_link(response_pipe)
# Open pipes
{:ok, req} = PipeWorker.start_link(request_pipe, :in)
{:ok, resp} = PipeWorker.start_link(response_pipe, :out)
exec = System.find_executable(manifest.executable)
installation_path = install_dir(manifest)
@ -112,6 +112,7 @@ defmodule FarmbotCore.FarmwareRuntime do
)
# Start the plugin.
Logger.debug "spawning farmware: #{exec} #{manifest.args}"
{cmd, _} = spawn_monitor(MuonTrap, :cmd, ["sh", ["-c", "#{exec} #{manifest.args}"], opts])
state = %State{

View File

@ -6,18 +6,20 @@ defmodule FarmbotCore.FarmwareRuntime.PipeWorker do
require Logger
alias __MODULE__, as: State
defstruct [
:port,
:pipe_name,
:pipe,
:buffer,
:caller,
:size,
:timeout_timer
:timeout_timer,
:direction
]
@read_time 5
def start_link(pipe_name) do
GenServer.start_link(__MODULE__, [pipe_name])
def start_link(pipe_name, direction) do
GenServer.start_link(__MODULE__, [pipe_name, direction])
end
def close(pipe) do
@ -32,37 +34,63 @@ defmodule FarmbotCore.FarmwareRuntime.PipeWorker do
GenServer.call(pipe, {:write, packet}, :infinity)
end
def init([pipe_name]) do
with {_, 0} <- System.cmd("mkfifo", [pipe_name]),
pipe <- :erlang.open_port(to_charlist(pipe_name), [:eof, :binary]) do
{:ok, %State{pipe_name: pipe_name, pipe: pipe, buffer: <<>>}}
else
{:error, _} = error -> {:stop, error}
{_, _num} -> {:stop, {:error, "mkfifo"}}
end
def init([pipe_name, direction]) do
Logger.debug "opening pipe: #{pipe_name}"
{:ok, port} = :gen_tcp.listen(0, [
{:ip, {:local, to_charlist(pipe_name)}},
{:ifaddr, {:local, to_charlist(pipe_name)}},
:local,
{:active, true},
:binary
])
send self(), :accept
# {:ok, pipe} = :gen_tcp.accept(lsocket)
{:ok, %State{pipe_name: pipe_name, port: port, buffer: <<>>, direction: direction}}
end
def terminate(_, state) do
Logger.warn("PipeWorker #{state.pipe_name} exit")
:erlang.port_close(state.pipe)
Logger.warn("PipeWorker #{state.direction} #{state.pipe_name} exit")
# :erlang.port_close(state.pipe)
state.pipe && :gen_tcp.close(state.pipe)
File.rm!(state.pipe_name)
end
def handle_call({:write, packet}, _from, state) do
Logger.debug "writing #{byte_size(packet)} bytes"
reply = :erlang.port_command(state.pipe, packet)
{:reply, reply, state}
Logger.debug "#{state.direction} writing #{byte_size(packet)} bytes"
if state.pipe do
:ok = :gen_tcp.send(state.pipe, packet)
else
Logger.warn "no pipe"
end
# reply = :erlang.port_command(state.pipe, packet)
{:reply, true, state}
end
def handle_call({:read, amnt}, {_pid, ref} = from, %{caller: nil, size: nil} = state) do
Logger.debug "requesting: #{amnt} bytes"
Logger.debug "#{state.direction} requesting: #{amnt} bytes"
Process.send_after(self(), {:read, amnt, from}, @read_time)
timeout_timer = Process.send_after(self(), {:timeout, amnt, from}, 5000)
# timeout_timer = Process.send_after(self(), {:timeout, amnt, from}, 5000)
timeout_timer = nil
{:reply, ref, %{state | caller: from, size: amnt, timeout_timer: timeout_timer}}
end
def handle_info({:tcp_closed, _port}, state) do
send self(), :accept
{:noreply, %{state | pipe: nil}}
end
def handle_info(:accept, state) do
case :gen_tcp.accept(state.port, 100) do
{:ok, pipe} ->
{:noreply, %{state | pipe: pipe}}
{:error, :timeout} ->
send self(), :accept
{:noreply, %{state | pipe: nil}}
end
end
def handle_info({:timeout, size, {pid, ref}}, %{caller: {pid, ref}} = state) do
Logger.error "Timed out waiting on #{size} bytes."
Logger.error "#{state.direction} Timed out waiting on #{size} bytes."
{:stop, :timeout, state}
end
@ -71,8 +99,10 @@ defmodule FarmbotCore.FarmwareRuntime.PipeWorker do
{:noreply, state}
end
def handle_info({pipe, {:data, data}}, %{pipe: pipe, buffer: buffer} = state) do
Logger.debug "buffering #{byte_size(data)} bytes"
# {udp,#Port<0.676>,{127,0,0,1},8790,<<"hey there!">>}
# def handle_info({pipe, {:data, data}}, %{pipe: pipe, buffer: buffer} = state) do
def handle_info({:tcp, pipe, data}, %{pipe: pipe, buffer: buffer} = state) do
Logger.debug "#{state.direction} buffering #{byte_size(data)} bytes"
{:noreply, %{state | buffer: buffer <> data}}
end
@ -81,17 +111,15 @@ defmodule FarmbotCore.FarmwareRuntime.PipeWorker do
{pid, ref} = caller
{resp, buffer} = String.split_at(buffer, size)
send(pid, {__MODULE__, ref, {:ok, resp}})
Logger.debug "pipe worker read #{size} bytes successfully #{byte_size(buffer)} bytes remaining in buffer."
Logger.debug "#{state.direction} pipe worker read #{size} bytes successfully #{byte_size(buffer)} bytes remaining in buffer."
{:noreply, %{state | buffer: buffer, size: nil, caller: nil}}
end
def handle_info({:read, size, caller}, %{buffer: buffer} = state) when byte_size(buffer) < size do
_ = state.timeout_timer && Process.cancel_timer(state.timeout_timer)
if byte_size(buffer) != 0 do
Logger.debug "pipe worker still waiting on #{size - byte_size(buffer)} bytes for #{inspect(caller)} Currently #{byte_size(buffer)} bytes in buffer"
Logger.debug "#{state.direction} pipe worker still waiting on #{size - byte_size(buffer)} bytes for #{inspect(caller)} Currently #{byte_size(buffer)} bytes in buffer"
end
Process.send_after(self(), {:read, size, caller}, @read_time)
timeout_timer = Process.send_after(self(), {:timeout, size, caller}, 5000)
{:noreply, %{state | timeout_timer: timeout_timer}}
{:noreply, state}
end
end

View File

@ -46,9 +46,12 @@ defmodule FarmbotOS.SysCalls.Farmware do
defp loop(farmware_name, runtime, monitor, {ref, label}) do
receive do
{:DOWN, ^monitor, :process, ^runtime, :normal} ->
{:DOWN, ^monitor, :process, _runtime, :normal} ->
:ok
{:DOWN, ^monitor, :process, _runtime, error} ->
{:error, inspect(error)}
{:step_complete, ^ref, :ok} ->
Logger.debug("ok for #{label}")
response = %AST{kind: :rpc_ok, args: %{label: label}, body: []}
@ -70,7 +73,7 @@ defmodule FarmbotOS.SysCalls.Farmware do
Logger.info("Already processing a celeryscript request: #{label}")
loop(farmware_name, runtime, monitor, {ref, label})
else
case FarmwareRuntime.process_rpc(runtime) do
case Process.alive?(runtime) && FarmwareRuntime.process_rpc(runtime) do
{:ok, %{args: %{label: label}} = rpc} ->
ref = make_ref()
Logger.debug("executing rpc: #{inspect(rpc)}")
@ -79,6 +82,9 @@ defmodule FarmbotOS.SysCalls.Farmware do
{:error, :no_rpc} ->
loop(farmware_name, runtime, monitor, {ref, label})
false ->
:ok
end
end
end