diff --git a/app/lib/abstract_service_runner.rb b/app/lib/abstract_service_runner.rb new file mode 100644 index 000000000..837c1ec78 --- /dev/null +++ b/app/lib/abstract_service_runner.rb @@ -0,0 +1,19 @@ +class AbstractServiceRunner + WAIT_TIME = Rails.env.test? ? 0.01 : 5 + OFFLINE_ERROR = Bunny::TCPConnectionFailedForAllHosts + CRASH_MSG = Rails.env.test? ? + "\e[32m.\e[0m" : "Something caused the broker to crash...\n" + + def go!(channel) + channel.subscribe(block: true) do |info, _, payl| + self.process(info, payl.force_encoding("UTF-8")) + end + rescue OFFLINE_ERROR, StandardError => e + unless e.is_a?(OFFLINE_ERROR) + Rollbar.error(e) + print CRASH_MSG + end + sleep WAIT_TIME + retry + end +end diff --git a/app/lib/log_service.rb b/app/lib/log_service.rb index 76a65cc5d..4c4fa238f 100644 --- a/app/lib/log_service.rb +++ b/app/lib/log_service.rb @@ -1,13 +1,13 @@ # A singleton that runs on a separate process than the web server. # Listens to *ALL* incoming logs and stores them to the DB. # Also handles throttling. -class LogService +class LogService < AbstractServiceRunner T = ThrottlePolicy::TimePeriod THROTTLE_POLICY = ThrottlePolicy.new T.new(1.minute) => 0.5 * 1_000, T.new(1.hour) => 0.5 * 10_000, T.new(1.day) => 0.5 * 100_000 - def self.process(delivery_info, payload) + def process(delivery_info, payload) params = { routing_key: delivery_info.routing_key, payload: payload } m = AmqpLogParser.run!(params) puts "#{m.device_id}: #{m.payload["message"]}" if Rails.env.production? @@ -15,7 +15,7 @@ class LogService maybe_deliver(m) end - def self.maybe_deliver(data) + def maybe_deliver(data) violation = THROTTLE_POLICY.is_throttled(data.device_id) ok = data.valid? && !violation @@ -24,13 +24,13 @@ class LogService end end - def self.deliver(data) + def deliver(data) dev, log = [data.device, data.payload] dev.maybe_unthrottle Log.deliver(dev, Logs::Create.run!(log, device: dev)) end - def self.warn_user(data, violation) + def warn_user(data, violation) data.device.maybe_throttle(violation) end end diff --git a/app/lib/resources/service.rb b/app/lib/resources/service.rb index f157aff0f..5bc9191d8 100644 --- a/app/lib/resources/service.rb +++ b/app/lib/resources/service.rb @@ -1,31 +1,30 @@ module Resources - MQTT_CHAN = "from_api" - CHANNEL_TPL = + MQTT_CHAN = "from_api" + CHANNEL_TPL = "bot.device_%{device_id}.resources_v0.%{action}.%{klass}.%{uuid}.%{id}" INDEX_OF_USERNAME = 1 - INDEX_OF_OP = 3 - INDEX_OF_KIND = 4 - INDEX_OF_UUID = 5 - INDEX_OF_ID = 6 + INDEX_OF_OP = 3 + INDEX_OF_KIND = 4 + INDEX_OF_UUID = 5 + INDEX_OF_ID = 6 - class Service - - def self.ok(uuid) + class Service < AbstractServiceRunner + def ok(uuid) { kind: "rpc_ok", args: { label: uuid } }.to_json end - def self.rpc_err(uuid, error) + def rpc_err(uuid, error) { kind: "rpc_error", args: { label: uuid }, body: (error - .errors - .values - .map { |err| { kind: "explanation", args: { message: err.message }} }) + .errors + .values + .map { |err| { kind: "explanation", args: { message: err.message } } }), }.to_json end - def self.step1(delivery_info, body) # Returns params or nil + def step1(delivery_info, body) # Returns params or nil Preprocessor.from_amqp(delivery_info, body) rescue Mutations::ValidationException => q # AUTHORS NOTE: Some of the Bunny data structures have circular @@ -36,26 +35,26 @@ module Resources x = delivery_info.to_h.slice(*safe_attrs).merge(body: body) Rollbar.error(q, x) raw_chan = delivery_info&.routing_key&.split(".") || [] - id = raw_chan[INDEX_OF_USERNAME]&.gsub("device_", "")&.to_i - uuid = raw_chan[INDEX_OF_UUID] || "NONE" + id = raw_chan[INDEX_OF_USERNAME]&.gsub("device_", "")&.to_i + uuid = raw_chan[INDEX_OF_UUID] || "NONE" Transport.current.amqp_send(rpc_err(uuid, q), id, MQTT_CHAN) if id nil end - def self.step2(params) - dev = params[:device] + def step2(params) + dev = params[:device] dev.auto_sync_transaction do Job.run!(params) - uuid = (params[:uuid] || "NONE") + uuid = (params[:uuid] || "NONE") Transport.current.amqp_send(ok(uuid), dev.id, MQTT_CHAN) end rescue Mutations::ValidationException => q device = params.fetch(:device) Rollbar.info("device_#{device.id} using AMQP resource mgmt") - uuid = params.fetch(:uuid) + uuid = params.fetch(:uuid) errors = q.errors.values.map do |err| - { kind: "explanation", args: { message: err.message }} + { kind: "explanation", args: { message: err.message } } end message = { kind: "rpc_error", args: { label: uuid }, @@ -63,7 +62,7 @@ module Resources Transport.current.amqp_send(message, device.id, MQTT_CHAN) end - def self.process(delivery_info, body) + def process(delivery_info, body) params = step1(delivery_info, body) params && step2(params) end diff --git a/app/lib/service_runner_base.rb b/app/lib/service_runner_base.rb deleted file mode 100644 index 5aeb890a1..000000000 --- a/app/lib/service_runner_base.rb +++ /dev/null @@ -1,28 +0,0 @@ -class ServiceRunner - WAIT_TIME = Rails.env.test? ? 0.01 : 5 - OFFLINE_ERROR = Bunny::TCPConnectionFailedForAllHosts - CRASH_MSG = Rails.env.test? ? - "\e[32m.\e[0m" : "Something caused the broker to crash...\n" - - def self.go!(channel, worker_klass) - self.new(channel, worker_klass).run! - end - - def initialize(channel, worker_klass) - @channel = channel - @worker = worker_klass - end - - def run! - @channel.subscribe(block: true) do |info, _, payl| - @worker.process(info, payl.force_encoding("UTF-8")) - end - rescue OFFLINE_ERROR, StandardError => e - unless e.is_a?(OFFLINE_ERROR) - Rollbar.error(e) - print CRASH_MSG - end - sleep WAIT_TIME - retry - end -end diff --git a/app/models/transport.rb b/app/models/transport.rb index d3f560825..9d126ec30 100644 --- a/app/models/transport.rb +++ b/app/models/transport.rb @@ -3,12 +3,12 @@ require "bunny" # A wrapper around AMQP to stay DRY. Will make life easier if we ever need to # change protocols class Transport - OPTS = { read_timeout: 10, heartbeat: 10, log_level: "info" } + OPTS = { read_timeout: 10, heartbeat: 10, log_level: "warn" } RESOURCE_ROUTING_KEY = "bot.*.resources_v0.*.*.*.*" def self.amqp_url - @amqp_url ||= ENV['CLOUDAMQP_URL'] || - ENV['RABBITMQ_URL'] || + @amqp_url ||= ENV["CLOUDAMQP_URL"] || + ENV["RABBITMQ_URL"] || "amqp://admin:#{ENV.fetch("ADMIN_PASSWORD")}@mqtt:5672" end @@ -32,7 +32,7 @@ class Transport def connection @connection ||= Transport - .default_amqp_adapter.new(Transport.amqp_url, OPTS).start + .default_amqp_adapter.new(Transport.amqp_url, OPTS).start end def log_channel @@ -44,10 +44,10 @@ class Transport def resource_channel @resource_channel ||= self - .connection - .create_channel - .queue("resource_workers") - .bind("amq.topic", routing_key: RESOURCE_ROUTING_KEY) + .connection + .create_channel + .queue("resource_workers") + .bind("amq.topic", routing_key: RESOURCE_ROUTING_KEY) end # def ping_channel @@ -59,9 +59,9 @@ class Transport def amqp_topic @amqp_topic ||= self - .connection - .create_channel - .topic("amq.topic", auto_delete: true) + .connection + .create_channel + .topic("amq.topic", auto_delete: true) end def amqp_send(message, id, channel) @@ -97,10 +97,10 @@ class Transport end def self.api_url - uri = URI(Transport.amqp_url) + uri = URI(Transport.amqp_url) uri.scheme = ENV["FORCE_SSL"] ? "https" : "http" - uri.user = nil - uri.port = 15672 + uri.user = nil + uri.port = 15672 uri.to_s end diff --git a/lib/rabbit_workers.rb b/lib/rabbit_workers.rb index 3fed24e4e..ea3860031 100644 --- a/lib/rabbit_workers.rb +++ b/lib/rabbit_workers.rb @@ -6,59 +6,32 @@ require_relative "../app/lib/resources.rb" require_relative "../app/lib/resources/job.rb" require_relative "../app/lib/resources/preprocessor.rb" require_relative "../app/lib/resources/service.rb" -require_relative "../app/lib/service_runner_base.rb" -require_relative "../app/lib/service_runner_base.rb" class RabbitWorker - # You migiht need this to debug ping stuff RC: - # class FakePing - # def self.process(info, payl) - # puts "=====================================" - # arry = info.routing_key.split(".") - # arry[2] = "pong" - # chan = arry.join(".") - # puts chan - # Transport.current.raw_amqp_send(arry[3], chan) - # puts "=====================================" - # end - # end - - WAIT = 3 - SERVICES = { - log_channel: LogService, - resource_channel: Resources::Service, - # ping_channel: FakePing - } - - def run_it!(chan, service) - puts " Attempting to connect #{service} to #{chan}" - ServiceRunner.go!(Transport.current.send(chan), service) - rescue - puts "Connecting to broker in #{WAIT} seconds." - sleep WAIT - retry - end - - def thread(channel, service) - Thread.new { run_it!(channel, service) } - end - - def threads - @threads ||= SERVICES.map { |(c,s)| thread(c, s) } + WAIT = 3 + def self.thread + Thread.new do + yield + rescue => e + puts "Connecting to broker in #{WAIT} seconds. (#{e.inspect})" + sleep WAIT + retry + end end def self.go! loop do - ThreadsWait.all_waits(self.new.threads) + ThreadsWait.all_waits([ + thread { LogService.new.go!(Transport.current.log_channel) }, + thread { Resources::Service.new.go!(Transport.current.resource_channel) }, + ]) end + rescue + sleep RabbitWorker::WAIT + retry end end sleep(RabbitWorker::WAIT * 2) -begin - RabbitWorker.go! -rescue - sleep RabbitWorker::WAIT - retry -end +RabbitWorker.go! diff --git a/spec/lib/service_runner_base_spec.rb b/spec/lib/abstract_service_runner_spec.rb similarity index 88% rename from spec/lib/service_runner_base_spec.rb rename to spec/lib/abstract_service_runner_spec.rb index 9488da12a..461d7cbf9 100644 --- a/spec/lib/service_runner_base_spec.rb +++ b/spec/lib/abstract_service_runner_spec.rb @@ -1,15 +1,14 @@ require "spec_helper" -require "service_runner_base" -describe ServiceRunner do - class ServiceRunnerStub +describe AbstractServiceRunner do + class ServiceRunnerStub < AbstractServiceRunner attr_accessor :subscribe_call_count, :process_calls MSG = RuntimeError.new("First attempt will fail, expect a retry") def initialize @subscribe_call_count = 0 - @process_calls = [] + @process_calls = [] end def subscribe(*)