[WIP][UNSTABLE] Resource service ONLY returns CeleryScript - let auto_sync deal with resources.
parent
a63358383c
commit
76b966b59a
|
@ -1,32 +1,63 @@
|
|||
module Resources
|
||||
class Service
|
||||
def self.process(delivery_info, body)
|
||||
params = PreProcessor.from_amqp(delivery_info, body)
|
||||
puts params if Rails.env.production?
|
||||
result = Job.run!(params)
|
||||
payl = result ? result.to_json : ""
|
||||
chan = ["from_api", (params[:uuid] || "NONE")].join(".")
|
||||
params[:device].auto_sync_transaction do
|
||||
Transport.current.amqp_send(payl, params[:device].id, chan)
|
||||
end
|
||||
def self.ok(uuid)
|
||||
{ kind: "rpc_ok", args: { label: uuid } }.to_json
|
||||
end
|
||||
|
||||
def self.rpc_err(uuid, error)
|
||||
{
|
||||
kind: "rpc_error",
|
||||
args: { label: uuid },
|
||||
body: (error
|
||||
.errors
|
||||
.values
|
||||
.map { |err| { kind: "explanation", args: { message: err.message }} })
|
||||
}.to_json
|
||||
end
|
||||
|
||||
def self.step1(delivery_info, body) # Returns params or nil
|
||||
PreProcessor.from_amqp(delivery_info, body)
|
||||
rescue Mutations::ValidationException => q
|
||||
Rollbar.error(q)
|
||||
raw_chan = delivery_info&.routing_key&.split(".") || []
|
||||
id = raw_chan[1]&.gsub("device_", "")&.to_i
|
||||
uuid = (raw_chan.last || "NONE")
|
||||
chan = ["from_api", uuid].join(".")
|
||||
Transport.current.amqp_send(rpc_err(uuid, q), id, chan) if id
|
||||
nil
|
||||
end
|
||||
|
||||
def self.step2(params)
|
||||
puts params if Rails.env.production?
|
||||
Job.run!(params)
|
||||
uuid = (params[:uuid] || "NONE")
|
||||
chan = ["from_api", uuid].join(".")
|
||||
dev = params[:device]
|
||||
|
||||
dev.auto_sync_transaction do
|
||||
Transport.current.amqp_send(ok(uuid), dev.id, chan)
|
||||
end
|
||||
rescue ArgumentError => x
|
||||
binding.pry
|
||||
rescue Mutations::ValidationException => q
|
||||
Rollbar.error(q)
|
||||
params ||= {}
|
||||
raw_chan = delivery_info&.routing_key&.split(".") || []
|
||||
device = params[:device]
|
||||
device_id = device ? device.id : raw_chan[1]&.gsub("device_", "")&.to_i
|
||||
if device_id
|
||||
message = {
|
||||
kind: "rpc_error",
|
||||
args: { label: params[:uuid] || raw_chan[6] || "NONE" },
|
||||
body: (q
|
||||
.errors
|
||||
.values
|
||||
.map { |err| { kind: "explanation", args: { message: err.message }} })
|
||||
}.to_json
|
||||
chan = ["from_api", (raw_chan.last || "")].join(".")
|
||||
Transport.current.amqp_send(message, device_id, chan)
|
||||
end
|
||||
message = {
|
||||
kind: "rpc_error",
|
||||
args: { label: params[:uuid] || raw_chan[6] || "NONE" },
|
||||
body: (q
|
||||
.errors
|
||||
.values
|
||||
.map { |err| { kind: "explanation", args: { message: err.message }} })
|
||||
}.to_json
|
||||
chan = ["from_api", (raw_chan.last || "")].join(".")
|
||||
Transport.current.amqp_send(message, device.id, chan)
|
||||
end
|
||||
|
||||
def self.process(delivery_info, body)
|
||||
params = step1(delivery_info, body)
|
||||
params && step2(params)
|
||||
end
|
||||
end # Service
|
||||
end # Resources
|
||||
|
|
Loading…
Reference in New Issue