Merge branch 'staging' into cosmetic-fixes
commit
c385dcc875
|
@ -11,16 +11,18 @@ class AmqpLogParser < Mutations::Command
|
|||
# I keep a Ruby copy of the JSON here for reference.
|
||||
# This is what a log will look like after JSON.parse()
|
||||
EXAMPLE_JSON = {
|
||||
"meta" => {
|
||||
"x" => 0,
|
||||
"y" => 0,
|
||||
"z" => 0,
|
||||
"type" => "info",
|
||||
},
|
||||
"major_version" => 6, # <= up-to-date bots do this
|
||||
"message" => "HQ FarmBot TEST 123 Pin 13 is 0",
|
||||
"created_at" => 1512585641,
|
||||
"channels" => [],
|
||||
"created_at" => 1572015955,
|
||||
"major_version" => 8,
|
||||
"message" => "Syncing",
|
||||
"meta" => {},
|
||||
"minor_version" => 1,
|
||||
"patch_version" => 1,
|
||||
"type" => "info",
|
||||
"verbosity" => 3,
|
||||
"x" => 0.0,
|
||||
"y" => 0.0,
|
||||
"z" => 0.0,
|
||||
}
|
||||
|
||||
required do
|
||||
|
@ -102,8 +104,19 @@ class AmqpLogParser < Mutations::Command
|
|||
end
|
||||
|
||||
def find_problems!
|
||||
@output.problems.push(NOT_HASH) and return if not_hash?
|
||||
@output.problems.push(TOO_OLD) and return if major_version < 6
|
||||
@output.problems.push(DISCARD) and return if discard?
|
||||
if not_hash?
|
||||
@output.problems.push(NOT_HASH)
|
||||
return
|
||||
end
|
||||
|
||||
if (major_version || 0) < 7
|
||||
@output.problems.push(TOO_OLD)
|
||||
return
|
||||
end
|
||||
|
||||
if discard?
|
||||
@output.problems.push(DISCARD)
|
||||
return
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,10 +2,11 @@
|
|||
# Listens to *ALL* incoming logs and stores them to the DB.
|
||||
# Also handles throttling.
|
||||
class LogService < AbstractServiceRunner
|
||||
T = ThrottlePolicy::TimePeriod
|
||||
THROTTLE_POLICY = ThrottlePolicy.new T.new(1.minute) => 0.5 * 1_000,
|
||||
T.new(1.hour) => 0.5 * 10_000,
|
||||
T.new(1.day) => 0.5 * 100_000
|
||||
THROTTLE_POLICY = ThrottlePolicy.new(name, {
|
||||
1.minute => 0.5 * 1_000,
|
||||
1.hour => 0.5 * 10_000,
|
||||
1.day => 0.5 * 100_000,
|
||||
})
|
||||
|
||||
LOG_TPL = Rails.env.test? ?
|
||||
"\e[32m.\e[0m" : "FBOS LOG (device_%s): %s\n"
|
||||
|
|
|
@ -4,10 +4,10 @@
|
|||
class TelemetryService < AbstractServiceRunner
|
||||
MESSAGE = "TELEMETRY MESSAGE FROM %s"
|
||||
FAILURE = "FAILED TELEMETRY MESSAGE FROM %s"
|
||||
THROTTLE_POLICY = ThrottlePolicy.new({
|
||||
ThrottlePolicy::TimePeriod.new(1.minute) => 25,
|
||||
ThrottlePolicy::TimePeriod.new(1.hour) => 250,
|
||||
ThrottlePolicy::TimePeriod.new(1.day) => 1500,
|
||||
THROTTLE_POLICY = ThrottlePolicy.new(name, {
|
||||
1.minute => 25,
|
||||
1.hour => 250,
|
||||
1.day => 1500,
|
||||
})
|
||||
|
||||
def process(delivery_info, payload)
|
||||
|
|
|
@ -1,27 +1,30 @@
|
|||
# Handles devices that spin out of control and send too many logs to the server.
|
||||
# Class Hierarchy:
|
||||
# ThrottlePolicy has => Rules creates => Violation
|
||||
# Violation has => Rule has => TimePeriod
|
||||
# ThrottlePolicy
|
||||
# \
|
||||
# +----> Rule --> TimePeriod
|
||||
# |\
|
||||
# | `--> Rule --> TimePeriod
|
||||
# \_
|
||||
# `-> Rule --> TimePeriod
|
||||
class ThrottlePolicy
|
||||
attr_reader :rules
|
||||
|
||||
# Dictionary<TimePeriod, Integer>
|
||||
def initialize(policy_rules)
|
||||
@rules = policy_rules.map { |rule_set| Rule.new(*rule_set) }
|
||||
def initialize(namespace, rule_map, now = Time.now)
|
||||
@rules = rule_map
|
||||
.map { |(period, limit)| Rule.new(namespace, period, limit, now) }
|
||||
end
|
||||
|
||||
def track(unique_id, now = Time.now)
|
||||
rules.each { |r| r.time_period.record_event(unique_id, now) }
|
||||
rules.each { |r| r.record_event(unique_id, now) }
|
||||
end
|
||||
|
||||
# If throttled, returns the timeperiod when device will be unthrottled
|
||||
# returns nil if not throttled
|
||||
def is_throttled(unique_id)
|
||||
rules
|
||||
.map do |rule|
|
||||
is_violation = rule.time_period.usage_count_for(unique_id) > rule.limit
|
||||
is_violation ? Violation.new(rule) : nil
|
||||
end
|
||||
.map { |rule| rule.violation?(unique_id) }
|
||||
.compact
|
||||
.max
|
||||
end
|
||||
|
|
|
@ -3,8 +3,21 @@ class ThrottlePolicy
|
|||
class Rule
|
||||
attr_reader :time_period, :limit
|
||||
|
||||
def initialize(time_period, limit)
|
||||
@time_period, @limit = time_period, limit
|
||||
def initialize(namespace, time_period, limit, now = Time.now)
|
||||
@time_period = ThrottlePolicy::TimePeriod.new(namespace, time_period, now)
|
||||
@limit = limit
|
||||
end
|
||||
|
||||
# returns the timeperiod when device will be
|
||||
# unthrottled. returns `nil` if not throttled
|
||||
def violation?(unique_id)
|
||||
if (time_period.usage_count_for(unique_id) > limit)
|
||||
Violation.new(self)
|
||||
end
|
||||
end
|
||||
|
||||
def record_event(unique_id, now)
|
||||
time_period.record_event(unique_id, now)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -10,48 +10,66 @@
|
|||
# the initiator ID.
|
||||
class ThrottlePolicy
|
||||
class TimePeriod
|
||||
attr_reader :time_unit,
|
||||
:current_period, # Slice time into fixed size windows
|
||||
:entries
|
||||
attr_reader :time_unit, :current_period
|
||||
|
||||
def initialize(active_support_duration, now = Time.now)
|
||||
@time_unit = active_support_duration
|
||||
reset_everything now
|
||||
def initialize(namespace, duration, now = Time.now)
|
||||
@time_unit = duration
|
||||
@namespace = namespace
|
||||
reset_everything(now)
|
||||
end
|
||||
|
||||
|
||||
def record_event(unique_id, now = Time.now)
|
||||
period = calculate_period(now)
|
||||
case period <=> current_period
|
||||
when -1 then return # Out of date- don't record.
|
||||
when 0 then increment_count_for(unique_id) # Right on schedule.
|
||||
when 1 then reset_everything(now) # Clear out old data.
|
||||
when -1 then return # Out of date- don't record.
|
||||
when 0 then increment_count_for(unique_id) # Right on schedule.
|
||||
when 1 then reset_everything(now) # Clear out old data.
|
||||
end
|
||||
end
|
||||
|
||||
def usage_count_for(unique_id)
|
||||
@entries[unique_id] || 0
|
||||
fetch(unique_id)
|
||||
end
|
||||
|
||||
def when_does_next_period_start?
|
||||
Time.at(current_period * time_unit.to_i) + time_unit
|
||||
end
|
||||
|
||||
private
|
||||
private
|
||||
|
||||
def reset_everything(now)
|
||||
@current_period = calculate_period(now)
|
||||
@entries = {}
|
||||
reset_cache
|
||||
end
|
||||
|
||||
def increment_count_for(unique_id)
|
||||
@entries[unique_id] ||= 0
|
||||
@entries[unique_id] += 1
|
||||
incr(unique_id)
|
||||
end
|
||||
|
||||
# Returns integer representation of current clock period
|
||||
def calculate_period(time)
|
||||
(time.to_i / @time_unit)
|
||||
end
|
||||
|
||||
def redis
|
||||
Rails.cache.redis
|
||||
end
|
||||
|
||||
def cache_key(unique_id)
|
||||
[@namespace, current_period.to_i, unique_id].join(":")
|
||||
end
|
||||
|
||||
def incr(unique_id)
|
||||
redis.incr(cache_key(unique_id))
|
||||
end
|
||||
|
||||
def fetch(unique_id)
|
||||
(redis.get(cache_key(unique_id)) || "0").to_i
|
||||
end
|
||||
|
||||
def reset_cache
|
||||
keys = redis.keys([@namespace, "*"].join(":"))
|
||||
redis.del(*keys) unless keys.empty?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -13,20 +13,24 @@ const mockResponse: { promise: Promise<{}> } = {
|
|||
};
|
||||
|
||||
jest.mock("axios", () => ({
|
||||
get: () => mockResponse.promise
|
||||
get: jest.fn(() => mockResponse.promise)
|
||||
}));
|
||||
|
||||
jest.unmock("../cached_crop");
|
||||
import { cachedCrop } from "../cached_crop";
|
||||
import axios from "axios";
|
||||
import { times } from "lodash";
|
||||
|
||||
describe("cachedIcon()", () => {
|
||||
it("does an HTTP request if the icon can't be found locally", async () => {
|
||||
times(10, () => cachedCrop("lettuce"));
|
||||
const item1 = await cachedCrop("lettuce");
|
||||
expect(item1.svg_icon).toContain("<svg>Wow</svg>");
|
||||
const item2 = await cachedCrop("lettuce");
|
||||
expect(item2.slug).toBe(item1.slug);
|
||||
expect(item2.svg_icon).toBe(item1.svg_icon);
|
||||
expect(item2.spread).toBe(undefined);
|
||||
expect(axios.get).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("handles unexpected responses from OpenFarm", async () => {
|
||||
|
|
|
@ -37,7 +37,8 @@ function localStorageIconSet(icon: OFIcon): void {
|
|||
* and the garlic icon is not cached locally, and you try to render 10 garlic
|
||||
* icons in the first 100ms, and HTTP requests take more than 100ms, you will
|
||||
* end up performing 10 HTTP requests at application start time. Not very
|
||||
* efficient */
|
||||
* efficient.
|
||||
* SOLUTION: Keep a record of open requests to avoid duplicate requests. */
|
||||
const promiseCache: Dictionary<Promise<Readonly<OFCropAttrs>>> = {};
|
||||
|
||||
const cacheTheIcon = (slug: string) =>
|
||||
|
@ -60,6 +61,8 @@ const cacheTheIcon = (slug: string) =>
|
|||
|
||||
function HTTPIconFetch(slug: string) {
|
||||
const url = OpenFarmAPI.OFBaseURL + slug;
|
||||
// Avoid duplicate requests.
|
||||
if (promiseCache[url]) { return promiseCache[url]; }
|
||||
promiseCache[url] = axios
|
||||
.get<OFCropResponse>(url)
|
||||
.then(cacheTheIcon(slug), cacheTheIcon(slug));
|
||||
|
|
|
@ -3,25 +3,25 @@ NOW = Time.new("2018-05-18T09:38:02.259-05:00")
|
|||
|
||||
klass = ThrottlePolicy::TimePeriod
|
||||
describe klass do
|
||||
let(:policy) do
|
||||
ThrottlePolicy.new klass.new(1.minute, NOW) => 1,
|
||||
klass.new(1.hour, NOW) => 10,
|
||||
klass.new(1.day, NOW) => 100
|
||||
let(:policy) do
|
||||
ThrottlePolicy.new("rspec", { 1.minute => 1,
|
||||
1.hour => 10,
|
||||
1.day => 100 }, NOW)
|
||||
end
|
||||
|
||||
it "initializes" do
|
||||
expect(policy.rules).to be
|
||||
expect(policy.rules.map(&:limit).sort).to eq([1, 10, 100])
|
||||
actual = policy.rules.map(&:time_period).map(&:time_unit).sort
|
||||
actual = policy.rules.map(&:time_period).map(&:time_unit).sort
|
||||
expected = [1.minute, 1.hour, 1.day]
|
||||
expect(actual).to eq(expected)
|
||||
end
|
||||
|
||||
it "tracks things" do
|
||||
count1 = policy.rules.map(&:time_period).map{|t| t.usage_count_for(123)}
|
||||
count1 = policy.rules.map(&:time_period).map { |t| t.usage_count_for(123) }
|
||||
expect(count1).to eq([0, 0, 0])
|
||||
5.times { policy.track(123, NOW + 1) }
|
||||
count2 = policy.rules.map(&:time_period).map{|t| t.usage_count_for(123)}
|
||||
count2 = policy.rules.map(&:time_period).map { |t| t.usage_count_for(123) }
|
||||
expect(count2).to eq([5, 5, 5])
|
||||
end
|
||||
|
||||
|
@ -34,5 +34,4 @@ describe klass do
|
|||
it "ignores the block when it's over the limit" do
|
||||
expect(policy.is_throttled 123).to be nil
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -5,14 +5,13 @@ describe ThrottlePolicy::TimePeriod do
|
|||
|
||||
it "sets a time unit window size" do
|
||||
expected_time_period = stub_time.to_i / 1.minute.to_i
|
||||
one_min = ThrottlePolicy::TimePeriod.new(1.minute, stub_time)
|
||||
one_min = ThrottlePolicy::TimePeriod.new("RSPEC0", 1.minute, stub_time)
|
||||
expect(one_min.current_period).to eq(expected_time_period)
|
||||
expect(one_min.time_unit).to eq(60)
|
||||
expect(one_min.entries).to eq({})
|
||||
end
|
||||
|
||||
it "increments the count" do
|
||||
t = ThrottlePolicy::TimePeriod.new(1.minute, stub_time)
|
||||
t = ThrottlePolicy::TimePeriod.new("RSPEC1", 1.minute, stub_time)
|
||||
uid = 123
|
||||
|
||||
# Ignore events from the past.
|
||||
|
@ -33,10 +32,10 @@ describe ThrottlePolicy::TimePeriod do
|
|||
end
|
||||
|
||||
it "tells you when the next time period starts" do
|
||||
one_hour = ThrottlePolicy::TimePeriod.new(1.hour, stub_time)
|
||||
one_hour = ThrottlePolicy::TimePeriod.new("RSPEC2", 1.hour, stub_time)
|
||||
next_hour = one_hour.when_does_next_period_start?
|
||||
expect(next_hour).to be_kind_of(Time)
|
||||
expect(next_hour.hour).to be(stub_time.hour + 1)
|
||||
expect(next_hour.min).to be(0)
|
||||
expect(next_hour.min).to be(0)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
describe ThrottlePolicy::Violation do
|
||||
violation = ThrottlePolicy::Violation
|
||||
rule = ThrottlePolicy::Rule
|
||||
violation = ThrottlePolicy::Violation
|
||||
rule = ThrottlePolicy::Rule
|
||||
time_period = ThrottlePolicy::TimePeriod
|
||||
|
||||
it 'is comparable' do
|
||||
smaller = violation.new(rule.new(time_period.new(1.minute), 10))
|
||||
bigger = violation.new(rule.new(time_period.new(1.day), 10))
|
||||
medium = violation.new(rule.new(time_period.new(1.hour), 10))
|
||||
violations = [medium, smaller, bigger]
|
||||
result = violations.sort
|
||||
it "is comparable" do
|
||||
smaller = violation.new(rule.new("X", 1.minute, 10, Time.now))
|
||||
bigger = violation.new(rule.new("X", 1.day, 10, Time.now))
|
||||
medium = violation.new(rule.new("X", 1.hour, 10, Time.now))
|
||||
violations = [medium, smaller, bigger]
|
||||
result = violations.sort
|
||||
expect(result.first).to be(smaller)
|
||||
expect(result.last).to be(bigger)
|
||||
expect(result.last).to be(bigger)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -70,8 +70,8 @@ describe Device do
|
|||
expect(device).to receive(:tell).and_return(Log.new)
|
||||
device.update!(throttled_until: nil)
|
||||
expect(device.throttled_until).to be(nil)
|
||||
five_minutes = ThrottlePolicy::TimePeriod.new(5.minutes, Time.now + 1.minute)
|
||||
rule = ThrottlePolicy::Rule.new(five_minutes, 500)
|
||||
now = Time.now + 1.minute
|
||||
rule = ThrottlePolicy::Rule.new("X", 5.minutes, 500, now)
|
||||
violation = ThrottlePolicy::Violation.new(rule)
|
||||
device.maybe_throttle(violation)
|
||||
expect(device.throttled_until).to eq(violation.ends_at)
|
||||
|
@ -82,8 +82,7 @@ describe Device do
|
|||
previous_throttle = Time.now - 1.minute
|
||||
device.update!(throttled_until: previous_throttle)
|
||||
expect(device.throttled_until).to eq(previous_throttle)
|
||||
five_minutes = ThrottlePolicy::TimePeriod.new(5.minutes, Time.now + 1.minute)
|
||||
rule = ThrottlePolicy::Rule.new(five_minutes, 500)
|
||||
rule = ThrottlePolicy::Rule.new("X", 5.minutes, 500, Time.now + 1.minute)
|
||||
violation = ThrottlePolicy::Violation.new(rule)
|
||||
device.maybe_throttle(violation)
|
||||
expect(device.throttled_until).to eq(violation.ends_at)
|
||||
|
|
Loading…
Reference in New Issue