lazy fix for redis client.

pull/268/head
connor rigby 2017-03-07 08:23:53 -08:00
parent 084649abcf
commit 8f97f66c08
7 changed files with 101 additions and 126 deletions

View File

@ -25,6 +25,8 @@ defmodule Farmware.FarmScript do
@enforce_keys [:executable, :args, :path, :name, :envs]
defstruct [:executable, :args, :path, :name, :envs]
@timeout 20_000
require Logger
@doc """
@ -94,7 +96,7 @@ defmodule Farmware.FarmScript do
_something -> handle_port(port, thing)
after
10_000 ->
@timeout ->
Logger.error ">> [#{thing.name}] Timed out"
kill_port(port)
end

View File

@ -1,118 +0,0 @@
defmodule Redis.Client do
@moduledoc """
Command line redis client
"""
use GenServer
require Logger
# 15 minutes
@save_time 900_000
@port Application.get_env(:farmbot, :redis_port)
@config Application.get_all_env(:farmbot)[:redis]
@doc """
Start the redis server.
"""
def start_link, do: GenServer.start_link(__MODULE__, [])
def init([]) do
exe = System.find_executable("redis-cli")
port = Port.open({:spawn_executable, exe},
[:stream,
:binary,
:exit_status,
:hide,
:use_stdio,
:stderr_to_stdout,
args: ["-p", "#{@config[:port]}"]])
Process.send_after(self(), :save, @save_time)
{:ok, %{cli: port, queue: :queue.new(), blah: nil}}
end
def handle_info({_cli, {:data, info}}, state) do
info = String.trim(info)
if state.blah do GenServer.reply(state.blah, info) end
{:noreply, %{state | blah: nil}}
end
def handle_info(:save, state) do
# Since this is a function that doesnt get executed right now, we can
# have this GenServer call itself tehe
me = self()
Farmbot.System.FS.transaction fn() -> Redis.Client.send_redis(me, "SAVE") end
# send ourselves a message in x seconds
Process.send_after(self(), :save, @save_time)
{:noreply, state}
end
def handle_call({:send, str}, from, state) do
Port.command(state.cli, str <> "\n")
{:noreply, %{state | blah: from}}
end
@doc """
Sends a command to redis client. This is blocking.
"""
@spec send_redis(pid, binary) :: binary
def send_redis(pid, str), do: GenServer.call(pid, {:send, str})
@doc """
Input a value by a given key.
"""
@spec input_value(pid, String.t, any) :: [String.t]
def input_value(redis, key, value) when is_map(value) do
input_map(redis, %{key => value})
end
def input_value(redis, key, value) when is_list(value) do
input_list(redis, key, value)
end
def input_value(redis, key, value) when is_tuple(value) do
input_value(redis, key, Tuple.to_list(value))
end
def input_value(redis, key, value), do: send_redis redis, "SET #{key} #{value}"
defp input_list(redis, key, list) do
send_redis(redis, "DEL #{key}")
rlist = Enum.reduce(list, "", fn(item, acc) ->
if is_binary(item) || is_integer(item),
do: acc <> " " <> "#{item}", else: acc
end)
send_redis(redis, "RPUSH #{key} #{rlist}")
end
@spec input_map(pid, map | struct, String.t | nil) :: [String.t]
defp input_map(redis, map, bloop \\ nil)
defp input_map(redis, %{__struct__: _} = map, bloop),
do: input_map(redis, map |> Map.from_struct, bloop)
@lint false
defp input_map(redis, map, bloop) when is_map(map) do
Enum.map(map, fn({key, value}) ->
cond do
is_map(value) ->
if bloop do
input_map(redis, value, "#{bloop}.#{key}")
else
input_map(redis, value, key)
end
is_list(value) ->
if bloop do
input_list(redis, "#{bloop}.#{key}", value)
else
input_list(redis, key, value)
end
true ->
if bloop do
input_value(redis, "#{bloop}.#{key}", value)
else
input_value(redis, key, value)
end
end
end)
|> List.flatten
end
end

View File

@ -0,0 +1,69 @@
defmodule Redis.Client.Public do
@doc """
Sends a command to redis. Blocks
"""
@spec send_redis(pid, [binary]) :: binary
def send_redis(conn, stuff), do: Redix.command!(conn, stuff)
@doc """
Input a value by a given key.
"""
@spec input_value(pid, String.t, any) :: [String.t]
def input_value(redis, key, value) when is_map(value) do
input_map(redis, %{key => value})
end
def input_value(redis, key, value) when is_list(value) do
input_list(redis, key, value)
end
def input_value(redis, key, value) when is_tuple(value) do
input_value(redis, key, Tuple.to_list(value))
end
def input_value(redis, key, value),
do: send_redis redis, ["SET", key, value]
defp input_list(redis, key, list) do
send_redis redis, ["DEL", key]
for i <- list do
if is_binary(i) or is_integer(i) do
send_redis redis, ["RPUSH", key, i]
end
end
end
@spec input_map(pid, map | struct, String.t | nil) :: [String.t]
defp input_map(redis, map, bloop \\ nil)
defp input_map(redis, %{__struct__: _} = map, bloop),
do: input_map(redis, map |> Map.from_struct, bloop)
@lint false
defp input_map(redis, map, bloop) when is_map(map) do
Enum.map(map, fn({key, value}) ->
cond do
is_map(value) ->
if bloop do
input_map(redis, value, "#{bloop}.#{key}")
else
input_map(redis, value, key)
end
is_list(value) ->
if bloop do
input_list(redis, "#{bloop}.#{key}", value)
else
input_list(redis, key, value)
end
true ->
if bloop do
input_value(redis, "#{bloop}.#{key}", value)
else
input_value(redis, key, value)
end
end
end)
|> List.flatten
end
end

View File

@ -17,8 +17,8 @@ defmodule Redis.Server do
tcp-backlog 511
unixsocket /tmp/redis.sock
unixsocketperm 700
timeout 60
tcp-keepalive 300
timeout 0
tcp-keepalive 0
supervised no
pidfile /var/run/redis_6379.pid
loglevel notice

View File

@ -4,6 +4,9 @@ defmodule Farmbot.Transport.Redis do
"""
use GenStage
require Logger
@config Application.get_all_env(:farmbot)[:redis]
@ping_time 5_000
@save_time 90_0000
@doc """
Starts a stage for a Redis
@ -12,16 +15,31 @@ defmodule Farmbot.Transport.Redis do
def start_link, do: GenStage.start_link(__MODULE__, [], name: __MODULE__)
def init(_) do
{:ok, redis} = Redis.Client.start_link
Process.link(redis)
{:consumer, redis, subscribe_to: [Farmbot.Transport]}
{:ok, conn} = Redix.start_link(host: "localhost", port: @config[:port])
Process.link(conn)
Process.send_after(self(), :ping, @ping_time)
{:consumer, conn, subscribe_to: [Farmbot.Transport]}
end
def handle_info({_from, {:status, stuff}}, redis) do
Redis.Client.input_value(redis, "BOT_STATUS", stuff)
Redis.Client.Public.input_value(redis, "BOT_STATUS", stuff)
{:noreply, [], redis}
end
def handle_info(:ping, conn) do
Redis.Client.Public.send_redis(conn, ["PING"])
Process.send_after(self(), :ping, @ping_time)
{:noreply, [], conn}
end
def handle_info(:save, conn) do
Farmbot.System.FS.transaction fn() ->
Redis.Client.Public.send_redis(conn, ["SAVE"])
end
Process.send_after(self(), :save, @save_time)
{:noreply, [], conn}
end
def handle_info(_event, redis) do
# IO.inspect event
{:noreply, [], redis}

View File

@ -88,7 +88,8 @@ defmodule Farmbot.Mixfile do
:cowboy,
:quantum, # Quantum needs to start AFTER farmbot, so we can set up its dirs
:timex, # Timex needs to start AFTER farmbot, so we can set up its dirs,
:inets
:inets,
:redix
]
end
@ -125,6 +126,7 @@ defmodule Farmbot.Mixfile do
# Database
{:amnesia, github: "meh/amnesia"}, # database implementation
{:redix, ">= 0.0.0"},
# Log to syslog
{:ex_syslogger, "~> 1.3.3", only: :prod},

View File

@ -3,6 +3,7 @@
"calendar": {:hex, :calendar, "0.17.1", "5c7dfffde2b68011c2d6832ff1a15496292de965a3b57b3fad32405f1176f024", [:mix], [{:tzdata, "~> 0.5.8 or ~> 0.1.201603", [hex: :tzdata, optional: false]}]},
"certifi": {:hex, :certifi, "1.0.0", "1c787a85b1855ba354f0b8920392c19aa1d06b0ee1362f9141279620a5be2039", [:rebar3], []},
"combine": {:hex, :combine, "0.9.6", "8d1034a127d4cbf6924c8a5010d3534d958085575fa4d9b878f200d79ac78335", [:mix], []},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []},
"cors_plug": {:hex, :cors_plug, "1.1.4", "b4c2f0de641700cd3168b90223d12dd5a3e9c54804c0c2451abe1b292620b375", [:mix], [{:cowboy, "~> 1.0.0", [hex: :cowboy, optional: false]}, {:plug, "> 0.8.0", [hex: :plug, optional: false]}]},
"cowboy": {:hex, :cowboy, "1.0.4", "a324a8df9f2316c833a470d918aaf73ae894278b8aa6226ce7a9bf699388f878", [:make, :rebar], [{:cowlib, "~> 1.0.0", [hex: :cowlib, optional: false]}, {:ranch, "~> 1.0", [hex: :ranch, optional: false]}]},
"cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], []},
@ -53,6 +54,7 @@
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []},
"quantum": {:hex, :quantum, "1.8.1", "37c9ad0307cf47bd578507ce1ddda98746199b281e8afe91cbe44c21d56af983", [:mix], [{:calendar, "~> 0.16", [hex: :calendar, optional: false]}]},
"ranch": {:hex, :ranch, "1.2.1", "a6fb992c10f2187b46ffd17ce398ddf8a54f691b81768f9ef5f461ea7e28c762", [:make], []},
"redix": {:hex, :redix, "0.5.1", "2bf874a186cc759791b8defdd0bfaa752784716bd48241f6aa972a20e7f95745", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}]},
"rsa": {:hex, :rsa, "0.0.1", "a63069f88ce342ffdf8448b7cdef4b39ba7dee3c1510644a39385c7e63ba246f", [:mix], []},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []},
"syslog": {:hex, :syslog, "1.0.2", "9cf72c0986675a170c03b210e49700845a0f7b61e96d302a3ba0df82963daf60", [:rebar], []},