From 549d78e6e1b9740bc415feab55cf370fe5d3c1fa Mon Sep 17 00:00:00 2001 From: Connor Rigby Date: Sun, 12 Nov 2017 16:52:04 -0800 Subject: [PATCH] stream to fwup instead of saving a file, and then writing it --- lib/farmbot/bot_state/bot_state.ex | 42 +++++++++ lib/farmbot/bot_state/job_progress.ex | 43 +++++++++ lib/farmbot/http/adapter.ex | 3 +- lib/farmbot/http/http.ex | 10 +-- lib/farmbot/http/httpoison_adapter.ex | 38 +++++--- lib/farmbot/system/updates/fwup_stream.ex | 97 +++++++++++++++++++++ lib/farmbot/system/updates/slack_updater.ex | 14 ++- 7 files changed, 226 insertions(+), 21 deletions(-) create mode 100644 lib/farmbot/bot_state/job_progress.ex create mode 100644 lib/farmbot/system/updates/fwup_stream.ex diff --git a/lib/farmbot/bot_state/bot_state.ex b/lib/farmbot/bot_state/bot_state.ex index 70cef9d3..504ec5fe 100644 --- a/lib/farmbot/bot_state/bot_state.ex +++ b/lib/farmbot/bot_state/bot_state.ex @@ -120,6 +120,42 @@ defmodule Farmbot.BotState do farmwares: %{} } + def download_progress_fun(name) do + alias Farmbot.BotState.JobProgress + fn(bytes, total) -> + {do_send, prog} = cond do + # if the total is complete spit out the bytes, and put a status of complete. + total == :complete -> + Logger.success 3, "#{name} complete." + {true, %JobProgress.Bytes{bytes: bytes, status: :complete}} + + # if we don't know the total just spit out the bytes. + total == nil -> + # debug_log "#{name} - #{bytes} bytes." + {rem(bytes, 10) == 0, %JobProgress.Bytes{bytes: bytes}} + # if the number of bytes == the total bytes, percentage side is complete. + (div(bytes,total)) == 1 -> + Logger.success 3, "#{name} complete." + {true, %JobProgress.Percent{percent: 100, status: :complete}} + # anything else is a percent. + true -> + percent = ((bytes / total) * 100) |> round() + # Logger.busy 3, "#{name} - #{bytes}/#{total} = #{percent}%" + {rem(percent, 10) == true, %JobProgress.Percent{percent: percent}} + end + if do_send do + Farmbot.BotState.set_job_progress(name, prog) + else + :ok + end + end + end + + @doc "Set job progress." + def set_job_progress(name, progress) do + GenServer.call(__MODULE__, {:set_job_progress, name, progress}) + end + @doc "Get a current pin value." def get_pin_value(num) do GenStage.call(__MODULE__, {:get_pin_value, num}) @@ -234,6 +270,12 @@ defmodule Farmbot.BotState do {:reply, state.location_data.position, [], state} end + def handle_call({:set_job_progress, name, progress}, _from, state) do + jobs = Map.put(state.jobs, name, progress) + new_state = %{state | jobs: jobs} + {:reply, :ok, [new_state], new_state} + end + def handle_call({:register_farmware, fw}, _, state) do ser_fw_meta = %{ min_os_version_major: fw.min_os_version_major, diff --git a/lib/farmbot/bot_state/job_progress.ex b/lib/farmbot/bot_state/job_progress.ex new file mode 100644 index 00000000..cc4d88a7 --- /dev/null +++ b/lib/farmbot/bot_state/job_progress.ex @@ -0,0 +1,43 @@ +defmodule Farmbot.BotState.JobProgress do + @moduledoc "Interface for job progress." + + @typedoc "Unit of the job." + @type unit :: :percent | :bytes + @typedoc "Status of the job." + @type status :: :working | :complete | :error + + defmodule Percent do + @moduledoc "Percent job." + defstruct [status: :working, percent: 0, unit: :percent] + + defimpl Inspect, for: __MODULE__ do + def inspect(%{percent: percent}, _) do + "#Percent<#{percent}>" + end + end + @type t :: %__MODULE__{ + status: Farmbot.BotState.JobProgress.status, + percent: integer, + unit: :percent + } + end + + defmodule Bytes do + @moduledoc "Byes job." + defstruct [status: :working, bytes: 0, unit: :bytes] + + defimpl Inspect, for: __MODULE__ do + def inspect(%{bytes: bytes}, _) do + "#bytes<#{bytes}>" + end + end + + @type t :: %__MODULE__{ + status: Farmbot.BotState.JobProgress.status, + bytes: integer, + unit: :bytes + } + end + + @type t :: Bytes.t | Percent.t +end diff --git a/lib/farmbot/http/adapter.ex b/lib/farmbot/http/adapter.ex index 1d9af2dc..d1aa242c 100644 --- a/lib/farmbot/http/adapter.ex +++ b/lib/farmbot/http/adapter.ex @@ -26,6 +26,7 @@ defmodule Farmbot.HTTP.Adapter do Arg 2 should be the total number of bytes, nil, or the atom :complete """ @type progress_callback :: (number, number | nil | :complete -> any) + @type stream_fun :: (number, binary -> any) @typedoc "A json serializable map of meta data about an upload." @type upload_meta :: map @@ -35,7 +36,7 @@ defmodule Farmbot.HTTP.Adapter do {:ok, Response.t()} | {:error, term} @doc "Download a file to the Filesystem." - @callback download_file(adapter, url, Path.t(), progress_callback, body, headers) :: + @callback download_file(adapter, url, Path.t(), progress_callback, body, headers, stream_fun) :: {:ok, Path.t()} | {:error, term} @doc "Upload a file." diff --git a/lib/farmbot/http/http.ex b/lib/farmbot/http/http.ex index 34f373fb..1bb147ff 100644 --- a/lib/farmbot/http/http.ex +++ b/lib/farmbot/http/http.ex @@ -81,10 +81,10 @@ defmodule Farmbot.HTTP do end @doc "Download a file to the filesystem." - def download_file(url, path, progress_callback \\ nil, payload \\ "", headers \\ []) + def download_file(url, path, progress_callback \\ nil, payload \\ "", headers \\ [], stream_fun \\ nil) - def download_file(url, path, progress_callback, payload, headers) do - GenServer.call(__MODULE__, {:download_file, {url, path, progress_callback, payload, headers}}, :infinity) + def download_file(url, path, progress_callback, payload, headers, stream_fun) do + GenServer.call(__MODULE__, {:download_file, {url, path, progress_callback, payload, headers, stream_fun}}, :infinity) end @doc "Upload a file to FB storage." @@ -115,8 +115,8 @@ defmodule Farmbot.HTTP do {:reply, res, state} end - def handle_call({:download_file, {url, path, progress_callback, payload, headers}}, _from, %{adapter: adapter} = state) do - res = case @adapter.download_file(adapter, url, path, progress_callback, payload, headers) do + def handle_call({:download_file, {url, path, progress_callback, payload, headers, stream_fun}}, _from, %{adapter: adapter} = state) do + res = case @adapter.download_file(adapter, url, path, progress_callback, payload, headers, stream_fun) do {:ok, _} = res -> res {:error, _} = res -> res end diff --git a/lib/farmbot/http/httpoison_adapter.ex b/lib/farmbot/http/httpoison_adapter.ex index 959031b8..23cb6272 100644 --- a/lib/farmbot/http/httpoison_adapter.ex +++ b/lib/farmbot/http/httpoison_adapter.ex @@ -19,15 +19,16 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do GenServer.call(http, {:req, method, url, body, headers, opts}, :infinity) end - def download_file(http, url, path, progress_callback, payload, headers) do + def download_file(http, url, path, progress_callback, payload, headers, stream_fun) do case request( http, :get, url, payload, headers, - file: path, - progress_callback: progress_callback + [file: path, + stream_fun: stream_fun, + progress_callback: progress_callback] ) do {:ok, %Response{status_code: code}} when is_2xx(code) -> {:ok, path} {:ok, %Response{} = resp} -> {:error, resp} @@ -100,6 +101,7 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do :file, :timeout, :progress_callback, + :stream_fun, :file_size ] end @@ -185,7 +187,12 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do if buffer.timeout, do: Process.cancel_timer(buffer.timeout) timeout = Process.send_after(self(), {:timeout, ref}, 30000) maybe_log_progress(buffer) - maybe_stream_to_file(buffer.file, buffer.status_code, chunk) + case buffer.stream_fun do + nil -> + maybe_stream_to_file(buffer.file, buffer.status_code, chunk) + fun when is_function(fun) -> + fun.(buffer.status_code, chunk) + end HTTPoison.stream_next(%AsyncResponse{id: ref}) { @@ -229,12 +236,19 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do case file do filename when is_binary(filename) -> - # debug_log "Opening file: #{filename}" - File.rm(file) - :ok = File.touch(filename) - {:ok, fd} = :file.open(filename, [:write, :raw]) - {fd, Keyword.merge(opts, stream_to: self(), async: :once)} - + if Keyword.get(opts, :stream_fun) do + # If there is a stream function, don't open the file. + {nil, opts} + else + # If there is, delete the old file, + File.rm(file) + # Make sure it exists and is empty + :ok = File.touch(filename) + # Open the file + {:ok, fd} = :file.open(filename, [:write, :raw]) + # Set opts. + {fd, Keyword.merge(opts, stream_to: self(), async: :once)} + end _ -> {nil, opts} end @@ -252,7 +266,8 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do defp maybe_close_file(fd), do: :file.close(fd) defp maybe_log_progress(%Buffer{file: file, progress_callback: pcb}) - when is_nil(file) or is_nil(pcb) do + when is_nil(file) or is_nil(pcb) + do # debug_log "File (#{inspect file}) or progress callback: #{inspect pcb} are nil" :ok end @@ -317,6 +332,7 @@ defmodule Farmbot.HTTP.HTTPoisonAdapter do data: "", headers: nil, status_code: nil, + stream_fun: Keyword.get(opts, :stream_fun, nil), progress_callback: Keyword.fetch!(opts, :progress_callback), request: {method, url, body, headers, opts} } diff --git a/lib/farmbot/system/updates/fwup_stream.ex b/lib/farmbot/system/updates/fwup_stream.ex new file mode 100644 index 00000000..4a3e4004 --- /dev/null +++ b/lib/farmbot/system/updates/fwup_stream.ex @@ -0,0 +1,97 @@ +defmodule Farmbot.System.Updates.FwupStream do + use GenServer + require Logger + + @moduledoc false + + def start_link() do + GenServer.start_link(__MODULE__, []) + end + + def send_chunk(pid, chunk) do + GenServer.call(pid, {:send, chunk}) + end + + def init([]) do + fwup = System.find_executable("fwup") + devpath = Nerves.Runtime.KV.get("nerves_fw_devpath") || "/dev/mmcblk0" + task = "upgrade" + + args = if supports_handshake(), do: ["--exit-handshake"], else: [] + args = args ++ ["--apply", "--no-unmount", "-d", devpath, "--task", task] + + port = + Port.open({:spawn_executable, fwup}, [ + {:args, args}, + :use_stdio, + :binary, + :exit_status + ]) + + {:ok, %{port: port}} + end + + def handle_call(_cmd, _from, %{port: nil} = state) do + # In the process of closing down, so just ignore these. + {:reply, :error, state} + end + + def handle_call({:send, chunk}, _from, state) do + # Since fwup may be slower than ssh, we need to provide backpressure + # here. It's tricky since `Port.command/2` is the only way to send + # bytes to fwup synchronously, but it's possible for fwup to error + # out when it's sending. If fwup errors out, then we need to make + # sure that a message gets back to the user for what happened. + # `Port.command/2` exits on error (it will be an :epipe error). + # Therefore we start a new process to call `Port.command/2` while + # we continue to handle responses. We also trap_exit to get messages + # when the port the Task exit. + result = + try do + Port.command(state.port, chunk) + :ok + rescue + ArgumentError -> + Logger.info("Port.command ArgumentError race condition detected and handled") + :error + end + + {:reply, result, state} + end + + def handle_info({port, {:data, response}}, %{port: port} = state) do + _trimmed_response = + if String.contains?(response, "\x1a") do + # fwup says that it's going to exit by sending a CTRL+Z (0x1a) + # The CTRL+Z is the very last character that will ever be + # received over the port, so handshake by closing the port. + send(port, {self(), :close}) + String.trim_trailing(response, "\x1a") + else + response + end + + {:noreply, state} + end + + def handle_info({port, {:exit_status, status}}, %{port: port} = state) do + Logger.info("fwup exited with status #{status} without handshaking") + {:noreply, %{state | port: nil}} + end + + def handle_info({port, :closed}, %{port: port} = state) do + Logger.info("fwup port was closed") + {:noreply, %{state | port: nil}} + end + + defp supports_handshake() do + Version.match?(fwup_version(), "> 0.17.0") + end + + defp fwup_version() do + {version_str, 0} = System.cmd("fwup", ["--version"]) + version_str + |> String.trim() + |> Version.parse!() + end +end diff --git a/lib/farmbot/system/updates/slack_updater.ex b/lib/farmbot/system/updates/slack_updater.ex index 5664a91f..4e101ebc 100644 --- a/lib/farmbot/system/updates/slack_updater.ex +++ b/lib/farmbot/system/updates/slack_updater.ex @@ -116,13 +116,19 @@ defmodule Farmbot.System.Updates.SlackUpdater do if match?(<< <<"farmbot-">>, @target, <<"-">>, _rest :: binary>>, name) do Logger.warn(3, "Downloading and applying an image from slack!") if Farmbot.System.ConfigStorage.get_config_value(:bool, "settings", "os_auto_update") do - case Farmbot.HTTP.download_file(dl_url, Path.join(@data_path, name), nil, "", [{'Authorization', 'Bearer #{state.token}'}]) do - {:ok, path} -> - Nerves.Firmware.upgrade_and_finalize(path) + alias Farmbot.System.Updates.FwupStream + {:ok, stream} = FwupStream.start_link() + stream_fun = fn(http_status_code, chunk) -> + if http_status_code not in [301, 302, 303, 307, 308] do + FwupStream.send_chunk(stream, chunk) + end + end + case Farmbot.HTTP.download_file(dl_url, Path.join(@data_path, name), nil, "", [{'Authorization', 'Bearer #{state.token}'}], stream_fun) do + {:ok, _path} -> Farmbot.System.reboot("Slack update.") {:stop, :normal, %{state | updating: true}} {:error, reason} -> - Logger.error 3 "Failed to download update file: #{inspect reason}" + Logger.error 3, "Failed to download update file: #{inspect reason}" {:noreply, state} end else