commit
3958c27dae
|
@ -1,23 +1,17 @@
|
|||
#
|
||||
# Handles devices that spin out of control and send too many logs to the server.
|
||||
# Class Hierarchy:
|
||||
# ThrotllePolicy has => Rules creates => Violation
|
||||
# Violation has => Rule has => TimePeriod
|
||||
class ThrottlePolicy
|
||||
attr_reader :rules
|
||||
|
||||
# A throttler object paired with a max limit
|
||||
class Rule
|
||||
attr_reader :throttler, :limit
|
||||
|
||||
def initialize(throttler, limit)
|
||||
@throttler, @limit = throttler, limit
|
||||
end
|
||||
end
|
||||
|
||||
# Dictionary<Throttler, Intger>
|
||||
# Dictionary<TimePeriod, Intger>
|
||||
def initialize(policy_rules)
|
||||
@rules = policy_rules.map { |rule_set| Rule.new(*rule_set) }
|
||||
end
|
||||
|
||||
def track(unique_id, now = Time.now)
|
||||
rules.each { |r| r.throttler.record_event(unique_id, now) }
|
||||
rules.each { |r| r.time_period.record_event(unique_id, now) }
|
||||
end
|
||||
|
||||
# If throttled, returns the timeperiod when device will be unthrottled
|
||||
|
@ -25,8 +19,8 @@ class ThrottlePolicy
|
|||
def is_throttled(unique_id)
|
||||
rules
|
||||
.map do |rule|
|
||||
is_over = rule.throttler.usage_count_for(unique_id) > rule.limit
|
||||
is_over ? rule.throttler.when_does_next_period_start? : nil
|
||||
is_violation = rule.time_period.usage_count_for(unique_id) > rule.limit
|
||||
is_violation ? Violation.new(rule) : nil
|
||||
end
|
||||
.compact
|
||||
.max
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
class ThrottlePolicy
|
||||
# A time_period object paired with a max limit
|
||||
class Rule
|
||||
attr_reader :time_period, :limit
|
||||
|
||||
def initialize(time_period, limit)
|
||||
@time_period, @limit = time_period, limit
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,58 @@
|
|||
# Track the number of occurences of an event over time.
|
||||
#
|
||||
# Given:
|
||||
# * A fixed size duration (1 minute, 1 week etc)
|
||||
# * An event (In our case, log creation)
|
||||
# * An initiator id (eg: device_id)
|
||||
#
|
||||
# Produces:
|
||||
# * A table of event counts for the current time period, indexed by
|
||||
# the initiator ID.
|
||||
class ThrottlePolicy
|
||||
class TimePeriod
|
||||
attr_reader :time_unit,
|
||||
:current_period, # Slice time into fixed size windows
|
||||
:entries
|
||||
|
||||
def initialize(active_support_duration, now = Time.now)
|
||||
@time_unit = active_support_duration
|
||||
reset_everything now
|
||||
end
|
||||
|
||||
|
||||
def record_event(unique_id, now = Time.now)
|
||||
period = calculate_period(now)
|
||||
case period <=> current_period
|
||||
when -1 then return # Out of date- don't record.
|
||||
when 0 then increment_count_for(unique_id) # Right on schedule.
|
||||
when 1 then reset_everything(now) # Clear out old data.
|
||||
end
|
||||
end
|
||||
|
||||
def usage_count_for(unique_id)
|
||||
@entries[unique_id] || 0
|
||||
end
|
||||
|
||||
def when_does_next_period_start?
|
||||
Time.at(current_period * time_unit.to_i) + time_unit
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def reset_everything(now)
|
||||
@current_period = calculate_period(now)
|
||||
@entries = {}
|
||||
end
|
||||
|
||||
def increment_count_for(unique_id)
|
||||
@entries[unique_id] ||= 0
|
||||
@entries[unique_id] += 1
|
||||
end
|
||||
|
||||
# Returns integer representation of current clock period
|
||||
def calculate_period(time)
|
||||
(time.to_i / @time_unit)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
|
@ -0,0 +1,29 @@
|
|||
class ThrottlePolicy
|
||||
class Violation
|
||||
attr_reader :rule
|
||||
|
||||
def initialize(rule)
|
||||
@rule = rule
|
||||
end
|
||||
|
||||
def ends_at
|
||||
@rule.time_period.when_does_next_period_start?
|
||||
end
|
||||
|
||||
def <=>(other)
|
||||
self.timeframe <=> other.timeframe
|
||||
end
|
||||
|
||||
def timeframe
|
||||
rule.time_period.time_unit
|
||||
end
|
||||
|
||||
def limit
|
||||
rule.limit.to_i
|
||||
end
|
||||
|
||||
def explanation
|
||||
"more than #{limit} logs in #{timeframe.inspect}"
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,56 +0,0 @@
|
|||
# Determine the number of occurences of an event over time.
|
||||
#
|
||||
# Given:
|
||||
# * A fixed size time period (1 minute, 1 week etc)
|
||||
# * An event (In our case, log creation)
|
||||
# * An initiator id (eg: device_id)
|
||||
#
|
||||
# Produces:
|
||||
# * A table of event counts for the current time period, indexed by
|
||||
# the initiator ID.
|
||||
class Throttler
|
||||
attr_reader :time_unit_in_seconds,
|
||||
:current_period, # Slice time into fixed size windows
|
||||
:entries
|
||||
|
||||
def initialize(unit_size_in_seconds, now = Time.now)
|
||||
@time_unit_in_seconds = unit_size_in_seconds
|
||||
reset_everything now
|
||||
end
|
||||
|
||||
|
||||
def record_event(unique_id, now = Time.now)
|
||||
period = calculate_period(now)
|
||||
case period <=> current_period
|
||||
when -1 then return # Out of date- don't record.
|
||||
when 0 then increment_count_for(unique_id) # Right on schedule.
|
||||
when 1 then reset_everything(now) # Clear out old data.
|
||||
end
|
||||
end
|
||||
|
||||
def usage_count_for(unique_id)
|
||||
@entries[unique_id] || 0
|
||||
end
|
||||
|
||||
def when_does_next_period_start?
|
||||
Time.at(current_period * time_unit_in_seconds.to_i) + time_unit_in_seconds
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def reset_everything(now)
|
||||
@current_period = calculate_period(now)
|
||||
@entries = {}
|
||||
end
|
||||
|
||||
def increment_count_for(unique_id)
|
||||
@entries[unique_id] ||= 0
|
||||
@entries[unique_id] += 1
|
||||
end
|
||||
|
||||
# Returns integer representation of current clock period
|
||||
def calculate_period(time)
|
||||
(time.to_i / @time_unit_in_seconds)
|
||||
end
|
||||
|
||||
end
|
|
@ -44,6 +44,7 @@ class ApplicationRecord < ActiveRecord::Base
|
|||
serializer = ActiveModel::Serializer.serializer_for(self)
|
||||
return (serializer ? serializer.new(self) : self).as_json
|
||||
end
|
||||
|
||||
def broadcast_payload
|
||||
{ args: { label: Transport.current.current_request_id }, body: body_as_json }.to_json
|
||||
end
|
||||
|
@ -62,8 +63,6 @@ class ApplicationRecord < ActiveRecord::Base
|
|||
end
|
||||
|
||||
def broadcast!
|
||||
# no = [User, Device, EdgeNode, PrimaryNode, TokenIssuance, ]
|
||||
# `espeak "#{self.class.name}"` if !no.include?(self.class)
|
||||
AutoSyncJob.perform_later(broadcast_payload,
|
||||
current_device.id,
|
||||
chan_name,
|
||||
|
|
|
@ -6,10 +6,10 @@ class Device < ApplicationRecord
|
|||
|
||||
TIMEZONES = TZInfo::Timezone.all_identifiers
|
||||
BAD_TZ = "%{value} is not a valid timezone"
|
||||
THROTTLE_ON = "Device is sending too many logs. " \
|
||||
"Suspending log storage until %s."
|
||||
THROTTLE_ON = "Device is sending too many logs (%s). " \
|
||||
"Suspending log storage and display until %s."
|
||||
THROTTLE_OFF = "Cooldown period has ended. "\
|
||||
"Resuming log transmission."
|
||||
"Resuming log storage."
|
||||
CACHE_KEY = "devices.%s"
|
||||
|
||||
has_many :device_configs, dependent: :destroy
|
||||
|
@ -74,22 +74,6 @@ class Device < ApplicationRecord
|
|||
Time.now.in_time_zone(self.timezone || "UTC").utc_offset / 1.hour
|
||||
end
|
||||
|
||||
# Send a realtime message to a logged in user.
|
||||
def tell(message, channels = [], type = "info")
|
||||
log = Log.new({ device: self,
|
||||
message: message,
|
||||
created_at: Time.now,
|
||||
channels: channels,
|
||||
major_version: 99,
|
||||
minor_version: 99,
|
||||
meta: {},
|
||||
type: type })
|
||||
json = LogSerializer.new(log).as_json.to_json
|
||||
|
||||
Transport.current.amqp_send(json, self.id, "logs")
|
||||
log
|
||||
end
|
||||
|
||||
def plants
|
||||
points.where(pointer_type: "Plant")
|
||||
end
|
||||
|
@ -114,26 +98,59 @@ class Device < ApplicationRecord
|
|||
|
||||
# Sets the `throttled_at` field, but only if it is unpopulated.
|
||||
# Performs no-op if `throttled_at` was already set.
|
||||
def maybe_throttle_until(until_time)
|
||||
if throttled_until.nil?
|
||||
update_attributes!(throttled_until: until_time, throttled_at: Time.now)
|
||||
def maybe_throttle(violation)
|
||||
# Some log validation errors will result in until_time being `nil`.
|
||||
if (violation && throttled_until.nil?)
|
||||
et = violation.ends_at
|
||||
reload.update_attributes!(throttled_until: et,
|
||||
throttled_at: Time.now)
|
||||
refresh_cache
|
||||
cooldown = until_time.in_time_zone(self.timezone || "UTC").strftime("%I:%M%p")
|
||||
cooldown_notice(THROTTLE_ON % [cooldown], until_time)
|
||||
cooldown = et.in_time_zone(self.timezone || "UTC").strftime("%I:%M%p")
|
||||
info = [violation.explanation, cooldown]
|
||||
cooldown_notice(THROTTLE_ON % info, et, "warn")
|
||||
end
|
||||
end
|
||||
|
||||
def maybe_unthrottle
|
||||
if throttled_until.present?
|
||||
old_time = throttled_until
|
||||
update_attributes!(throttled_until: nil, throttled_at: Time.now)
|
||||
reload # <= WHY!?! TODO: Find out why it crashes without this.
|
||||
.update_attributes!(throttled_until: nil, throttled_at: nil)
|
||||
refresh_cache
|
||||
cooldown_notice(THROTTLE_OFF, old_time)
|
||||
cooldown_notice(THROTTLE_OFF, old_time, "info")
|
||||
end
|
||||
end
|
||||
# Send a realtime message to a logged in user.
|
||||
def tell(message, channels = [], type = "info")
|
||||
log = Log.new({ device: self,
|
||||
message: message,
|
||||
created_at: Time.now,
|
||||
channels: channels,
|
||||
major_version: 99,
|
||||
minor_version: 99,
|
||||
meta: {},
|
||||
type: type })
|
||||
json = LogSerializer.new(log).as_json.to_json
|
||||
|
||||
def cooldown_notice(message, throttle_time, now = Time.current)
|
||||
hours = ((throttle_time - now) / 1.hour).round
|
||||
tell(message, [(hours > 2) ? "email" : "toast"], "alert")
|
||||
Transport.current.amqp_send(json, self.id, "logs")
|
||||
return log
|
||||
end
|
||||
|
||||
def cooldown_notice(message, throttle_time, type, now = Time.current)
|
||||
hours = ((throttle_time - now) / 1.hour).round
|
||||
channels = [(hours > 2) ? "email" : "toast"]
|
||||
tell(message, channels , type).save
|
||||
end
|
||||
|
||||
# CONTEXT:
|
||||
# * We tried to use Rails low level caching, but it hit marshalling issues.
|
||||
# * We did a hack with Device.new(self.as_json) to get around it.
|
||||
# * Mutations does not allow unsaved models
|
||||
# * We converted the `model :device, class: Device` to:
|
||||
# `duck :device, methods [:id, :is_device]`
|
||||
#
|
||||
# This methd is not required, but adds a layer of safety.
|
||||
def is_device # SEE: Hack in Log::Create. TODO: Fix low level caching bug.
|
||||
true
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,7 +4,11 @@ module Logs
|
|||
BAD_WORDS = "Log message contained blacklisted words"
|
||||
|
||||
required do
|
||||
model :device, class: Device
|
||||
# TODO: Some strange stuff happened with caching in the log service.
|
||||
# Had to change this from "model" to "duck" as a result.
|
||||
# See Device#refresh_cache(). Rails thinks cached `Device` objects
|
||||
# are unsaved.
|
||||
duck :device, methods: [:id, :is_device]
|
||||
string :message
|
||||
end
|
||||
|
||||
|
@ -52,8 +56,9 @@ module Logs
|
|||
|
||||
def validate
|
||||
add_error :log, :private, BAD_WORDS if has_bad_words
|
||||
add_error :device, :no_id, "ID of device can't be nil" unless device.id
|
||||
@log = Log.new
|
||||
@log.device = device
|
||||
@log.device_id = device.id
|
||||
@log.message = message
|
||||
@log.channels = channels || []
|
||||
@log.x = transitional_field(:x)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
class DeviceSerializer < ActiveModel::Serializer
|
||||
attributes :id, :name, :timezone, :last_saw_api, :last_saw_mq, :tz_offset_hrs,
|
||||
:fbos_version
|
||||
:fbos_version, :throttled_until, :throttled_at
|
||||
end
|
||||
|
|
36
fake_logs.rb
36
fake_logs.rb
|
@ -1,36 +0,0 @@
|
|||
$log = {
|
||||
z: 0,
|
||||
y: 0,
|
||||
x: 0,
|
||||
verbosity: 1,
|
||||
type: "info",
|
||||
major_version: 6,
|
||||
minor_version: 4,
|
||||
patch_version: 1,
|
||||
message: "hey!!!",
|
||||
created_at: Time.now.to_i,
|
||||
channels: ["email"]
|
||||
}
|
||||
|
||||
$count = 0
|
||||
$device_id = Device.last.id
|
||||
|
||||
LogDispatch.destroy_all
|
||||
Log.destroy_all
|
||||
|
||||
ATTEMPT_LOG = { WORKS: 1, TRIGGERS_ERROR: 0.1 }
|
||||
|
||||
def ping(interval = 0)
|
||||
sleep interval
|
||||
$count += 1
|
||||
puts "Log ##{$count}"
|
||||
$log[:message] = "Hey! #{$count}"
|
||||
Transport.current.amqp_send($log.to_json, $device_id, "logs")
|
||||
end
|
||||
|
||||
loop do
|
||||
puts "Sending..."
|
||||
5.times { ping(0.1) }
|
||||
puts "Enter to send again, y to exit."
|
||||
exit if gets.chomp.downcase == "y"
|
||||
end
|
|
@ -2,9 +2,10 @@
|
|||
# Listens to *ALL* incoming logs and stores them to the DB.
|
||||
# Also handles throttling.
|
||||
class LogService
|
||||
THROTTLE_POLICY = ThrottlePolicy.new Throttler.new(1.minute) => 0.5 * 1_000,
|
||||
Throttler.new(1.hour) => 0.5 * 10_000,
|
||||
Throttler.new(1.day) => 0.5 * 100_000
|
||||
T = ThrottlePolicy::TimePeriod
|
||||
THROTTLE_POLICY = ThrottlePolicy.new T.new(1.minute) => 0.5 * 1_000,
|
||||
T.new(1.hour) => 0.5 * 10_000,
|
||||
T.new(1.day) => 0.5 * 100_000
|
||||
|
||||
def self.process(delivery_info, payload)
|
||||
params = { routing_key: delivery_info.routing_key, payload: payload }
|
||||
|
@ -15,11 +16,11 @@ class LogService
|
|||
end
|
||||
|
||||
def self.maybe_deliver(data)
|
||||
throttled_until = THROTTLE_POLICY.is_throttled(data.device_id)
|
||||
ok = data.valid? && !throttled_until
|
||||
violation = THROTTLE_POLICY.is_throttled(data.device_id)
|
||||
ok = data.valid? && !violation
|
||||
|
||||
data.device.auto_sync_transaction do
|
||||
ok ? deliver(data) : warn_user(data, throttled_until)
|
||||
ok ? deliver(data) : warn_user(data, violation)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -29,7 +30,7 @@ class LogService
|
|||
LogDispatch.deliver(dev, Logs::Create.run!(log, device: dev))
|
||||
end
|
||||
|
||||
def self.warn_user(data, throttled_until)
|
||||
data.device.maybe_throttle_until(throttled_until)
|
||||
def self.warn_user(data, violation)
|
||||
data.device.maybe_throttle(violation)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -45,7 +45,7 @@ describe LogService do
|
|||
data = AmqpLogParser::DeliveryInfo.new
|
||||
data.device_id = FactoryBot.create(:device).id
|
||||
time = Time.now
|
||||
expect_any_instance_of(Device).to receive(:maybe_throttle_until).with(time)
|
||||
expect_any_instance_of(Device).to receive(:maybe_throttle).with(time)
|
||||
LogService.warn_user(data, time)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,33 +1,34 @@
|
|||
require "spec_helper"
|
||||
NOW = Time.new("2018-05-18T09:38:02.259-05:00")
|
||||
|
||||
describe Throttler do
|
||||
klass = ThrottlePolicy::TimePeriod
|
||||
describe klass do
|
||||
let(:policy) do
|
||||
ThrottlePolicy.new Throttler.new(1.minute, NOW) => 1,
|
||||
Throttler.new(1.hour, NOW) => 10,
|
||||
Throttler.new(1.day, NOW) => 100
|
||||
ThrottlePolicy.new klass.new(1.minute, NOW) => 1,
|
||||
klass.new(1.hour, NOW) => 10,
|
||||
klass.new(1.day, NOW) => 100
|
||||
end
|
||||
|
||||
it "initializes" do
|
||||
expect(policy.rules).to be
|
||||
expect(policy.rules.map(&:limit).sort).to eq([1, 10, 100])
|
||||
actual = policy.rules.map(&:throttler).map(&:time_unit_in_seconds).sort
|
||||
actual = policy.rules.map(&:time_period).map(&:time_unit).sort
|
||||
expected = [1.minute, 1.hour, 1.day]
|
||||
expect(actual).to eq(expected)
|
||||
end
|
||||
|
||||
it "tracks things" do
|
||||
count1 = policy.rules.map(&:throttler).map{|t| t.usage_count_for(123)}
|
||||
count1 = policy.rules.map(&:time_period).map{|t| t.usage_count_for(123)}
|
||||
expect(count1).to eq([0, 0, 0])
|
||||
5.times { policy.track(123, NOW + 1) }
|
||||
count2 = policy.rules.map(&:throttler).map{|t| t.usage_count_for(123)}
|
||||
count2 = policy.rules.map(&:time_period).map{|t| t.usage_count_for(123)}
|
||||
expect(count2).to eq([5, 5, 5])
|
||||
end
|
||||
|
||||
it "returns the cool down end time when the ID is throttled" do
|
||||
5.times { policy.track(123, NOW + 1) }
|
||||
result = policy.is_throttled(123)
|
||||
expect(result).to be_kind_of(Time)
|
||||
expect(result).to be_kind_of(ThrottlePolicy::Violation)
|
||||
end
|
||||
|
||||
it "ignores the block when it's over the limit" do
|
||||
|
|
|
@ -1,18 +1,18 @@
|
|||
require "spec_helper"
|
||||
|
||||
describe Throttler do
|
||||
describe ThrottlePolicy::TimePeriod do
|
||||
let(:stub_time) { Time.new(2018, 5, 18, 9, 38, 25) }
|
||||
|
||||
it "sets a time unit window size" do
|
||||
expected_time_period = stub_time.to_i / 1.minute.to_i
|
||||
one_min = Throttler.new(1.minute, stub_time)
|
||||
one_min = ThrottlePolicy::TimePeriod.new(1.minute, stub_time)
|
||||
expect(one_min.current_period).to eq(expected_time_period)
|
||||
expect(one_min.time_unit_in_seconds).to eq(60)
|
||||
expect(one_min.time_unit).to eq(60)
|
||||
expect(one_min.entries).to eq({})
|
||||
end
|
||||
|
||||
it "increments the count" do
|
||||
t = Throttler.new(1.minute, stub_time)
|
||||
t = ThrottlePolicy::TimePeriod.new(1.minute, stub_time)
|
||||
uid = 123
|
||||
|
||||
# Ignore events from the past.
|
||||
|
@ -33,7 +33,7 @@ describe Throttler do
|
|||
end
|
||||
|
||||
it "tells you when the next time period starts" do
|
||||
one_hour = Throttler.new(1.hour, stub_time)
|
||||
one_hour = ThrottlePolicy::TimePeriod.new(1.hour, stub_time)
|
||||
next_hour = one_hour.when_does_next_period_start?
|
||||
expect(next_hour).to be_kind_of(Time)
|
||||
expect(next_hour.hour).to be(stub_time.hour + 1)
|
|
@ -0,0 +1,15 @@
|
|||
describe ThrottlePolicy::Violation do
|
||||
violation = ThrottlePolicy::Violation
|
||||
rule = ThrottlePolicy::Rule
|
||||
time_period = ThrottlePolicy::TimePeriod
|
||||
|
||||
it 'is comparable' do
|
||||
smaller = violation.new(rule.new(time_period.new(1.minute), 10))
|
||||
bigger = violation.new(rule.new(time_period.new(1.day), 10))
|
||||
medium = violation.new(rule.new(time_period.new(1.hour), 10))
|
||||
violations = [medium, smaller, bigger]
|
||||
result = violations.sort
|
||||
expect(result.first).to be(smaller)
|
||||
expect(result.last).to be(bigger)
|
||||
end
|
||||
end
|
|
@ -62,20 +62,26 @@ describe Device do
|
|||
end
|
||||
|
||||
it "throttles a device that sends too many logs" do
|
||||
expect(device).to receive(:tell)
|
||||
expect(device).to receive(:tell).and_return(Log.new)
|
||||
device.update_attributes!(throttled_until: nil)
|
||||
expect(device.throttled_until).to be(nil)
|
||||
example = Time.now + 1.minute
|
||||
device.maybe_throttle_until(example)
|
||||
expect(device.throttled_until).to eq(example)
|
||||
five_minutes = ThrottlePolicy::TimePeriod.new(5.minutes, Time.now + 1.minute)
|
||||
rule = ThrottlePolicy::Rule.new(five_minutes, 500)
|
||||
violation = ThrottlePolicy::Violation.new(rule)
|
||||
device.maybe_throttle(violation)
|
||||
expect(device.throttled_until).to eq(violation.ends_at)
|
||||
end
|
||||
|
||||
it "unthrottles a runaway device" do
|
||||
expect(device).to receive(:tell)
|
||||
expect(device).to receive(:tell).and_return(Log.new)
|
||||
example = Time.now - 1.minute
|
||||
device.update_attributes!(throttled_until: example)
|
||||
expect(device.throttled_until).to eq(example)
|
||||
device.maybe_unthrottle
|
||||
expect(device.throttled_until).to eq(nil)
|
||||
end
|
||||
|
||||
it "is a device" do
|
||||
expect(Device.new.is_device).to eq(true)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
jest.mock("../connect_device", () => {
|
||||
return {
|
||||
bothUp: jest.fn(),
|
||||
batchInitResources: jest.fn(() => ({ type: "NOOP", payload: undefined }))
|
||||
};
|
||||
});
|
||||
|
||||
const mockThrottleStatus = { value: false };
|
||||
jest.mock("../device_is_throttled", () => {
|
||||
return { deviceIsThrottled: jest.fn(() => mockThrottleStatus.value) };
|
||||
});
|
||||
|
||||
import { BatchQueue } from "../batch_queue";
|
||||
import { fakeLog } from "../../__test_support__/fake_state/resources";
|
||||
import { bothUp, batchInitResources } from "../connect_device";
|
||||
|
||||
describe("BatchQueue", () => {
|
||||
it("calls bothUp() to track network connectivity", () => {
|
||||
jest.clearAllMocks();
|
||||
mockThrottleStatus.value = false;
|
||||
const q = new BatchQueue(1);
|
||||
const log = fakeLog();
|
||||
q.push(log);
|
||||
q.maybeWork();
|
||||
expect(bothUp).toHaveBeenCalled();
|
||||
expect(batchInitResources).toHaveBeenCalledWith([log]);
|
||||
});
|
||||
|
||||
it("does nothing when throttled", () => {
|
||||
jest.clearAllMocks();
|
||||
mockThrottleStatus.value = true;
|
||||
const q = new BatchQueue(1);
|
||||
q.push(fakeLog());
|
||||
q.maybeWork();
|
||||
expect(bothUp).toHaveBeenCalled();
|
||||
expect(batchInitResources).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
|
@ -222,7 +222,7 @@ describe("onLogs", () => {
|
|||
const log = fakeLog("error", []);
|
||||
log.message = "bot xyz is offline";
|
||||
fn(log);
|
||||
globalQueue.work();
|
||||
globalQueue.maybeWork();
|
||||
expect(dispatchNetworkDown).toHaveBeenCalledWith("bot.mqtt");
|
||||
});
|
||||
});
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
import { fakeDevice } from "../../__test_support__/resource_index_builder";
|
||||
import { deviceIsThrottled } from "../device_is_throttled";
|
||||
|
||||
describe("deviceIsThrottled", () => {
|
||||
it("returns true when throttled_at and throttled_until are set", () => {
|
||||
const dev = fakeDevice();
|
||||
dev.body.throttled_until = "whatever";
|
||||
dev.body.throttled_at = "whatever";
|
||||
expect(deviceIsThrottled(dev.body)).toBeTruthy();
|
||||
});
|
||||
|
||||
it("returns false for unthrottled devices", () => {
|
||||
expect(deviceIsThrottled(fakeDevice().body)).toBeFalsy();
|
||||
});
|
||||
|
||||
it("returns false for undefined", () => {
|
||||
expect(deviceIsThrottled(undefined)).toBeFalsy();
|
||||
});
|
||||
});
|
|
@ -1,27 +1,43 @@
|
|||
/** Performs operations in batches at a regular interval.
|
||||
* Useful for rendering intensive tasks such as handling massive amounts of
|
||||
* incoming logs.
|
||||
* We only need one work queue for the whole app, but singletons are bad. */
|
||||
class BatchQueue {
|
||||
private queue: Function[] = [];
|
||||
private timerId = 0;
|
||||
import { TaggedLog } from "../resources/tagged_resources";
|
||||
import { store } from "../redux/store";
|
||||
import { batchInitResources, bothUp } from "./connect_device";
|
||||
import { maybeGetDevice } from "../resources/selectors";
|
||||
import { deviceIsThrottled } from "./device_is_throttled";
|
||||
|
||||
/** Performs resource initialization (Eg: a storm of incoming logs) in batches
|
||||
* at a regular interval. We only need one work queue for the whole app,
|
||||
* but singletons are bad. */
|
||||
export class BatchQueue {
|
||||
private queue: TaggedLog[] = [];
|
||||
|
||||
/** Create a new batch queue that will check for new messages and execute them
|
||||
* at a specified work rate (ms).*/
|
||||
constructor(workRateMS = 600) {
|
||||
this.timerId = window.setInterval(this.work, workRateMS);
|
||||
constructor(workRateMS: number) {
|
||||
// We will need to store this int if we ever want to cancel queue polling:
|
||||
window.setInterval(this.maybeWork, workRateMS);
|
||||
}
|
||||
|
||||
maybeWork = () => {
|
||||
const { length } = this.queue;
|
||||
length && this.work();
|
||||
}
|
||||
|
||||
work = () => {
|
||||
this.queue.map(fn => fn());
|
||||
const dev = maybeGetDevice(store.getState().resources.index);
|
||||
if (!deviceIsThrottled(dev ? dev.body : undefined)) {
|
||||
store.dispatch(batchInitResources(this.queue));
|
||||
}
|
||||
this.clear();
|
||||
bothUp();
|
||||
}
|
||||
|
||||
push = (resource: TaggedLog) => {
|
||||
this.queue.push(resource);
|
||||
}
|
||||
|
||||
push = (job: Function) => this.queue.push(job);
|
||||
clear = () => this.queue = [];
|
||||
destroy = () => window.clearInterval(this.timerId);
|
||||
}
|
||||
|
||||
/** The only work queue needed for the whole app.
|
||||
* Mock this out in your tests. */
|
||||
export const globalQueue = new BatchQueue(250);
|
||||
export const globalQueue = new BatchQueue(1500);
|
||||
|
|
|
@ -55,8 +55,9 @@ export function showLogOnScreen(log: Log) {
|
|||
switch (log.type) {
|
||||
case "success":
|
||||
return success(log.message, TITLE);
|
||||
case "busy":
|
||||
case "warn":
|
||||
return warning(log.message, TITLE);
|
||||
case "busy":
|
||||
case "error":
|
||||
return error(log.message, TITLE);
|
||||
case "fun":
|
||||
|
@ -83,6 +84,11 @@ export const initLog = (log: Log): ReduxAction<TaggedResource> => init({
|
|||
body: log
|
||||
}, true);
|
||||
|
||||
export const batchInitResources =
|
||||
(payload: TaggedResource[]): ReduxAction<TaggedResource[]> => {
|
||||
return { type: Actions.BATCH_INIT, payload };
|
||||
};
|
||||
|
||||
export const bothUp = () => {
|
||||
dispatchNetworkUp("user.mqtt");
|
||||
dispatchNetworkUp("bot.mqtt");
|
||||
|
@ -122,8 +128,10 @@ const onStatus = (dispatch: Function, getState: GetState) =>
|
|||
|
||||
type Client = { connected?: boolean };
|
||||
|
||||
export const onSent = (client: Client) => () => !!client.connected ?
|
||||
dispatchNetworkUp("user.mqtt") : dispatchNetworkDown("user.mqtt");
|
||||
export const onSent = (client: Client) => () => {
|
||||
!!client.connected ?
|
||||
dispatchNetworkUp("user.mqtt") : dispatchNetworkDown("user.mqtt");
|
||||
};
|
||||
|
||||
export function onMalformed() {
|
||||
bothUp();
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
import { DeviceAccountSettings } from "../devices/interfaces";
|
||||
|
||||
/** Determines if the device was forced to wait due to log flooding. */
|
||||
export const deviceIsThrottled =
|
||||
(dev: Partial<DeviceAccountSettings> | undefined): boolean => {
|
||||
return !!(dev && dev.throttled_at && dev.throttled_until);
|
||||
};
|
|
@ -1,6 +1,5 @@
|
|||
import { isLog } from "../devices/actions";
|
||||
import {
|
||||
bothUp,
|
||||
actOnChannelName,
|
||||
showLogOnScreen,
|
||||
speakLogAloud,
|
||||
|
@ -30,23 +29,21 @@ function legacyKeyTransformation(log: Log,
|
|||
}
|
||||
|
||||
export const onLogs =
|
||||
(dispatch: Function, getState: GetState) => (msg: Log) => {
|
||||
bothUp();
|
||||
(_dispatch: Function, getState: GetState) => (msg: Log) => {
|
||||
if (isLog(msg)) {
|
||||
LEGACY_META_KEY_NAMES.map(key => legacyKeyTransformation(msg, key));
|
||||
actOnChannelName(msg, "toast", showLogOnScreen);
|
||||
actOnChannelName(msg, "espeak", speakLogAloud(getState));
|
||||
globalQueue.push(() => {
|
||||
dispatch(initLog(msg));
|
||||
// CORRECT SOLUTION: Give each device its own topic for publishing
|
||||
// MQTT last will message.
|
||||
// FAST SOLUTION: We would need to re-publish FBJS and FBOS to
|
||||
// change topic structure. Instead, we will use
|
||||
// inband signalling (for now).
|
||||
// TODO: Make a `bot/device_123/offline` channel.
|
||||
const died =
|
||||
msg.message.includes("is offline") && msg.type === "error";
|
||||
died && dispatchNetworkDown("bot.mqtt");
|
||||
});
|
||||
const log = initLog(msg).payload;
|
||||
log.kind == "Log" && globalQueue.push(log);
|
||||
// CORRECT SOLUTION: Give each device its own topic for publishing
|
||||
// MQTT last will message.
|
||||
// FAST SOLUTION: We would need to re-publish FBJS and FBOS to
|
||||
// change topic structure. Instead, we will use
|
||||
// inband signalling (for now).
|
||||
// TODO: Make a `bot/device_123/offline` channel.
|
||||
const died =
|
||||
msg.message.includes("is offline") && msg.type === "error";
|
||||
died && dispatchNetworkDown("bot.mqtt");
|
||||
}
|
||||
};
|
||||
|
|
|
@ -525,7 +525,7 @@ export enum Actions {
|
|||
// Resources
|
||||
DESTROY_RESOURCE_OK = "DESTROY_RESOURCE_OK",
|
||||
INIT_RESOURCE = "INIT_RESOURCE",
|
||||
SAVE_OPENFARM_RESOURCE = "SAVE_OPENFARM_RESOURCE",
|
||||
BATCH_INIT = "BATCH_INIT",
|
||||
SAVE_RESOURCE_OK = "SAVE_RESOURCE_OK",
|
||||
UPDATE_RESOURCE_OK = "UPDATE_RESOURCE_OK",
|
||||
EDIT_RESOURCE = "EDIT_RESOURCE",
|
||||
|
|
|
@ -76,6 +76,8 @@ export interface DeviceAccountSettings {
|
|||
name: string;
|
||||
timezone?: string | undefined;
|
||||
tz_offset_hrs: number;
|
||||
throttled_until?: string;
|
||||
throttled_at?: string;
|
||||
fbos_version?: string | undefined;
|
||||
last_saw_api?: string | undefined;
|
||||
last_saw_mq?: string | undefined;
|
||||
|
|
|
@ -26,7 +26,6 @@ import {
|
|||
initialState as designerState
|
||||
} from "../farm_designer/reducer";
|
||||
import { ResourceReadyPayl } from "../sync/actions";
|
||||
import { OFCropResponse } from "../open_farm/icons";
|
||||
import {
|
||||
famrwareReducer as farmware,
|
||||
farmwareState
|
||||
|
@ -103,18 +102,6 @@ const afterEach = (state: RestResources, a: ReduxAction<object>) => {
|
|||
/** Responsible for all RESTful resources. */
|
||||
export let resourceReducer = generateReducer
|
||||
<RestResources>(initialState, afterEach)
|
||||
.add<ResourceReadyPayl>(Actions.SAVE_OPENFARM_RESOURCE, (s, { payload }) => {
|
||||
const data = arrayWrap(payload);
|
||||
const kind = payload.name;
|
||||
data.map((body: ResourceReadyPayl) => {
|
||||
const crop = body.data as OFCropResponse;
|
||||
if (crop.data) {
|
||||
const cropInfo = crop.data.attributes;
|
||||
addToIndex(s.index, kind, cropInfo, generateUuid(undefined, kind));
|
||||
}
|
||||
});
|
||||
return s;
|
||||
})
|
||||
.add<TaggedResource>(Actions.SAVE_RESOURCE_OK, (s, { payload }) => {
|
||||
const resource = payload;
|
||||
resource.specialStatus = SpecialStatus.SAVED;
|
||||
|
@ -230,14 +217,7 @@ export let resourceReducer = generateReducer
|
|||
dontTouchThis(original);
|
||||
return s;
|
||||
})
|
||||
.add<TaggedResource>(Actions.INIT_RESOURCE, (s: RestResources, { payload }) => {
|
||||
const tr = payload;
|
||||
reindexResource(s.index, tr);
|
||||
s.index.references[tr.uuid] = tr;
|
||||
sanityCheck(tr);
|
||||
dontTouchThis(tr);
|
||||
return s;
|
||||
})
|
||||
.add<TaggedResource>(Actions.INIT_RESOURCE, initResourceReducer)
|
||||
.add<TaggedResource>(Actions.SAVE_RESOURCE_START, (s, { payload }) => {
|
||||
const resource = findByUuid(s.index, payload.uuid);
|
||||
resource.specialStatus = SpecialStatus.SAVING;
|
||||
|
@ -277,6 +257,12 @@ export let resourceReducer = generateReducer
|
|||
.add<GeneralizedError>(Actions.REFRESH_RESOURCE_NO, (s, a) => {
|
||||
mutateSpecialStatus(a.payload.uuid, s.index, undefined);
|
||||
return s;
|
||||
})
|
||||
.add<TaggedResource[]>(Actions.BATCH_INIT, (s, { payload }) => {
|
||||
return payload.reduce((state, resource) => {
|
||||
const action = { type: Actions.INIT_RESOURCE, payload: resource };
|
||||
return initResourceReducer(state, action);
|
||||
}, s);
|
||||
});
|
||||
|
||||
/** Helper method to change the `specialStatus` of a resource in the index */
|
||||
|
@ -367,3 +353,13 @@ function doRecalculateLocalSequenceVariables(next: TaggedSequence) {
|
|||
next.body.args = recomputed.args;
|
||||
next.body.body = recomputed.body;
|
||||
}
|
||||
|
||||
function initResourceReducer(s: RestResources,
|
||||
{ payload }: ReduxAction<TaggedResource>): RestResources {
|
||||
const tr = payload;
|
||||
reindexResource(s.index, tr);
|
||||
s.index.references[tr.uuid] = tr;
|
||||
sanityCheck(tr);
|
||||
dontTouchThis(tr);
|
||||
return s;
|
||||
}
|
||||
|
|
|
@ -189,18 +189,15 @@ export function maybeGetDevice(index: ResourceIndex): TaggedDevice | undefined {
|
|||
dev : undefined;
|
||||
}
|
||||
|
||||
export function getDeviceAccountSettings(index: ResourceIndex): TaggedDevice {
|
||||
const list = index.byKind.Device;
|
||||
const uuid = list[0] || "_";
|
||||
const device = index.references[uuid];
|
||||
switch (list.length) {
|
||||
case 0: return bail(`Tried to load device before it was loaded.`);
|
||||
case 1: return (device && device.kind === "Device" && sanityCheck(device))
|
||||
? device
|
||||
: bail("Malformed device!");
|
||||
default: return bail("Found more than 1 device");
|
||||
}
|
||||
}
|
||||
export const getDeviceAccountSettings =
|
||||
(index: ResourceIndex): TaggedDevice => {
|
||||
const device = maybeGetDevice(index);
|
||||
switch (index.byKind.Device.length) {
|
||||
case 0: return bail(`Tried to load device before it was loaded.`);
|
||||
case 1: return (device) ? device : bail("Malformed device!");
|
||||
default: return bail("Found more than 1 device");
|
||||
}
|
||||
};
|
||||
|
||||
export function maybeFetchUser(index: ResourceIndex):
|
||||
TaggedUser | undefined {
|
||||
|
|
Loading…
Reference in New Issue