UNSTABLE: Factor down service runners.
parent
2633643fb9
commit
4b8739c18e
|
@ -0,0 +1,19 @@
|
|||
class AbstractServiceRunner
|
||||
WAIT_TIME = Rails.env.test? ? 0.01 : 5
|
||||
OFFLINE_ERROR = Bunny::TCPConnectionFailedForAllHosts
|
||||
CRASH_MSG = Rails.env.test? ?
|
||||
"\e[32m.\e[0m" : "Something caused the broker to crash...\n"
|
||||
|
||||
def go!(channel)
|
||||
channel.subscribe(block: true) do |info, _, payl|
|
||||
self.process(info, payl.force_encoding("UTF-8"))
|
||||
end
|
||||
rescue OFFLINE_ERROR, StandardError => e
|
||||
unless e.is_a?(OFFLINE_ERROR)
|
||||
Rollbar.error(e)
|
||||
print CRASH_MSG
|
||||
end
|
||||
sleep WAIT_TIME
|
||||
retry
|
||||
end
|
||||
end
|
|
@ -1,13 +1,13 @@
|
|||
# A singleton that runs on a separate process than the web server.
|
||||
# Listens to *ALL* incoming logs and stores them to the DB.
|
||||
# Also handles throttling.
|
||||
class LogService
|
||||
class LogService < AbstractServiceRunner
|
||||
T = ThrottlePolicy::TimePeriod
|
||||
THROTTLE_POLICY = ThrottlePolicy.new T.new(1.minute) => 0.5 * 1_000,
|
||||
T.new(1.hour) => 0.5 * 10_000,
|
||||
T.new(1.day) => 0.5 * 100_000
|
||||
|
||||
def self.process(delivery_info, payload)
|
||||
def process(delivery_info, payload)
|
||||
params = { routing_key: delivery_info.routing_key, payload: payload }
|
||||
m = AmqpLogParser.run!(params)
|
||||
puts "#{m.device_id}: #{m.payload["message"]}" if Rails.env.production?
|
||||
|
@ -15,7 +15,7 @@ class LogService
|
|||
maybe_deliver(m)
|
||||
end
|
||||
|
||||
def self.maybe_deliver(data)
|
||||
def maybe_deliver(data)
|
||||
violation = THROTTLE_POLICY.is_throttled(data.device_id)
|
||||
ok = data.valid? && !violation
|
||||
|
||||
|
@ -24,13 +24,13 @@ class LogService
|
|||
end
|
||||
end
|
||||
|
||||
def self.deliver(data)
|
||||
def deliver(data)
|
||||
dev, log = [data.device, data.payload]
|
||||
dev.maybe_unthrottle
|
||||
Log.deliver(dev, Logs::Create.run!(log, device: dev))
|
||||
end
|
||||
|
||||
def self.warn_user(data, violation)
|
||||
def warn_user(data, violation)
|
||||
data.device.maybe_throttle(violation)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,31 +1,30 @@
|
|||
module Resources
|
||||
MQTT_CHAN = "from_api"
|
||||
CHANNEL_TPL =
|
||||
MQTT_CHAN = "from_api"
|
||||
CHANNEL_TPL =
|
||||
"bot.device_%{device_id}.resources_v0.%{action}.%{klass}.%{uuid}.%{id}"
|
||||
INDEX_OF_USERNAME = 1
|
||||
INDEX_OF_OP = 3
|
||||
INDEX_OF_KIND = 4
|
||||
INDEX_OF_UUID = 5
|
||||
INDEX_OF_ID = 6
|
||||
INDEX_OF_OP = 3
|
||||
INDEX_OF_KIND = 4
|
||||
INDEX_OF_UUID = 5
|
||||
INDEX_OF_ID = 6
|
||||
|
||||
class Service
|
||||
|
||||
def self.ok(uuid)
|
||||
class Service < AbstractServiceRunner
|
||||
def ok(uuid)
|
||||
{ kind: "rpc_ok", args: { label: uuid } }.to_json
|
||||
end
|
||||
|
||||
def self.rpc_err(uuid, error)
|
||||
def rpc_err(uuid, error)
|
||||
{
|
||||
kind: "rpc_error",
|
||||
args: { label: uuid },
|
||||
body: (error
|
||||
.errors
|
||||
.values
|
||||
.map { |err| { kind: "explanation", args: { message: err.message }} })
|
||||
.errors
|
||||
.values
|
||||
.map { |err| { kind: "explanation", args: { message: err.message } } }),
|
||||
}.to_json
|
||||
end
|
||||
|
||||
def self.step1(delivery_info, body) # Returns params or nil
|
||||
def step1(delivery_info, body) # Returns params or nil
|
||||
Preprocessor.from_amqp(delivery_info, body)
|
||||
rescue Mutations::ValidationException => q
|
||||
# AUTHORS NOTE: Some of the Bunny data structures have circular
|
||||
|
@ -36,26 +35,26 @@ module Resources
|
|||
x = delivery_info.to_h.slice(*safe_attrs).merge(body: body)
|
||||
Rollbar.error(q, x)
|
||||
raw_chan = delivery_info&.routing_key&.split(".") || []
|
||||
id = raw_chan[INDEX_OF_USERNAME]&.gsub("device_", "")&.to_i
|
||||
uuid = raw_chan[INDEX_OF_UUID] || "NONE"
|
||||
id = raw_chan[INDEX_OF_USERNAME]&.gsub("device_", "")&.to_i
|
||||
uuid = raw_chan[INDEX_OF_UUID] || "NONE"
|
||||
Transport.current.amqp_send(rpc_err(uuid, q), id, MQTT_CHAN) if id
|
||||
nil
|
||||
end
|
||||
|
||||
def self.step2(params)
|
||||
dev = params[:device]
|
||||
def step2(params)
|
||||
dev = params[:device]
|
||||
|
||||
dev.auto_sync_transaction do
|
||||
Job.run!(params)
|
||||
uuid = (params[:uuid] || "NONE")
|
||||
uuid = (params[:uuid] || "NONE")
|
||||
Transport.current.amqp_send(ok(uuid), dev.id, MQTT_CHAN)
|
||||
end
|
||||
rescue Mutations::ValidationException => q
|
||||
device = params.fetch(:device)
|
||||
Rollbar.info("device_#{device.id} using AMQP resource mgmt")
|
||||
uuid = params.fetch(:uuid)
|
||||
uuid = params.fetch(:uuid)
|
||||
errors = q.errors.values.map do |err|
|
||||
{ kind: "explanation", args: { message: err.message }}
|
||||
{ kind: "explanation", args: { message: err.message } }
|
||||
end
|
||||
message = { kind: "rpc_error",
|
||||
args: { label: uuid },
|
||||
|
@ -63,7 +62,7 @@ module Resources
|
|||
Transport.current.amqp_send(message, device.id, MQTT_CHAN)
|
||||
end
|
||||
|
||||
def self.process(delivery_info, body)
|
||||
def process(delivery_info, body)
|
||||
params = step1(delivery_info, body)
|
||||
params && step2(params)
|
||||
end
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
class ServiceRunner
|
||||
WAIT_TIME = Rails.env.test? ? 0.01 : 5
|
||||
OFFLINE_ERROR = Bunny::TCPConnectionFailedForAllHosts
|
||||
CRASH_MSG = Rails.env.test? ?
|
||||
"\e[32m.\e[0m" : "Something caused the broker to crash...\n"
|
||||
|
||||
def self.go!(channel, worker_klass)
|
||||
self.new(channel, worker_klass).run!
|
||||
end
|
||||
|
||||
def initialize(channel, worker_klass)
|
||||
@channel = channel
|
||||
@worker = worker_klass
|
||||
end
|
||||
|
||||
def run!
|
||||
@channel.subscribe(block: true) do |info, _, payl|
|
||||
@worker.process(info, payl.force_encoding("UTF-8"))
|
||||
end
|
||||
rescue OFFLINE_ERROR, StandardError => e
|
||||
unless e.is_a?(OFFLINE_ERROR)
|
||||
Rollbar.error(e)
|
||||
print CRASH_MSG
|
||||
end
|
||||
sleep WAIT_TIME
|
||||
retry
|
||||
end
|
||||
end
|
|
@ -3,12 +3,12 @@ require "bunny"
|
|||
# A wrapper around AMQP to stay DRY. Will make life easier if we ever need to
|
||||
# change protocols
|
||||
class Transport
|
||||
OPTS = { read_timeout: 10, heartbeat: 10, log_level: "info" }
|
||||
OPTS = { read_timeout: 10, heartbeat: 10, log_level: "warn" }
|
||||
RESOURCE_ROUTING_KEY = "bot.*.resources_v0.*.*.*.*"
|
||||
|
||||
def self.amqp_url
|
||||
@amqp_url ||= ENV['CLOUDAMQP_URL'] ||
|
||||
ENV['RABBITMQ_URL'] ||
|
||||
@amqp_url ||= ENV["CLOUDAMQP_URL"] ||
|
||||
ENV["RABBITMQ_URL"] ||
|
||||
"amqp://admin:#{ENV.fetch("ADMIN_PASSWORD")}@mqtt:5672"
|
||||
end
|
||||
|
||||
|
@ -32,7 +32,7 @@ class Transport
|
|||
|
||||
def connection
|
||||
@connection ||= Transport
|
||||
.default_amqp_adapter.new(Transport.amqp_url, OPTS).start
|
||||
.default_amqp_adapter.new(Transport.amqp_url, OPTS).start
|
||||
end
|
||||
|
||||
def log_channel
|
||||
|
@ -44,10 +44,10 @@ class Transport
|
|||
|
||||
def resource_channel
|
||||
@resource_channel ||= self
|
||||
.connection
|
||||
.create_channel
|
||||
.queue("resource_workers")
|
||||
.bind("amq.topic", routing_key: RESOURCE_ROUTING_KEY)
|
||||
.connection
|
||||
.create_channel
|
||||
.queue("resource_workers")
|
||||
.bind("amq.topic", routing_key: RESOURCE_ROUTING_KEY)
|
||||
end
|
||||
|
||||
# def ping_channel
|
||||
|
@ -59,9 +59,9 @@ class Transport
|
|||
|
||||
def amqp_topic
|
||||
@amqp_topic ||= self
|
||||
.connection
|
||||
.create_channel
|
||||
.topic("amq.topic", auto_delete: true)
|
||||
.connection
|
||||
.create_channel
|
||||
.topic("amq.topic", auto_delete: true)
|
||||
end
|
||||
|
||||
def amqp_send(message, id, channel)
|
||||
|
@ -97,10 +97,10 @@ class Transport
|
|||
end
|
||||
|
||||
def self.api_url
|
||||
uri = URI(Transport.amqp_url)
|
||||
uri = URI(Transport.amqp_url)
|
||||
uri.scheme = ENV["FORCE_SSL"] ? "https" : "http"
|
||||
uri.user = nil
|
||||
uri.port = 15672
|
||||
uri.user = nil
|
||||
uri.port = 15672
|
||||
uri.to_s
|
||||
end
|
||||
|
||||
|
|
|
@ -6,59 +6,32 @@ require_relative "../app/lib/resources.rb"
|
|||
require_relative "../app/lib/resources/job.rb"
|
||||
require_relative "../app/lib/resources/preprocessor.rb"
|
||||
require_relative "../app/lib/resources/service.rb"
|
||||
require_relative "../app/lib/service_runner_base.rb"
|
||||
require_relative "../app/lib/service_runner_base.rb"
|
||||
|
||||
class RabbitWorker
|
||||
# You migiht need this to debug ping stuff RC:
|
||||
# class FakePing
|
||||
# def self.process(info, payl)
|
||||
# puts "====================================="
|
||||
# arry = info.routing_key.split(".")
|
||||
# arry[2] = "pong"
|
||||
# chan = arry.join(".")
|
||||
# puts chan
|
||||
# Transport.current.raw_amqp_send(arry[3], chan)
|
||||
# puts "====================================="
|
||||
# end
|
||||
# end
|
||||
|
||||
WAIT = 3
|
||||
SERVICES = {
|
||||
log_channel: LogService,
|
||||
resource_channel: Resources::Service,
|
||||
# ping_channel: FakePing
|
||||
}
|
||||
|
||||
def run_it!(chan, service)
|
||||
puts " Attempting to connect #{service} to #{chan}"
|
||||
ServiceRunner.go!(Transport.current.send(chan), service)
|
||||
rescue
|
||||
puts "Connecting to broker in #{WAIT} seconds."
|
||||
sleep WAIT
|
||||
retry
|
||||
end
|
||||
|
||||
def thread(channel, service)
|
||||
Thread.new { run_it!(channel, service) }
|
||||
end
|
||||
|
||||
def threads
|
||||
@threads ||= SERVICES.map { |(c,s)| thread(c, s) }
|
||||
WAIT = 3
|
||||
def self.thread
|
||||
Thread.new do
|
||||
yield
|
||||
rescue => e
|
||||
puts "Connecting to broker in #{WAIT} seconds. (#{e.inspect})"
|
||||
sleep WAIT
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
def self.go!
|
||||
loop do
|
||||
ThreadsWait.all_waits(self.new.threads)
|
||||
ThreadsWait.all_waits([
|
||||
thread { LogService.new.go!(Transport.current.log_channel) },
|
||||
thread { Resources::Service.new.go!(Transport.current.resource_channel) },
|
||||
])
|
||||
end
|
||||
rescue
|
||||
sleep RabbitWorker::WAIT
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
sleep(RabbitWorker::WAIT * 2)
|
||||
|
||||
begin
|
||||
RabbitWorker.go!
|
||||
rescue
|
||||
sleep RabbitWorker::WAIT
|
||||
retry
|
||||
end
|
||||
RabbitWorker.go!
|
||||
|
|
|
@ -1,15 +1,14 @@
|
|||
require "spec_helper"
|
||||
require "service_runner_base"
|
||||
|
||||
describe ServiceRunner do
|
||||
class ServiceRunnerStub
|
||||
describe AbstractServiceRunner do
|
||||
class ServiceRunnerStub < AbstractServiceRunner
|
||||
attr_accessor :subscribe_call_count, :process_calls
|
||||
|
||||
MSG = RuntimeError.new("First attempt will fail, expect a retry")
|
||||
|
||||
def initialize
|
||||
@subscribe_call_count = 0
|
||||
@process_calls = []
|
||||
@process_calls = []
|
||||
end
|
||||
|
||||
def subscribe(*)
|
Loading…
Reference in New Issue