Properly route inbound sync messages.

pull/527/head
Rick Carlino 2017-10-26 15:49:23 -05:00
parent aae6f1305b
commit 4eb8d2bace
6 changed files with 115 additions and 12 deletions

View File

@ -33,6 +33,7 @@ class ApplicationRecord < ActiveRecord::Base
end
def broadcast!
Thread.new { `play ~/tada.wav` }
Transport.send(broadcast_payload, Device.current.id, chan_name)
end
end

View File

@ -38,7 +38,6 @@
"@types/lodash": "^4.14.78",
"@types/markdown-it": "^0.0.4",
"@types/moxios": "^0.4.5",
"@types/mqtt": "^2.5.0",
"@types/node": "^8.0.46",
"@types/react": "15.0.39",
"@types/react-color": "^2.13.2",
@ -53,7 +52,7 @@
"enzyme": "^3.1.0",
"enzyme-adapter-react-15": "^1.0.2",
"extract-text-webpack-plugin": "^3.0.1",
"farmbot": "5.0.1",
"farmbot": "https://github.com/RickCarlino/farmbot-js.git",
"farmbot-toastr": "^1.0.3",
"fastclick": "^1.0.6",
"file-loader": "^1.1.5",

View File

@ -19,6 +19,7 @@ import { init } from "../api/crud";
import { versionOK } from "../devices/reducer";
import { AuthState } from "../auth/interfaces";
import { TaggedResource } from "../resources/tagged_resources";
import { TempDebug } from "./temp_debug";
export const TITLE = "New message from bot";
@ -147,6 +148,7 @@ const attachEventListeners =
bot.on("status", onStatus(dispatch, getState));
bot.on("malformed", onMalformed);
readStatus().then(changeLastClientConnected(bot), noop);
bot.client.on("message", TempDebug(dispatch, getState));
};
/** Connect to MQTT and attach all relevant event handlers. */

View File

@ -0,0 +1,94 @@
import { GetState } from "../redux/interfaces";
import { snakeCase } from "lodash";
import { getUuid } from "../resources/selectors";
import { ResourceName } from "../resources/tagged_resources";
interface UpdateMqttData {
status: "UPDATE"
kind: ResourceName;
id: number;
body: object;
}
interface DeleteMqttData {
status: "DELETE"
kind: ResourceName;
id: number;
}
interface BadMqttData {
status: "ERR";
reason: string;
}
interface SkipMqttData {
status: "SKIP";
}
type MqttDataResult =
| UpdateMqttData
| DeleteMqttData
| SkipMqttData
| BadMqttData;
enum Reason {
BAD_KIND = "missing `kind`",
BAD_ID = "No ID or invalid ID.",
BAD_CHAN = "Expected exactly 5 segments in channel"
}
function routeMqttData(chan: string, payload: Buffer): MqttDataResult {
/** Skip irrelevant messages */
if (!chan.includes("sync")) { return { status: "SKIP" }; }
/** Extract, Validate and scrub the data */
const parts = chan.split("/");
if (parts.length !== 5) { return { status: "ERR", reason: Reason.BAD_CHAN }; }
const id = parseInt(parts.pop() || "0", 10);
const kind = snakeCase(parts.pop()) as ResourceName | undefined;
const { body } = JSON.parse((payload).toString()) as { body: {} | null };
if (!kind) { return { status: "ERR", reason: Reason.BAD_KIND }; }
if (!id) { return { status: "ERR", reason: Reason.BAD_ID }; }
if (body) {
return { status: "UPDATE", body, kind: kind, id };
} else {
return { status: "DELETE", kind: kind, id }; // 'null' body means delete.
}
}
function handleCreate(data: UpdateMqttData) {
console.log("INSERT");
}
function handleUpdate(data: UpdateMqttData) {
console.log("UPDATE");
}
function handleErr(data: BadMqttData) {
console.log("ERROR", data);
}
function handleSkip() {
console.log("SKIP");
}
function handleDelete(data: DeleteMqttData) {
console.log("DELETE");
}
export const TempDebug =
(dispatch: Function, getState: GetState) =>
(chan: string, payload: Buffer) => {
const data = routeMqttData(chan, payload);
switch (data.status) {
case "UPDATE":
const uuid = getUuid(getState().resources.index, data.kind, data.id);
return (uuid ? handleCreate : handleUpdate)(data);
case "ERR": return handleErr(data);
case "SKIP": return handleSkip();
case "DELETE": return handleDelete(data);
}
};

View File

@ -31,10 +31,16 @@ import {
import { CowardlyDictionary, betterCompact, sortResourcesById } from "../util";
type StringMap = CowardlyDictionary<string>;
export let findId = (index: ResourceIndex, kind: ResourceName, id: number) => {
export let getUuid = (index: ResourceIndex, kind: ResourceName, id: number) => {
const uuid = index.byKindAndId[joinKindAndId(kind, id)];
assertUuid(kind, uuid);
if (uuid) {
assertUuid(kind, uuid);
return uuid;
}
};
export let findId = (index: ResourceIndex, kind: ResourceName, id: number) => {
const uuid = getUuid(index, kind, id);
if (uuid) {
return uuid;
} else {

View File

@ -109,12 +109,6 @@
dependencies:
axios "^0.16.1"
"@types/mqtt@^2.5.0":
version "2.5.0"
resolved "https://registry.yarnpkg.com/@types/mqtt/-/mqtt-2.5.0.tgz#bc54c2d53f509282168da4a9af865de95bee5101"
dependencies:
mqtt "*"
"@types/node@^6.0.46":
version "6.0.90"
resolved "https://registry.yarnpkg.com/@types/node/-/node-6.0.90.tgz#0ed74833fa1b73dcdb9409dcb1c97ec0a8b13b02"
@ -2009,6 +2003,13 @@ farmbot@5.0.1:
mqtt "^2.13.1"
typescript "^2.4.2"
"farmbot@https://github.com/RickCarlino/farmbot-js.git":
version "5.0.1"
resolved "https://github.com/RickCarlino/farmbot-js.git#d27f640d15cada92fa5d2cfa28957c0fe2530a3d"
dependencies:
mqtt "^2.13.1"
typescript "^2.4.2"
fast-deep-equal@^0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/fast-deep-equal/-/fast-deep-equal-0.1.0.tgz#5c6f4599aba6b333ee3342e2ed978672f1001f8d"
@ -3799,7 +3800,7 @@ mqtt-packet@^5.4.0:
process-nextick-args "^1.0.7"
safe-buffer "^5.1.0"
mqtt@*, mqtt@^2.13.1:
mqtt@^2.13.1:
version "2.13.1"
resolved "https://registry.yarnpkg.com/mqtt/-/mqtt-2.13.1.tgz#d83c7c5d9dc37864a363453f61fb5e1523c0144a"
dependencies: