From 36a3981f5b156e5ffa9fd2e66db609235aeb5b54 Mon Sep 17 00:00:00 2001 From: AdamSBlack Date: Mon, 3 Jan 2022 16:34:46 +0000 Subject: [PATCH] Athena restructure, websocket server for front end realtime sessions --- routes/api/devices.js | 36 +++++++ websocket/athena/helpers.js | 73 ++++++++++++++ websocket/athena/index.js | 190 ++++++++++++++++++++++++++++++++++++ websocket/index.js | 0 websocket/web/commands.js | 57 +++++++++++ websocket/web/controls.js | 40 ++++++++ websocket/web/index.js | 116 ++++++++++++++++++++++ 7 files changed, 512 insertions(+) create mode 100644 routes/api/devices.js create mode 100644 websocket/athena/helpers.js create mode 100644 websocket/athena/index.js create mode 100644 websocket/index.js create mode 100644 websocket/web/commands.js create mode 100644 websocket/web/controls.js create mode 100644 websocket/web/index.js diff --git a/routes/api/devices.js b/routes/api/devices.js new file mode 100644 index 0000000..735a933 --- /dev/null +++ b/routes/api/devices.js @@ -0,0 +1,36 @@ +const router = require('express').Router(); +const config = require('./../../config'); + +const userController = require('./../../controllers/users') +const deviceController = require('./../../controllers/devices') +const authenticationController = require('./../../controllers/authentication') + + + + + +// probs should put middleware somewhere else +router.use(async function (req, res, next) { + const account = await authenticationController.getAuthenticatedAccount(req, res); + + if (account === null) { + res.json({success: false, msg: 'NOT_AUTHENTICATED'}) + } else { + req.account = account; + next(); + } + + +}); + + +router.get('/retropilot/0/devices', async (req, res) => { + if (!req.account) {return res.json({success: false, msg: 'NOT_AUTHENTICATED'})}; + + const dongles = await deviceController.getDevices(req.account.id) + + res.json({success: true, data: dongles}) +}) + + +module.exports = router; \ No newline at end of file diff --git a/websocket/athena/helpers.js b/websocket/athena/helpers.js new file mode 100644 index 0000000..0fdce28 --- /dev/null +++ b/websocket/athena/helpers.js @@ -0,0 +1,73 @@ +let wss; +const orm = require('../../models/index.model'); +const {v4: uuid} = require('uuid'); +let realtime; + + +async function incoming(ws, res, msg) { + realtime.passData(ws.dongleId, msg) +} + + +async function deviceStatus(dongle_id, status) { + realtime.dongleStatus(dongle_id, status); +} + +function invoke(command, params, dongleId, accountId, id) { + const websocket = wss.retropilotFunc.findFromDongle(dongleId); + + if (!websocket) { + wss.retropilotFunc.actionLogger(accountId, null, "ATHENA_USER_INVOKE__FAILED_DISCONNECTED", null, null, null, dongleId); + return { connected: false } + } + + let uniqueID; + + if (!id) { + uniqueID = uuid(); + } else { + uniqueID = id; + } + + wss.retropilotFunc.actionLogger(accountId, websocket.device_id, "ATHENA_USER_INVOKE__ISSUED", null, websocket._socket.remoteAddress, JSON.stringify({ command, params, uniqueID }), websocket.dongleId); + + + orm.models.athena_returned_data.create({ + device_id: websocket.device_id, + type: command, + created_at: Date.now(), + uuid: uniqueID + }) + + + websocket.send(JSON.stringify(wss.retropilotFunc.commandBuilder(command, params, uniqueID))) + + return { dispatched: true, heartbeat: websocket.heartbeat, id: uniqueID } + +} + + +function isDeviceConnected(accountId, deviceId, dongleId) { + const websocket = wss.retropilotFunc.findFromDongle(dongleId); + wss.retropilotFunc.actionLogger(accountId, deviceId, "ATHENA_USER_STATUS__IS_CONNECTED", null, websocket ? websocket._socket.remoteAddress : null, JSON.stringify({ connected: websocket ? true : false, heartbeat: websocket ? websocket.heartbeat : null }), dongleId); + + if (!websocket) return { connected: false } + + return { connected: true, heartbeat: websocket.heartbeat }; +} + +async function realtimeCallback(callback) { + realtime = callback; +} + +module.exports = (websocketServer) => { + wss = websocketServer; + + return { + isDeviceConnected, + invoke, + incoming, + deviceStatus, + realtimeCallback + } +} diff --git a/websocket/athena/index.js b/websocket/athena/index.js new file mode 100644 index 0000000..44ecc72 --- /dev/null +++ b/websocket/athena/index.js @@ -0,0 +1,190 @@ +const WebSocket = require('ws'); +const fs = require('fs'); +const cookie = require('cookie') +const jsonwebtoken = require('jsonwebtoken'); +const models = require('./../../models/index.model') +const config = require('./../../config') +const httpsServer = require('https'); +const httpServer = require('http'); +const { readFileSync } = require('fs'); + +const authenticationController = require('./../../controllers/authentication'); +const deviceController = require('./../../controllers/devices'); +const { ws } = require('./../../routes/api/realtime'); +const log4js = require('log4js'); + +const logger = log4js.getLogger('default'); + + +let wss; +function __server() { + let server; + + if (config.athena.secure) { + server = httpsServer.createServer({ + cert: readFileSync(config.sslCrt), + key: readFileSync(config.sslKey) + }); + } else { + server = httpServer.createServer(); + } + + wss = new WebSocket.WebSocketServer({ server }, { path: '/ws/v2/', handshakeTimeout: 500 }); + + const interval = setInterval(() => { + wss.clients.forEach(function each(ws) { + if (ws.isAlive === false) { + logger.info(`Athena(Heartbeat) - Terminated ${ws.dongleId} - ${ws._socket.remoteAddress}`) + wss.retropilotFunc.actionLogger(null, null, "ATHENA_DEVICE_TIMEOUT_FORCE_DISCONNECT", null, ws._socket.remoteAddress, null, ws.dongleId); + if (ws.dongleId) {helpers.deviceStatus(ws.dongleId, false)} + + return ws.terminate(); + } + + ws.isAlive = false; + ws.ping(); + }); + }, config.athena.socket.heartbeatFrequency ? config.athena.socket.heartbeatFrequency : 5000); + + + server.listen(config.athena.socket.port, () => { + logger.info(`Athena(Server) - UP @ ${config.athena.host}:${config.athena.port}`) + }) + + + wss.on('connection', manageConnection) + wss.on('close', function close() { + logger.info(`Athena(Websocket) - DOWN`) + clearInterval(interval); + }); +} + +async function heartbeat() { + this.isAlive = true; + this.heartbeat = Date.now(); + if (this.dongleId) {helpers.deviceStatus(this.dongleId, true)} + +} + +async function manageConnection(ws, res) { + logger.info(`Athena(Websocket) - New Connection ${ws._socket.remoteAddress}`) + ws.badMessages = 0; + ws.isAlive = true; + ws.heartbeat = Date.now(); + ws.on('pong', heartbeat); + + + var cookies = cookie.parse(res.headers.cookie); + + ws.on('message', async function incoming(message) { + heartbeat.call(ws) + if (!ws.dongleId) { + wss.retropilotFunc.actionLogger(null, null, "ATHENA_DEVICE_UNATHENTICATED_MESSAGE", null, ws._socket.remoteAddress, JSON.stringify([message]), ws.dongleId); + console.log("unauthenticated message, discarded"); + return null; + } + + const json = JSON.parse(message.toString('utf8')) + console.log(json); + + console.log({device_id: ws.device_id, uuid: json.id}); + + console.log( await models.models.athena_returned_data.update({ + data: JSON.stringify(json), + resolved_at: Date.now() + }, {where: {device_id: ws.device_id, uuid: json.id}})) + + + wss.retropilotFunc.actionLogger(null, null, "ATHENA_DEVICE_MESSAGE_UNKNOWN", null, ws._socket.remoteAddress, JSON.stringify([message]), ws.dongleId); + + console.log(json) + + helpers.incoming(ws, res, json) + + + + + }); + + + if (await wss.retropilotFunc.authenticateDongle(ws, res, cookies) === false) { + ws.terminate(); + } + + + //ws.send(JSON.stringify(await commandBuilder('reboot'))) +} + + + + __server(); + +wss.retropilotFunc = { + + findFromDongle: (dongleId) => { + let websocket = null; + wss.clients.forEach((value) => { + if (value.dongleId === dongleId) { + websocket = value; + } + }) + + return websocket; + }, + + authenticateDongle: async (ws, res, cookies) => { + try { + unsafeJwt = jsonwebtoken.decode(cookies.jwt); + } catch(e) { + logger.info(`Athena(Websocket) - AUTHENTICATION FAILED (INVALID JWT) IP: ${ws._socket.remoteAddress}`) + wss.retropilotFunc.actionLogger(null, null, "ATHENA_DEVICE_AUTHENTICATE_INVALID", null, ws._socket.remoteAddress, JSON.stringify({ jwt: cookies.jwt }), null); + return false; + } + + + const device = await deviceController.getDeviceFromDongle(unsafeJwt.identity) + + let verifiedJWT; + + try { + verifiedJWT = jsonwebtoken.verify(cookies.jwt, device.public_key, { ignoreNotBefore: true }); + } catch (err) { + logger.info(`Athena(Websocket) - AUTHENTICATION FAILED (BAD JWT, CHECK SIGNATURE) IP: ${ws._socket.remoteAddress}`) + wss.retropilotFunc.actionLogger(null, null, "ATHENA_DEVICE_AUTHENTICATE_INVALID", null, ws._socket.remoteAddress, JSON.stringify({ jwt: cookies.jwt }), null); + return false; + } + + if (verifiedJWT.identify === unsafeJwt.identify) { + ws.dongleId = device.dongle_id + ws.device_id = device.id + wss.retropilotFunc.actionLogger(null, device.id, "ATHENA_DEVICE_AUTHENTICATE_SUCCESS", null, ws._socket.remoteAddress, null); + logger.info(`Athena(Websocket) - AUTHENTICATED IP: ${ws._socket.remoteAddress} DONGLE ID: ${ws.dongleId} DEVICE ID: ${ws.device_id}`) + return true; + } else { + wss.retropilotFunc.actionLogger(null, device.id, "ATHENA_DEVICE_AUTHENTICATE_FAILURE", null, ws._socket.remoteAddress, JSON.stringify({ jwt: cookies.jwt }), null); + logger.info(`Athena(Websocket) - AUTHENTICATION FAILED (BAD CREDENTIALS) IP: ${ws._socket.remoteAddress}`); + + return false; + } + }, + + commandBuilder: (method, params, id) => { + return { method, params: params, "jsonrpc": "2.0", "id": id } + }, + + actionLogger: async (account_id, device_id, action, user_ip, device_ip, meta, dongle_id) => { + models.models.athena_action_log.create({ + account_id, device_id, action, user_ip, device_ip, meta, created_at: Date.now(), dongle_id + }) + }, + + setRealtimeCallback: async (callback) => { + realtimeCallback = callback; + } + +} + +const helpers = require('./helpers')(wss) + + +module.exports = helpers; \ No newline at end of file diff --git a/websocket/index.js b/websocket/index.js new file mode 100644 index 0000000..e69de29 diff --git a/websocket/web/commands.js b/websocket/web/commands.js new file mode 100644 index 0000000..73025a7 --- /dev/null +++ b/websocket/web/commands.js @@ -0,0 +1,57 @@ + +const authenticationController = require('./../../controllers/authentication'); +const deviceController = require('./../../controllers/devices'); +const athenaRealtime = require('./../athena/index'); + + + +// Checks if device is currently online in Athena + +async function isDongleOnline(ws, msg) { + // Checking if the user is authorised to access dongle, this will be used later + // allowing users to delegate access. + const isAuthorised = await deviceController.isUserAuthorised(ws.account.id, msg.data.dongleId) + if (isAuthorised && isAuthorised.success === true) { + ws.send(JSON.stringify({ command: msg.command, success: true, id: msg.id || null, data: athenaRealtime.isDeviceConnected(ws.account.id, null, msg.data.dongleId) })) + } else { + ws.send(JSON.stringify({ command: msg.command, success: false, id: msg.id || null, msg: 'not_authorised' })) + } +} + + + +// Checks if device is currently online in Athena + +async function rebootDongle(ws, msg) { + // Checking if the user is authorised to access dongle, this will be used later + // allowing users to delegate access. + const isAuthorised = await deviceController.isUserAuthorised(ws.account.id, msg.data.dongleId) + console.log("is auth", isAuthorised) + if (isAuthorised && isAuthorised.success === true) { + await athenaRealtime.invoke("reboot", null, msg.data.dongleId, ws.account.id, msg.id || null) + ws.send(JSON.stringify({ command: msg.command, success: true, id: msg.id || null, data: {command_issued: true} })) + } else { + ws.send(JSON.stringify({ command: msg.command, success: false, id: msg.id || null, msg: 'not_authorised' })) + } +} + +async function takeSnapshot(ws, msg) { + const isAuthorised = await deviceController.isUserAuthorised(ws.account.id, msg.data.dongleId) + console.log("is auth", isAuthorised) + if (isAuthorised && isAuthorised.success === true) { + await athenaRealtime.invoke("takeSnapshot", null, msg.data.dongleId, ws.account.id, msg.id || null) + ws.send(JSON.stringify({ command: msg.command, success: true, id: msg.id || null, data: {command_issued: true} })) + } else { + ws.send(JSON.stringify({ command: msg.command, success: false, id: msg.id || null, msg: 'not_authorised' })) + } +} + + + + + +module.exports = { + isDongleOnline, + rebootDongle, + takeSnapshot +} \ No newline at end of file diff --git a/websocket/web/controls.js b/websocket/web/controls.js new file mode 100644 index 0000000..a681fd9 --- /dev/null +++ b/websocket/web/controls.js @@ -0,0 +1,40 @@ +const deviceController = require('./../../controllers/devices'); +let wss; + +async function getDongleOwners(dongle_id) { + const owners = await deviceController.getOwnersFromDongle(dongle_id); + console.log("dongle owners", owners) + return owners; +} + +async function broadcastToAccounts(owners , data) { + + wss.clients.forEach(function each(ws) { + owners.data.forEach((account_id) => { + if (account_id === ws.account.id) { + ws.send(JSON.stringify(data)) + } + }) + }) +} + +async function dongleStatus(dongle_id, status) { + const owners = await getDongleOwners(dongle_id) + await broadcastToAccounts(owners, {command: "dongle_status", id: Date.now(), data: {dongle_id: dongle_id, online: status, time: Date.now()}}) +} + +async function passData(dongle_id, msg) { + const owners = await getDongleOwners(dongle_id) + await broadcastToAccounts(owners, {command: "data_return", id: msg.id, data: {dongle_id: dongle_id, return: msg}}) + return true; +} + +module.exports = (websocket) => { + wss = websocket; + + return { + getDongleOwners, + dongleStatus, + passData + } +} \ No newline at end of file diff --git a/websocket/web/index.js b/websocket/web/index.js new file mode 100644 index 0000000..18641ca --- /dev/null +++ b/websocket/web/index.js @@ -0,0 +1,116 @@ +const WebSocket = require('ws'); +const fs = require('fs'); +const cookie = require('cookie') +const jsonwebtoken = require('jsonwebtoken'); +const config = require('./../../config') +const httpsServer = require('https'); +const httpServer = require('http'); +let controls = require('./controls') + +const authenticationController = require('./../../controllers/authentication'); +const deviceController = require('./../../controllers/devices'); + + +const log4js = require('log4js'); + +const logger = log4js.getLogger('default'); + +const athenaRealtime = require('./../athena/index'); + +const realtimeCommands = require('./commands') + + +function __server() { + server = httpServer.createServer(); + + + wss = new WebSocket.WebSocketServer({ server }, { path: '/realtime/v1', handshakeTimeout: 500 }); + + + server.listen(81, () => { + logger.info(`Web(Server) - UP @ ${config.athena.host}:${config.athena.port}`) + }) + + + wss.on('connection', manageConnection) + wss.on('close', function close() { + logger.info(`Web(Websocket) - DOWN`) + clearInterval(interval); + }); + return wss; +} + +function buildResponse(ws, success, msg, data) { + ws.send(JSON.stringify({success: success, msg: msg, data: data, timestamp: Date.now()})) +} + + +async function authenticateUser(ws, res) { + console.log("headers:", res.headers) + + + let account; + + //if (res.headers.Authorization) { + // account = await authenticationController.getAccountFromJWT(res.headers.Authorization) + // } else { + const cookies = cookie.parse(res.headers.cookie); + account = await authenticationController.getAccountFromJWT(cookies.jwt); + // } + + console.log("THE ACCOUNT FOUND:", account) + if (account) { + ws.account = account; + return true; + } + + ws.terminate() + return false; +} + + +async function manageConnection(ws, res) { + logger.info(`Web(Websocket) - New Connection ${ws._socket.remoteAddress}`) + + await authenticateUser(ws, res); + + console.log(ws.account); + + ws.on('message', async function incoming(message) { + console.log(message) + const msg = JSON.parse(message.toString('utf8')); + + switch (msg.command) { + case "is_dongle_online": + return realtimeCommands.isDongleOnline(ws, msg); + case "reboot_dongle": + return realtimeCommands.rebootDongle(ws, msg); + case "take_snapshot": + return realtimeCommands.takeSnapshot(ws, msg); + default: + return ws.send(JSON.stringify({error: true, id: msg.id || null, msg: 'VERIFY_DATA', data: {msg}})) + } + + }); + + return wss; + + //ws.send(JSON.stringify(await commandBuilder('reboot'))) +} + + +const websocketServer = __server(); + + + +controls = controls(websocketServer); + + +athenaRealtime.realtimeCallback(controls); + + + +module.exports = { + controls, + websocketServer +}; \ No newline at end of file