stream to fwup instead of saving a file, and then writing it

pull/363/head
Connor Rigby 2017-11-12 16:52:04 -08:00
parent 8caa47f9d8
commit 549d78e6e1
7 changed files with 226 additions and 21 deletions

View File

@ -120,6 +120,42 @@ defmodule Farmbot.BotState do
farmwares: %{}
}
def download_progress_fun(name) do
alias Farmbot.BotState.JobProgress
fn(bytes, total) ->
{do_send, prog} = cond do
# if the total is complete spit out the bytes, and put a status of complete.
total == :complete ->
Logger.success 3, "#{name} complete."
{true, %JobProgress.Bytes{bytes: bytes, status: :complete}}
# if we don't know the total just spit out the bytes.
total == nil ->
# debug_log "#{name} - #{bytes} bytes."
{rem(bytes, 10) == 0, %JobProgress.Bytes{bytes: bytes}}
# if the number of bytes == the total bytes, percentage side is complete.
(div(bytes,total)) == 1 ->
Logger.success 3, "#{name} complete."
{true, %JobProgress.Percent{percent: 100, status: :complete}}
# anything else is a percent.
true ->
percent = ((bytes / total) * 100) |> round()
# Logger.busy 3, "#{name} - #{bytes}/#{total} = #{percent}%"
{rem(percent, 10) == true, %JobProgress.Percent{percent: percent}}
end
if do_send do
Farmbot.BotState.set_job_progress(name, prog)
else
:ok
end
end
end
@doc "Set job progress."
def set_job_progress(name, progress) do
GenServer.call(__MODULE__, {:set_job_progress, name, progress})
end
@doc "Get a current pin value."
def get_pin_value(num) do
GenStage.call(__MODULE__, {:get_pin_value, num})
@ -234,6 +270,12 @@ defmodule Farmbot.BotState do
{:reply, state.location_data.position, [], state}
end
def handle_call({:set_job_progress, name, progress}, _from, state) do
jobs = Map.put(state.jobs, name, progress)
new_state = %{state | jobs: jobs}
{:reply, :ok, [new_state], new_state}
end
def handle_call({:register_farmware, fw}, _, state) do
ser_fw_meta = %{
min_os_version_major: fw.min_os_version_major,

View File

@ -0,0 +1,43 @@
defmodule Farmbot.BotState.JobProgress do
@moduledoc "Interface for job progress."
@typedoc "Unit of the job."
@type unit :: :percent | :bytes
@typedoc "Status of the job."
@type status :: :working | :complete | :error
defmodule Percent do
@moduledoc "Percent job."
defstruct [status: :working, percent: 0, unit: :percent]
defimpl Inspect, for: __MODULE__ do
def inspect(%{percent: percent}, _) do
"#Percent<#{percent}>"
end
end
@type t :: %__MODULE__{
status: Farmbot.BotState.JobProgress.status,
percent: integer,
unit: :percent
}
end
defmodule Bytes do
@moduledoc "Byes job."
defstruct [status: :working, bytes: 0, unit: :bytes]
defimpl Inspect, for: __MODULE__ do
def inspect(%{bytes: bytes}, _) do
"#bytes<#{bytes}>"
end
end
@type t :: %__MODULE__{
status: Farmbot.BotState.JobProgress.status,
bytes: integer,
unit: :bytes
}
end
@type t :: Bytes.t | Percent.t
end

View File

@ -26,6 +26,7 @@ defmodule Farmbot.HTTP.Adapter do
Arg 2 should be the total number of bytes, nil, or the atom :complete
"""
@type progress_callback :: (number, number | nil | :complete -> any)
@type stream_fun :: (number, binary -> any)
@typedoc "A json serializable map of meta data about an upload."
@type upload_meta :: map
@ -35,7 +36,7 @@ defmodule Farmbot.HTTP.Adapter do
{:ok, Response.t()} | {:error, term}
@doc "Download a file to the Filesystem."
@callback download_file(adapter, url, Path.t(), progress_callback, body, headers) ::
@callback download_file(adapter, url, Path.t(), progress_callback, body, headers, stream_fun) ::
{:ok, Path.t()} | {:error, term}
@doc "Upload a file."

View File

@ -81,10 +81,10 @@ defmodule Farmbot.HTTP do
end
@doc "Download a file to the filesystem."
def download_file(url, path, progress_callback \\ nil, payload \\ "", headers \\ [])
def download_file(url, path, progress_callback \\ nil, payload \\ "", headers \\ [], stream_fun \\ nil)
def download_file(url, path, progress_callback, payload, headers) do
GenServer.call(__MODULE__, {:download_file, {url, path, progress_callback, payload, headers}}, :infinity)
def download_file(url, path, progress_callback, payload, headers, stream_fun) do
GenServer.call(__MODULE__, {:download_file, {url, path, progress_callback, payload, headers, stream_fun}}, :infinity)
end
@doc "Upload a file to FB storage."
@ -115,8 +115,8 @@ defmodule Farmbot.HTTP do
{:reply, res, state}
end
def handle_call({:download_file, {url, path, progress_callback, payload, headers}}, _from, %{adapter: adapter} = state) do
res = case @adapter.download_file(adapter, url, path, progress_callback, payload, headers) do
def handle_call({:download_file, {url, path, progress_callback, payload, headers, stream_fun}}, _from, %{adapter: adapter} = state) do
res = case @adapter.download_file(adapter, url, path, progress_callback, payload, headers, stream_fun) do
{:ok, _} = res -> res
{:error, _} = res -> res
end

View File

@ -19,15 +19,16 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do
GenServer.call(http, {:req, method, url, body, headers, opts}, :infinity)
end
def download_file(http, url, path, progress_callback, payload, headers) do
def download_file(http, url, path, progress_callback, payload, headers, stream_fun) do
case request(
http,
:get,
url,
payload,
headers,
file: path,
progress_callback: progress_callback
[file: path,
stream_fun: stream_fun,
progress_callback: progress_callback]
) do
{:ok, %Response{status_code: code}} when is_2xx(code) -> {:ok, path}
{:ok, %Response{} = resp} -> {:error, resp}
@ -100,6 +101,7 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do
:file,
:timeout,
:progress_callback,
:stream_fun,
:file_size
]
end
@ -185,7 +187,12 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do
if buffer.timeout, do: Process.cancel_timer(buffer.timeout)
timeout = Process.send_after(self(), {:timeout, ref}, 30000)
maybe_log_progress(buffer)
maybe_stream_to_file(buffer.file, buffer.status_code, chunk)
case buffer.stream_fun do
nil ->
maybe_stream_to_file(buffer.file, buffer.status_code, chunk)
fun when is_function(fun) ->
fun.(buffer.status_code, chunk)
end
HTTPoison.stream_next(%AsyncResponse{id: ref})
{
@ -229,12 +236,19 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do
case file do
filename when is_binary(filename) ->
# debug_log "Opening file: #{filename}"
File.rm(file)
:ok = File.touch(filename)
{:ok, fd} = :file.open(filename, [:write, :raw])
{fd, Keyword.merge(opts, stream_to: self(), async: :once)}
if Keyword.get(opts, :stream_fun) do
# If there is a stream function, don't open the file.
{nil, opts}
else
# If there is, delete the old file,
File.rm(file)
# Make sure it exists and is empty
:ok = File.touch(filename)
# Open the file
{:ok, fd} = :file.open(filename, [:write, :raw])
# Set opts.
{fd, Keyword.merge(opts, stream_to: self(), async: :once)}
end
_ ->
{nil, opts}
end
@ -252,7 +266,8 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do
defp maybe_close_file(fd), do: :file.close(fd)
defp maybe_log_progress(%Buffer{file: file, progress_callback: pcb})
when is_nil(file) or is_nil(pcb) do
when is_nil(file) or is_nil(pcb)
do
# debug_log "File (#{inspect file}) or progress callback: #{inspect pcb} are nil"
:ok
end
@ -317,6 +332,7 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do
data: "",
headers: nil,
status_code: nil,
stream_fun: Keyword.get(opts, :stream_fun, nil),
progress_callback: Keyword.fetch!(opts, :progress_callback),
request: {method, url, body, headers, opts}
}

View File

@ -0,0 +1,97 @@
defmodule Farmbot.System.Updates.FwupStream do
use GenServer
require Logger
@moduledoc false
def start_link() do
GenServer.start_link(__MODULE__, [])
end
def send_chunk(pid, chunk) do
GenServer.call(pid, {:send, chunk})
end
def init([]) do
fwup = System.find_executable("fwup")
devpath = Nerves.Runtime.KV.get("nerves_fw_devpath") || "/dev/mmcblk0"
task = "upgrade"
args = if supports_handshake(), do: ["--exit-handshake"], else: []
args = args ++ ["--apply", "--no-unmount", "-d", devpath, "--task", task]
port =
Port.open({:spawn_executable, fwup}, [
{:args, args},
:use_stdio,
:binary,
:exit_status
])
{:ok, %{port: port}}
end
def handle_call(_cmd, _from, %{port: nil} = state) do
# In the process of closing down, so just ignore these.
{:reply, :error, state}
end
def handle_call({:send, chunk}, _from, state) do
# Since fwup may be slower than ssh, we need to provide backpressure
# here. It's tricky since `Port.command/2` is the only way to send
# bytes to fwup synchronously, but it's possible for fwup to error
# out when it's sending. If fwup errors out, then we need to make
# sure that a message gets back to the user for what happened.
# `Port.command/2` exits on error (it will be an :epipe error).
# Therefore we start a new process to call `Port.command/2` while
# we continue to handle responses. We also trap_exit to get messages
# when the port the Task exit.
result =
try do
Port.command(state.port, chunk)
:ok
rescue
ArgumentError ->
Logger.info("Port.command ArgumentError race condition detected and handled")
:error
end
{:reply, result, state}
end
def handle_info({port, {:data, response}}, %{port: port} = state) do
_trimmed_response =
if String.contains?(response, "\x1a") do
# fwup says that it's going to exit by sending a CTRL+Z (0x1a)
# The CTRL+Z is the very last character that will ever be
# received over the port, so handshake by closing the port.
send(port, {self(), :close})
String.trim_trailing(response, "\x1a")
else
response
end
{:noreply, state}
end
def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
Logger.info("fwup exited with status #{status} without handshaking")
{:noreply, %{state | port: nil}}
end
def handle_info({port, :closed}, %{port: port} = state) do
Logger.info("fwup port was closed")
{:noreply, %{state | port: nil}}
end
defp supports_handshake() do
Version.match?(fwup_version(), "> 0.17.0")
end
defp fwup_version() do
{version_str, 0} = System.cmd("fwup", ["--version"])
version_str
|> String.trim()
|> Version.parse!()
end
end

View File

@ -116,13 +116,19 @@ defmodule Farmbot.System.Updates.SlackUpdater do
if match?(<< <<"farmbot-">>, @target, <<"-">>, _rest :: binary>>, name) do
Logger.warn(3, "Downloading and applying an image from slack!")
if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "os_auto_update") do
case Farmbot.HTTP.download_file(dl_url, Path.join(@data_path, name), nil, "", [{'Authorization', 'Bearer #{state.token}'}]) do
{:ok, path} ->
Nerves.Firmware.upgrade_and_finalize(path)
alias Farmbot.System.Updates.FwupStream
{:ok, stream} = FwupStream.start_link()
stream_fun = fn(http_status_code, chunk) ->
if http_status_code not in [301, 302, 303, 307, 308] do
FwupStream.send_chunk(stream, chunk)
end
end
case Farmbot.HTTP.download_file(dl_url, Path.join(@data_path, name), nil, "", [{'Authorization', 'Bearer #{state.token}'}], stream_fun) do
{:ok, _path} ->
Farmbot.System.reboot("Slack update.")
{:stop, :normal, %{state | updating: true}}
{:error, reason} ->
Logger.error 3 "Failed to download update file: #{inspect reason}"
Logger.error 3, "Failed to download update file: #{inspect reason}"
{:noreply, state}
end
else