Merge branch 'garden_snapshot_ii' into data_dump

pull/815/head
Rick Carlino 2018-04-28 10:13:46 -05:00
commit caacc36f04
23 changed files with 189 additions and 129 deletions

View File

@ -90,7 +90,7 @@ private
response.headers[REQ_ID] = id
# # IMPORTANT: We need to hoist X-Farmbot-Rpc-Id to a global so that it is
# # accessible for use with auto_sync.
Transport.set_current_request_id(response.headers[REQ_ID])
Transport.current.set_current_request_id(response.headers[REQ_ID])
end
# Disable cookies. This is an API!

View File

@ -9,7 +9,9 @@ module Api
end
def update
mutate SavedGardens::Update.run(raw_json, saved_garden: garden, device: current_device)
mutate SavedGardens::Update.run(raw_json,
saved_garden: garden,
device: current_device)
end
def destroy
@ -21,8 +23,8 @@ module Api
end
def apply
params = { garden: garden,
device: current_device,
params = { garden: garden,
device: current_device,
destructive: (request.method == "POST") }
mutate SavedGardens::Apply.run(params)
end

View File

@ -5,6 +5,6 @@ class AutoSyncJob < ApplicationJob
wayback = Time.at(created_at_utc_integer).utc
mins = ((wayback - Time.now.utc) / 1.minute).round
Transport.amqp_send(broadcast_payload, id, channel) if (mins < 2)
Transport.current.amqp_send(broadcast_payload, id, channel) if (mins < 2)
end
end

View File

@ -9,6 +9,6 @@ class SendFactoryResetJob < ApplicationJob
def perform(device, transport = Transport)
payl = SendFactoryResetJob.rpc_payload(device)
transport.amqp_send(payl.to_json, device.id, "from_clients")
transport.current.amqp_send(payl.to_json, device.id, "from_clients")
end
end

View File

@ -46,7 +46,7 @@ class ApplicationRecord < ActiveRecord::Base
end
def broadcast_payload
{ args: { label: Transport.current_request_id }, body: body_as_json }.to_json
{ args: { label: Transport.current.current_request_id }, body: body_as_json }.to_json
end
# Overridable

View File

@ -23,6 +23,8 @@ class Device < ApplicationRecord
has_many :token_issuances, dependent: :destroy
has_many :tools, dependent: :destroy
has_many :webcam_feeds, dependent: :destroy
has_many :in_use_tools
has_many :in_use_points
has_many :users
validates_presence_of :name
@ -65,7 +67,7 @@ class Device < ApplicationRecord
end
# Send a realtime message to a logged in user.
def tell(message, transport = Transport)
def tell(message)
log = Log.new({ device: self,
message: message,
created_at: Time.now,
@ -73,7 +75,7 @@ class Device < ApplicationRecord
meta: { type: "info" } })
json = LogSerializer.new(log).as_json.to_json
transport.amqp_send(json, self.id, "logs")
Transport.current.amqp_send(json, self.id, "logs")
log
end

View File

@ -1,6 +1,8 @@
# THIS IS A SQL VIEW. IT IS NOT A REAL TABLE.
# Maps Point <==> Sequence
class InUsePoint < ApplicationRecord
belongs_to :device
DEFAULT_NAME = "point"
FANCY_NAMES = {
GenericPointer.name => DEFAULT_NAME,
@ -15,5 +17,4 @@ class InUsePoint < ApplicationRecord
def fancy_name
"#{FANCY_NAMES[pointer_type] || DEFAULT_NAME} at (#{x}, #{y}, #{z})"
end
end

View File

@ -1,6 +1,8 @@
# THIS IS A SQL VIEW. IT IS NOT A REAL TABLE.
# Maps Tool <==> Sequence
class InUseTool < ApplicationRecord
belongs_to :device
def readonly?
true
end

View File

@ -1,40 +1,58 @@
require "bunny"
# A wrapper around AMQP to stay DRY. Will make life easier if we ever need to
# change protocols
module Transport
class Transport
LOCAL = "amqp://guest:guest@localhost:5672"
AMQP_URL = ENV['CLOUDAMQP_URL'] || ENV['RABBITMQ_URL'] || LOCAL
OPTS = { read_timeout: 10, heartbeat: 10, log_level: 'info' }
def self.connection
@connection ||= Bunny.new(AMQP_URL, OPTS).start
def self.default_amqp_adapter=(value)
@default_amqp_adapter = value
end
def self.log_channel
def self.default_amqp_adapter
@default_amqp_adapter ||= Bunny
end
attr_accessor :amqp_adapter, :request_store
def self.current
@current ||= self.new
end
def self.current=(value)
@current = value
end
def connection
@connection ||= Transport.default_amqp_adapter.new(AMQP_URL, OPTS).start
end
def log_channel
@log_channel ||= self.connection
.create_channel
.queue("", exclusive: true)
.bind("amq.topic", routing_key: "bot.*.logs")
end
def self.topic
@topic ||= self
def amqp_topic
@amqp_topic ||= self
.connection
.create_channel
.topic("amq.topic", auto_delete: true)
end
def self.amqp_send(message, id, channel)
topic.publish(message, routing_key: "bot.device_#{id}.#{channel}")
def amqp_send(message, id, channel)
amqp_topic.publish(message, routing_key: "bot.device_#{id}.#{channel}")
end
# We need to hoist the Rack X-Farmbot-Rpc-Id to a global state so that it can
# be used as a unique identifier for AMQP messages.
def self.current_request_id
def current_request_id
RequestStore.store[:current_request_id] || "NONE"
end
def self.set_current_request_id(uuid)
def set_current_request_id(uuid)
RequestStore.store[:current_request_id] = uuid
end
end

View File

@ -1,11 +1,17 @@
module SavedGardens
class Apply < Mutations::Command
DEP_ERROR_REPORT = \
"Unable to remove the following plants from the garden: %s"
required do
model :device, class: Device
model :garden, class: SavedGarden
boolean :destructive # Not yet implemented. RC 4/20/18
end
def validate
plant_safety_check if destructive
end
def execute
clean_out_plants if destructive
convert_templates_to_plants
@ -32,8 +38,22 @@ module SavedGardens
end
def clean_out_plants
Points::Destroy.run!(device: device,
point_ids: device.plants.pluck(:id))
Points::Destroy.run!(device: device, point_ids: device.plants.pluck(:id))
end
def plant_safety_check
add_error :whoops,
:whoops,
plant_error_message if in_use_plants.count > 0
end
def plant_error_message
@plant_error_message ||= \
DEP_ERROR_REPORT % in_use_plants.map(&:fancy_name).join(", ")
end
def in_use_plants
@in_use_plants ||= device.in_use_points.where(pointer_type: "Plant")
end
end
end

View File

@ -41,7 +41,7 @@ FarmBot::Application.routes.draw do
end
resources :saved_gardens, except: [ :show ] do
post :snapshot, on: :collection
post :snapshot, on: :collection
post :apply, on: :member
patch :apply, on: :member
end

View File

@ -0,0 +1,5 @@
class UpdateInUsePointsToVersion2 < ActiveRecord::Migration[5.1]
def change
update_view :in_use_points, version: 2, revert_to_version: 1
end
end

View File

@ -530,6 +530,20 @@ ActiveRecord::Schema.define(version: 20180423202520) do
WHERE ((edge_nodes.kind)::text = 'tool_id'::text);
SQL
create_view "sequence_usage_reports", sql_definition: <<-SQL
SELECT sequences.id AS sequence_id,
( SELECT count(*) AS count
FROM edge_nodes
WHERE (((edge_nodes.kind)::text = 'sequence_id'::text) AND ((edge_nodes.value)::integer = sequences.id))) AS edge_node_count,
( SELECT count(*) AS count
FROM farm_events
WHERE ((farm_events.executable_id = sequences.id) AND ((farm_events.executable_type)::text = 'Sequence'::text))) AS farm_event_count,
( SELECT count(*) AS count
FROM regimen_items
WHERE (regimen_items.sequence_id = sequences.id)) AS regimen_items_count
FROM sequences;
SQL
create_view "in_use_points", sql_definition: <<-SQL
SELECT points.x,
points.y,
@ -547,18 +561,4 @@ ActiveRecord::Schema.define(version: 20180423202520) do
WHERE ((edge_nodes.kind)::text = 'pointer_id'::text);
SQL
create_view "sequence_usage_reports", sql_definition: <<-SQL
SELECT sequences.id AS sequence_id,
( SELECT count(*) AS count
FROM edge_nodes
WHERE (((edge_nodes.kind)::text = 'sequence_id'::text) AND ((edge_nodes.value)::integer = sequences.id))) AS edge_node_count,
( SELECT count(*) AS count
FROM farm_events
WHERE ((farm_events.executable_id = sequences.id) AND ((farm_events.executable_type)::text = 'Sequence'::text))) AS farm_event_count,
( SELECT count(*) AS count
FROM regimen_items
WHERE (regimen_items.sequence_id = sequences.id)) AS regimen_items_count
FROM sequences;
SQL
end

View File

@ -0,0 +1,15 @@
SELECT
points.x as x,
points.y as y,
points.z as z,
sequences.id as sequence_id,
edge_nodes.id as edge_node_id,
points.device_id as device_id,
(edge_nodes.value)::int as point_id,
points.pointer_type as pointer_type,
points.name as pointer_name,
sequences.name as sequence_name
FROM "edge_nodes"
INNER JOIN "sequences" ON edge_nodes.sequence_id=sequences.id
INNER JOIN "points" ON (edge_nodes.value)::int=points.id
WHERE "edge_nodes"."kind" = 'pointer_id';

View File

@ -25,7 +25,7 @@ def ping(interval = 0)
$count += 1
puts "Log ##{$count}"
$log[:message] = "Hey! #{$count}"
Transport.amqp_send($log.to_json, $device_id, "logs")
Transport.current.amqp_send($log.to_json, $device_id, "logs")
end
loop do

View File

@ -3,11 +3,12 @@ require_relative "./log_service_support"
begin
# Listen to all logs on the message broker and store them in the database.
Transport
.current
.log_channel
.subscribe(block: true) do |info, _, payl|
LogService.process(info, payl)
end
rescue => Bunny::TCPConnectionFailedForAllHosts
rescue Bunny::TCPConnectionFailedForAllHosts => e
puts "MQTT Broker is unreachable. Waiting 5 seconds..."
sleep 5
retry

View File

@ -100,26 +100,6 @@ describe Api::LogsController do
expect(user.device.logs.count).to eq(0)
end
it "(PENDING) delivers emails for logs marked as `email`" do
pending "Something is not right with the queue adapter in test ENV 🤔"
sign_in user
empty_mail_bag
before_count = LogDispatch.count
body = { meta: { x: 1, y: 2, z: 3, type: "info" },
channels: ["email"],
message: "Heyoooo" }.to_json
run_jobs_now do
post :create, body: body, params: {format: :json}
after_count = LogDispatch.count
expect(response.status).to eq(200)
expect(last_email).to be
expect(last_email.body.to_s).to include("Heyoooo")
expect(last_email.to).to include(user.email)
expect(before_count).to be < after_count
expect(LogDispatch.where(sent_at: nil).count).to eq(0)
end
end
it "delivers emails for logs marked as `email`" do
LogDispatch.destroy_all
log = logs.first

View File

@ -86,31 +86,36 @@ describe Api::SavedGardensController do
expect(user.device.plants.count).to be > old_plant_count
end
it "prevents destructive application when plants in use."# do
# SavedGarden.destroy_all
# Plant.destroy_all
# PlantTemplate.destroy_all
# sign_in user
# saved_garden = FactoryBot.create(:saved_garden, device: user.device)
# FactoryBot.create_list(:plant_template, 3, device: user.device,
# saved_garden: saved_garden)
# FakeSequence.create(device: user.device,
# body: [{ kind: "move_absolute",
# args: {
# location: {
# kind: "point",
# args: { pointer_type: "Plant", pointer_id: plant.id }
# },
# speed: 100,
# offset: { kind: "", args: { } }
# }
# }])
it "prevents destructive application when plants in use." do
SavedGarden.destroy_all
Plant.destroy_all
PlantTemplate.destroy_all
sign_in user
saved_garden = FactoryBot.create(:saved_garden, device: user.device)
FactoryBot.create_list(:plant_template, 3, device: user.device,
saved_garden: saved_garden)
plant = FactoryBot.create(:plant, device: user.device)
FakeSequence.create(device: user.device,
body: [{ kind: "move_absolute",
args: {
location: {
kind: "point",
args: { pointer_type: "Plant", pointer_id: plant.id }
},
speed: 100,
offset: { kind: "coordinate", args: { x: 0, y: 0, z: 0 } }
}
}])
# old_plant_count = user.device.plants.count
# patch :apply, params: {id: saved_garden.id }
# expect(response.status).to be(200)
# expect(user.device.plants.count).to be > old_plant_count
# end
old_plant_count = user.device.plants.count
post :apply, params: {id: saved_garden.id }
expect(response.status).to be(422)
expect(user.device.plants.count).to eq(old_plant_count)
expect(json[:whoops])
.to include("Unable to remove the following plants from the garden")
expect(json[:whoops])
.to include("plant at (#{plant.x}, #{plant.y}, #{plant.z})")
end
it "performs 'destructive' garden application" do
SavedGarden.destroy_all

View File

@ -1,13 +0,0 @@
require 'spec_helper'
RSpec.describe SendFactoryResetJob, type: :job do
let(:device) { FactoryBot.create(:device) }
it 'sends a factory_reset RPC' do
dbl = double("fake transport layer")
payl = SendFactoryResetJob.rpc_payload(device)
expect(dbl)
.to receive(:amqp_send).with(payl.to_json, device.id, "from_clients")
SendFactoryResetJob.new.perform(device, dbl)
end
end

View File

@ -26,11 +26,12 @@ describe LogService do
end
it "calls .subscribe() on Transport." do
fakee = FakeLogChan.new
allow(Transport).to receive(:log_channel) { fakee }
expect(fakee.subcribe_calls).to eq(0)
Transport.current.clear!
load "lib/log_service.rb"
expect(fakee.subcribe_calls).to eq(1)
arg1 = Transport.current.connection.calls[:subscribe].last[0]
routing_key = Transport.current.connection.calls[:bind].last[1][:routing_key]
expect(arg1).to eq({block: true})
expect(routing_key).to eq("bot.*.logs")
end
it "creates new messages in the DB when called" do

View File

@ -26,10 +26,14 @@ describe Device do
expect([-5, -6, -7]).to include device.tz_offset_hrs # Remember DST!
end
it 'uses `tell` to send device messages' do
dbl = double("fake transport layer")
expect(dbl).to receive(:amqp_send)
result = device.tell("Hello!", dbl)
expect(result.message).to eq("Hello!")
it "sends specific users toast messages" do
Transport.current.clear!
hello = "Hello!"
log = device.tell(hello)
json, info = Transport.current.connection.calls[:publish].last
json = JSON.parse(json)
expect(info[:routing_key]).to eq("bot.device_#{device.id}.logs")
expect(log.message).to eq(hello)
expect(json["message"]).to eq(hello)
end
end

View File

@ -17,9 +17,47 @@ require "pry"
ENV["RAILS_ENV"] ||= "test"
require File.expand_path("../../config/environment", __FILE__)
# This is a stub for BunnyRB because we don't want the test suite to connect to
# AMQP for real.
class FakeTransport < Transport
# Theses are the "real" I/O inducing methods that must be Stubbed out.
MOCKED_METHODS = \
[ :bind, :publish, :queue, :subscribe, :create_channel, :topic ]
# When you call an AMQP I/O method, instead of doing real I/O, it will deposit
# the call into the @calls dictionary for observation.
attr_reader :calls
MOCKED_METHODS.map do |name|
# Eval is Evil, but this is pretty quick for testing.
eval """
def #{name}(*x)
key = #{name.inspect}
(@calls[key] ||= []).push(x)
@calls[key] = @calls[key].last(10)
self
end
"""
end
def initialize(*)
self.clear!
end
def start
self
end
def clear!
@calls = {}
end
end
Transport.default_amqp_adapter = FakeTransport
Transport.current = Transport.default_amqp_adapter.new
require "rspec/rails"
require_relative "./stuff"
require_relative "./topic_stub"
require_relative "./fake_sequence"
Dir[Rails.root.join("spec/support/**/*.rb")].each { |f| require f }
@ -41,8 +79,8 @@ RSpec.configure do |config|
require "capybara/rspec"
require "selenium/webdriver"
# Be sure to run `RAILS_ENV=test rails api:start` and `rails mqtt:start`!
Capybara.run_server = false
Capybara.app_host = "http://localhost:3000"
Capybara.run_server = false
Capybara.app_host = "http://localhost:3000"
Capybara.server_host = "localhost"
Capybara.server_port = "3000"
end

View File

@ -1,21 +0,0 @@
# Fake RabbitMQ adapter for when we test things.
class TopicStub
def self.publish(msg, opts)
end
end
class ChannelStub
def self.topic(name, opts)
return TopicStub
end
end
class MQTTStub
def self.create_channel
return ChannelStub
end
end
def Transport.connection
return MQTTStub
end