ThrottlePolicy re-write
parent
8af5fa5ac3
commit
98251fbb02
|
@ -2,11 +2,9 @@
|
|||
# Listens to *ALL* incoming logs and stores them to the DB.
|
||||
# Also handles throttling.
|
||||
class LogService < AbstractServiceRunner
|
||||
THROTTLE_POLICY = ThrottlePolicy.new(name, {
|
||||
1.minute => 0.5 * 1_000,
|
||||
1.hour => 0.5 * 10_000,
|
||||
1.day => 0.5 * 100_000,
|
||||
})
|
||||
THROTTLE_POLICY = ThrottlePolicy.new(name, min: 250,
|
||||
hour: 5_000,
|
||||
day: 50_000)
|
||||
|
||||
LOG_TPL = Rails.env.test? ?
|
||||
"\e[32m.\e[0m" : "FBOS LOG (device_%s): %s\n"
|
||||
|
|
|
@ -4,11 +4,9 @@
|
|||
class TelemetryService < AbstractServiceRunner
|
||||
MESSAGE = "TELEMETRY MESSAGE FROM %s"
|
||||
FAILURE = "FAILED TELEMETRY MESSAGE FROM %s"
|
||||
THROTTLE_POLICY = ThrottlePolicy.new(name, {
|
||||
1.minute => 25,
|
||||
1.hour => 250,
|
||||
1.day => 1500,
|
||||
})
|
||||
THROTTLE_POLICY = ThrottlePolicy.new(name, min: 25,
|
||||
hour: 250,
|
||||
day: 1500)
|
||||
|
||||
def process(delivery_info, payload)
|
||||
device_key = delivery_info
|
||||
|
|
|
@ -1,22 +1,60 @@
|
|||
# Handles devices that spin out of control and send too many logs to the server.
|
||||
class ThrottlePolicy
|
||||
attr_reader :rules
|
||||
|
||||
TIME_UNITS = [:minute,
|
||||
:hour,
|
||||
:day]
|
||||
TTL = { min: 60,
|
||||
hour: 60 * 60,
|
||||
day: 60 * 60 * 24 }
|
||||
TIME_UNITS = TTL.keys
|
||||
attr_reader :rule_map, :namespace
|
||||
|
||||
def initialize(namespace, rule_map)
|
||||
# rule_map.map do |(unit, _)|
|
||||
# raise "BAD TIME UNIT" unless TIME_UNITS.include?(unit)
|
||||
# end
|
||||
@namespace = namespace
|
||||
@rule_map = rule_map
|
||||
each_rule { |unit| validate_unit!(unit) }
|
||||
end
|
||||
|
||||
def track(unique_id)
|
||||
def track(id)
|
||||
each_rule { |unit| incr(id, unit) }
|
||||
end
|
||||
|
||||
def is_throttled(unique_id)
|
||||
def is_throttled(id)
|
||||
violations(id).count > 0
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def validate_unit!(unit)
|
||||
raise "BAD TIME UNIT" unless TIME_UNITS.include?(unit)
|
||||
end
|
||||
|
||||
def each_rule
|
||||
rule_map.map { |(k, v)| yield(k, v) }
|
||||
end
|
||||
|
||||
def redis
|
||||
Rails.cache.redis
|
||||
end
|
||||
|
||||
def the_time_part(period, now = Time.now)
|
||||
[period, now.send(period)].map(&:to_s).join()
|
||||
end
|
||||
|
||||
def cache_key(id, period)
|
||||
[namespace, the_time_part(period), id].join(":")
|
||||
end
|
||||
|
||||
def get(id, period)
|
||||
(redis.get(cache_key(id, period)) || "0").to_i
|
||||
end
|
||||
|
||||
def incr(id, period)
|
||||
key = cache_key(id, period)
|
||||
result = redis.incr(key)
|
||||
needs_ttl = redis.ttl(key) < 1
|
||||
redis.expire(key, TTL.fetch(period)) if needs_ttl
|
||||
result
|
||||
end
|
||||
|
||||
def violations(id)
|
||||
each_rule { |k, v| (get(id, k) > v) ? k : nil }.compact
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
class ThrottlePolicy
|
||||
# A time_period object paired with a max limit
|
||||
class Rule
|
||||
attr_reader :time_period, :limit
|
||||
|
||||
def initialize(namespace, time_period, limit, now = Time.now)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,18 +0,0 @@
|
|||
# Track the number of occurrences of an event over time.
|
||||
#
|
||||
# Given:
|
||||
# * A fixed size duration (1 minute, 1 week etc)
|
||||
# * An event (In our case, log creation)
|
||||
# * An initiator id (eg: device_id)
|
||||
#
|
||||
# Produces:
|
||||
# * A table of event counts for the current time period, indexed by
|
||||
# the initiator ID.
|
||||
class ThrottlePolicy
|
||||
class TimePeriod
|
||||
attr_reader :time_unit, :current_period
|
||||
|
||||
def initialize(namespace, duration, now = Time.now)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,8 +0,0 @@
|
|||
class ThrottlePolicy
|
||||
class Violation
|
||||
attr_reader :rule
|
||||
|
||||
def initialize(rule)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -62,20 +62,6 @@ describe LogService do
|
|||
LogService.new.warn_user(data, time)
|
||||
end
|
||||
|
||||
it "triggers a throttle" do
|
||||
tp = LogService::THROTTLE_POLICY
|
||||
ls = LogService.new
|
||||
data = AmqpLogParser::DeliveryInfo.new
|
||||
data.device_id = FactoryBot.create(:device).id
|
||||
violation = ThrottlePolicy::Violation.new(Object.new)
|
||||
allow(ls).to receive(:deliver)
|
||||
expect(ls).to receive(:warn_user)
|
||||
expect(tp).to receive(:is_throttled)
|
||||
.with(data.device_id)
|
||||
.and_return(violation)
|
||||
ls.maybe_deliver(data)
|
||||
end
|
||||
|
||||
it "handles bad params" do
|
||||
expect do
|
||||
LogService.new.process(fake_delivery_info, {})
|
||||
|
|
Loading…
Reference in New Issue