Ability to change `Transport` AMQP adapter at runtime.
parent
923794fcbd
commit
4bd60324b7
|
@ -6,12 +6,27 @@ class Transport
|
|||
AMQP_URL = ENV['CLOUDAMQP_URL'] || ENV['RABBITMQ_URL'] || LOCAL
|
||||
OPTS = { read_timeout: 10, heartbeat: 10, log_level: 'info' }
|
||||
|
||||
def self.default_amqp_adapter=(value)
|
||||
@default_amqp_adapter = value
|
||||
end
|
||||
|
||||
def self.default_amqp_adapter
|
||||
@default_amqp_adapter ||= Bunny
|
||||
end
|
||||
|
||||
attr_accessor :amqp_adapter, :request_store
|
||||
|
||||
def initialize(opts = {})
|
||||
@amqp_adapter = opts.fetch :amqp_adapter, Transport.default_amqp_adapter
|
||||
@request_store = opts.fetch :request_store, RequestStore.store
|
||||
end
|
||||
|
||||
def self.current
|
||||
@current ||= self.new
|
||||
end
|
||||
|
||||
def connection
|
||||
@connection ||= Bunny.new(AMQP_URL, OPTS).start
|
||||
@connection ||= @amqp_adapter.new(AMQP_URL, OPTS).start
|
||||
end
|
||||
|
||||
def log_channel
|
||||
|
@ -35,10 +50,10 @@ class Transport
|
|||
# We need to hoist the Rack X-Farmbot-Rpc-Id to a global state so that it can
|
||||
# be used as a unique identifier for AMQP messages.
|
||||
def current_request_id
|
||||
RequestStore.store[:current_request_id] || "NONE"
|
||||
request_store[:current_request_id] || "NONE"
|
||||
end
|
||||
|
||||
def set_current_request_id(uuid)
|
||||
RequestStore.store[:current_request_id] = uuid
|
||||
request_store[:current_request_id] = uuid
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,6 +3,7 @@ require_relative "./log_service_support"
|
|||
begin
|
||||
# Listen to all logs on the message broker and store them in the database.
|
||||
Transport
|
||||
.current
|
||||
.log_channel
|
||||
.subscribe(block: true) do |info, _, payl|
|
||||
LogService.process(info, payl)
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
describe Transport do
|
||||
it "does things"
|
||||
end
|
|
@ -21,6 +21,29 @@ require "rspec/rails"
|
|||
require_relative "./stuff"
|
||||
require_relative "./fake_sequence"
|
||||
|
||||
class FakeBunny
|
||||
def initialize(*)
|
||||
end
|
||||
|
||||
def start(*)
|
||||
self
|
||||
end
|
||||
|
||||
def create_channel(*)
|
||||
self
|
||||
end
|
||||
|
||||
def topic(*)
|
||||
self
|
||||
end
|
||||
|
||||
def publish(*)
|
||||
self
|
||||
end
|
||||
end
|
||||
|
||||
Transport.default_amqp_adapter = FakeBunny
|
||||
|
||||
Dir[Rails.root.join("spec/support/**/*.rb")].each { |f| require f }
|
||||
|
||||
SmarfDoc.config do |c|
|
||||
|
|
Loading…
Reference in New Issue