Merge branch 'staging' of github.com:FarmBot/Farmbot-Web-App into staging
commit
9d8308d6f7
|
@ -57,6 +57,7 @@ module Api
|
|||
status
|
||||
status_v8
|
||||
sync
|
||||
telemetry
|
||||
\\#
|
||||
\\*
|
||||
).map { |x| x + "(\\.|\\z)" }.join("|")
|
||||
|
|
|
@ -7,7 +7,7 @@ class AmqpLogParser < Mutations::Command
|
|||
TOO_OLD = "fbos version is out of date"
|
||||
DISCARD = "message type field is not the kind that gets saved in the DB"
|
||||
NOT_HASH = "logs must be a hash"
|
||||
|
||||
NOT_JSON = "Invalid JSON. Use a JSON validator."
|
||||
# I keep a Ruby copy of the JSON here for reference.
|
||||
# This is what a log will look like after JSON.parse()
|
||||
EXAMPLE_JSON = {
|
||||
|
@ -75,6 +75,8 @@ class AmqpLogParser < Mutations::Command
|
|||
def set_payload!
|
||||
# Parse from string to a Ruby hash (JSON)
|
||||
@output.payload = JSON.parse(payload)
|
||||
rescue JSON::ParserError
|
||||
add_error :json, :not_json, NOT_JSON
|
||||
end
|
||||
|
||||
def log
|
||||
|
|
|
@ -24,10 +24,12 @@ class LogService < AbstractServiceRunner
|
|||
|
||||
def maybe_deliver(data)
|
||||
violation = THROTTLE_POLICY.is_throttled(data.device_id)
|
||||
ok = data.valid? && !violation
|
||||
if violation
|
||||
return warn_user(data, violation)
|
||||
end
|
||||
|
||||
data.device.auto_sync_transaction do
|
||||
ok ? deliver(data) : warn_user(data, violation)
|
||||
deliver(data)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -41,6 +43,6 @@ class LogService < AbstractServiceRunner
|
|||
end
|
||||
|
||||
def warn_user(data, violation)
|
||||
data.device.maybe_throttle(violation)
|
||||
violation && data.device.maybe_throttle(violation)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
# A singleton that runs on a separate process than the web server.
|
||||
# Listens to *ALL* incoming logs and stores them to the DB.
|
||||
# Also handles throttling.
|
||||
class TelemetryService < AbstractServiceRunner
|
||||
MESSAGE = "TELEMETRY MESSAGE FROM %s"
|
||||
FAILURE = "FAILED TELEMETRY MESSAGE FROM %s"
|
||||
|
||||
def process(delivery_info, payload)
|
||||
device_key = delivery_info
|
||||
.routing_key
|
||||
.split(".")[1]
|
||||
json = JSON.parse(payload)
|
||||
other_stuff = { device: device_key,
|
||||
is_telemetry: true,
|
||||
message: MESSAGE % device_key }
|
||||
puts json.merge(other_stuff).to_json
|
||||
rescue JSON::ParserError
|
||||
puts ({ device: device_key,
|
||||
is_telemetry: true,
|
||||
bad_json: payload,
|
||||
message: FAILURE % device_key }).to_json
|
||||
end
|
||||
end
|
|
@ -19,9 +19,9 @@ class ThrottlePolicy
|
|||
def is_throttled(unique_id)
|
||||
rules
|
||||
.map do |rule|
|
||||
is_violation = rule.time_period.usage_count_for(unique_id) > rule.limit
|
||||
is_violation ? Violation.new(rule) : nil
|
||||
end
|
||||
is_violation = rule.time_period.usage_count_for(unique_id) > rule.limit
|
||||
is_violation ? Violation.new(rule) : nil
|
||||
end
|
||||
.compact
|
||||
.max
|
||||
end
|
||||
|
|
|
@ -114,9 +114,10 @@ class Device < ApplicationRecord
|
|||
# Sets the `throttled_until` and `throttled_at` fields if unpopulated or
|
||||
# the throttle time period increases. Notifies user of cooldown period.
|
||||
def maybe_throttle(violation)
|
||||
return unless violation
|
||||
end_t = violation.ends_at
|
||||
# Some log validation errors will result in until_time being `nil`.
|
||||
if (violation && (throttled_until.nil? || end_t > throttled_until))
|
||||
if (throttled_until.nil? || end_t > throttled_until)
|
||||
reload.update_attributes!(throttled_until: end_t,
|
||||
throttled_at: Time.now)
|
||||
refresh_cache
|
||||
|
|
|
@ -42,6 +42,14 @@ class Transport
|
|||
.bind("amq.topic", routing_key: "bot.*.logs")
|
||||
end
|
||||
|
||||
def telemetry_channel
|
||||
@telemetry_channel ||= self
|
||||
.connection
|
||||
.create_channel
|
||||
.queue("api_telemetry_workers")
|
||||
.bind("amq.topic", routing_key: "bot.*.telemetry")
|
||||
end
|
||||
|
||||
def resource_channel
|
||||
@resource_channel ||= self
|
||||
.connection
|
||||
|
|
|
@ -24,6 +24,7 @@ class RabbitWorker
|
|||
|
||||
loop do
|
||||
ThreadsWait.all_waits([
|
||||
thread { TelemetryService.new.go!(t.telemetry_channel) },
|
||||
thread { LogService.new.go!(t.log_channel) },
|
||||
thread { Resources::Service.new.go!(t.resource_channel) },
|
||||
])
|
||||
|
|
|
@ -205,6 +205,8 @@ describe Api::RmqUtilsController do
|
|||
".status.*",
|
||||
".status",
|
||||
".sync.*",
|
||||
".telemetry.*",
|
||||
".telemetry",
|
||||
".sync",
|
||||
".status_v8.*",
|
||||
".status_v8"].map { |x| expect(random_channel(x).match(r)).to be }
|
||||
|
|
|
@ -13,8 +13,6 @@ describe LogService do
|
|||
channels: [],
|
||||
}.to_json
|
||||
|
||||
FakeDeliveryInfo = Struct.new(:routing_key, :device)
|
||||
|
||||
let!(:device) { FactoryBot.create(:device) }
|
||||
let!(:device_id) { device.id }
|
||||
let!(:fake_delivery_info) do
|
||||
|
@ -31,6 +29,12 @@ describe LogService do
|
|||
expect(calls).to include(["amq.topic", { routing_key: "bot.*.logs" }])
|
||||
end
|
||||
|
||||
it "has a telemetry_channel" do
|
||||
calls = Transport.current.telemetry_channel.calls[:bind]
|
||||
call = ["amq.topic", { :routing_key => "bot.*.telemetry" }]
|
||||
expect(calls).to include(call)
|
||||
end
|
||||
|
||||
it "has a resource_channel" do
|
||||
calls = Transport.current.resource_channel.calls[:bind]
|
||||
expect(calls).to include([
|
||||
|
@ -54,9 +58,29 @@ describe LogService do
|
|||
LogService.new.warn_user(data, time)
|
||||
end
|
||||
|
||||
it "triggers a throttle" do
|
||||
tp = LogService::THROTTLE_POLICY
|
||||
ls = LogService.new
|
||||
data = AmqpLogParser::DeliveryInfo.new
|
||||
data.device_id = FactoryBot.create(:device).id
|
||||
violation = ThrottlePolicy::Violation.new(Object.new)
|
||||
allow(ls).to receive(:deliver)
|
||||
expect(ls).to receive(:warn_user)
|
||||
expect(tp).to receive(:is_throttled)
|
||||
.with(data.device_id)
|
||||
.and_return(violation)
|
||||
ls.maybe_deliver(data)
|
||||
end
|
||||
|
||||
it "handles bad params" do
|
||||
expect do
|
||||
LogService.new.process(fake_delivery_info, {})
|
||||
end.to raise_error(Mutations::ValidationException)
|
||||
end
|
||||
|
||||
it "handles malformed params" do
|
||||
expect do
|
||||
LogService.new.process(fake_delivery_info, "}}{{")
|
||||
end.to raise_error(Mutations::ValidationException)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
require "spec_helper"
|
||||
|
||||
describe TelemetryService do
|
||||
it "handles malformed JSON" do
|
||||
ts = TelemetryService.new
|
||||
routing_key = "bot.device_123.telemetry"
|
||||
payload = "}"
|
||||
expected = "{\"device\":\"device_123\"," \
|
||||
"\"is_telemetry\":true,\"bad_json\":\"}\"," \
|
||||
"\"message\":\"FAILED TELEMETRY MESSAGE " \
|
||||
"FROM device_123\"}\n"
|
||||
delivery_info =
|
||||
FakeDeliveryInfo.new(routing_key, payload)
|
||||
expect do
|
||||
ts.process(delivery_info, payload)
|
||||
end.to output(expected).to_stdout
|
||||
end
|
||||
|
||||
it "parses telemetry from the device" do
|
||||
ts = TelemetryService.new
|
||||
routing_key = "bot.device_123.telemetry"
|
||||
payload = {
|
||||
foo: "bar",
|
||||
# I'm putting this key here to make sure
|
||||
# bots cannot change their `device_id` /
|
||||
# spoof teleemetry of other bots.
|
||||
device: "device_456",
|
||||
}.to_json
|
||||
expected = [
|
||||
"{\"foo\":\"bar\"," \
|
||||
"\"device\":\"device_123\"," \
|
||||
"\"is_telemetry\":true," \
|
||||
"\"message\":\"TELEMETRY MESSAGE " \
|
||||
"FROM device_123\"}\n",
|
||||
].join("")
|
||||
delivery_info =
|
||||
FakeDeliveryInfo.new(routing_key, payload)
|
||||
expect do
|
||||
ts.process(delivery_info, payload)
|
||||
end.to output(expected).to_stdout
|
||||
end
|
||||
end
|
|
@ -171,3 +171,5 @@ class NiceResponse
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
FakeDeliveryInfo = Struct.new(:routing_key, :device)
|
||||
|
|
Loading…
Reference in New Issue