const config = require('./config'); const fs = require('fs'); const path = require('path'); const crypto = require('crypto'); const log4js = require('log4js'); import sqlite3 from 'sqlite3' import { open } from 'sqlite' const lockfile = require('proper-lockfile'); var http = require('http'); var https = require('https'); const express = require('express'); const cors = require('cors'); const bodyParser = require('body-parser'); const cookieParser=require('cookie-parser'); const jwt = require('jsonwebtoken'); const sendmail = require('sendmail')(); const htmlspecialchars = require('htmlspecialchars'); const dirTree = require("directory-tree"); const { resolve } = require('path'); const execSync = require('child_process').execSync; const Reader = require('@commaai/log_reader'); var ffprobe = require('ffprobe'), ffprobeStatic = require('ffprobe-static'); const { exception } = require('console'); var db = null; var lastCleaningTime=0; var startTime=Date.now(); log4js.configure({ appenders: { logfile: { type: "file", filename: "worker.log" }, out: { type: "console"} }, categories: { default: { appenders: ['out', 'logfile'], level: 'info' } } }); var logger = log4js.getLogger('default'); function initializeStorage() { var verifiedPath = mkDirByPathSync(config.storagePath, {isRelativeToScript: (config.storagePath.indexOf("/")===0 ? false : true)}); if (verifiedPath!=null) logger.info("Verified storage path "+verifiedPath); else { logger.error("Unable to verify storage path '"+config.storagePath+"', check filesystem / permissions"); process.exit(); } } function validateJWTToken(token, publicKey) { try { var decoded = jwt.verify(token.replace("JWT ", ""), publicKey, { algorithms: ['RS256'] }); return decoded; } catch (exception) { console.log(exception); } return null; } function formatDate(timestampMs) { return new Date(timestampMs).toISOString().replace(/T/, ' ').replace(/\..+/, ''); } function formatDuration(durationSeconds) { var secs = durationSeconds % 60; var mins = Math.floor(durationSeconds / 60); var hours = Math.floor(mins / 60); mins = mins % 60; var response=''; if (hours>0) response+=hours+'h '; if (hours>0 || mins>0) response+=mins+'m '; response+=secs+'s'; return response; } function mkDirByPathSync(targetDir, { isRelativeToScript = false } = {}) { const sep = path.sep; const initDir = path.isAbsolute(targetDir) ? sep : ''; const baseDir = isRelativeToScript ? __dirname : '.'; return targetDir.split(sep).reduce((parentDir, childDir) => { const curDir = path.resolve(baseDir, parentDir, childDir); try { fs.mkdirSync(curDir); } catch (err) { if (err.code === 'EEXIST') { // curDir already exists! return curDir; } // To avoid `EISDIR` error on Mac and `EACCES`-->`ENOENT` and `EPERM` on Windows. if (err.code === 'ENOENT') { // Throw the original parentDir error on curDir `ENOENT` failure. logger.error(`EACCES: permission denied, mkdir '${parentDir}'`); return null; } const caughtErr = ['EACCES', 'EPERM', 'EISDIR'].indexOf(err.code) > -1; if (!caughtErr || (caughtErr && curDir === path.resolve(targetDir))) { logger.error("'EACCES', 'EPERM', 'EISDIR' during mkdir"); return null; } } return curDir; }, initDir); } function simpleStringify (object){ var simpleObject = {}; for (var prop in object ){ if (!object.hasOwnProperty(prop)){ continue; } if (typeof(object[prop]) == 'object'){ continue; } if (typeof(object[prop]) == 'function'){ continue; } simpleObject[prop] = object[prop]; } return JSON.stringify(simpleObject); // returns cleaned up JSON }; function writeFileSync(path, buffer, permission) { var fileDescriptor; try { fileDescriptor = fs.openSync(path, 'w', permission); } catch (e) { fs.chmodSync(path, permission); fileDescriptor = fs.openSync(path, 'w', permission); } if (fileDescriptor) { fs.writeSync(fileDescriptor, buffer, 0, buffer.length, 0); fs.closeSync(fileDescriptor); logger.info("writeFileSync wiriting to '"+path+"' successful"); return true; } logger.error("writeFileSync writing to '"+path+"' failed"); return false; } function moveUploadedFile(buffer, directory, filename) { logger.info("moveUploadedFile called with '"+filename+"' -> '"+directory+"'"); if (directory.indexOf("..")>=0 || filename.indexOf("..")>=0) { logger.error("moveUploadedFile failed, .. in directory or filename"); return false; } if (config.storagePath.lastIndexOf("/")!==config.storagePath.length-1) directory='/'+directory; if (directory.lastIndexOf("/")!==directory.length-1) directory=directory+'/'; var finalPath = mkDirByPathSync(config.storagePath+directory, {isRelativeToScript: (config.storagePath.indexOf("/")===0 ? false : true)}); if (finalPath && finalPath.length>0) { if (writeFileSync(finalPath+"/"+filename, buffer, 0o660)) { logger.info("moveUploadedFile successfully written '"+(finalPath+"/"+filename)+"'"); return finalPath+"/"+filename; } logger.error("moveUploadedFile failed to writeFileSync"); return false; } logger.error("moveUploadedFile invalid final path, check permissions to create / write '"+(config.storagePath+directory)+"'"); return false; }; function deleteFolderRecursive(directoryPath) { if (fs.existsSync(directoryPath)) { fs.readdirSync(directoryPath).forEach((file, index) => { const curPath = path.join(directoryPath, file); if (fs.lstatSync(curPath).isDirectory()) { deleteFolderRecursive(curPath); } else { fs.unlinkSync(curPath); } }); fs.rmdirSync(directoryPath); } }; var segmentProcessQueue=[]; var segmentProcessPosition=0; var affectedDrives={}; var affectedDevices={}; var rlog_lastTsInternal=0; var rlog_prevLatInternal=-1000; var rlog_prevLngInternal=-1000; var rlog_totalDistInternal = 0; var rlog_lastTsExternal=0; var rlog_prevLatExternal=-1000; var rlog_prevLngExternal=-1000; var rlog_totalDistExternal = 0; var qcamera_duration = 0; function processSegmentRLog(rLogPath) { rlog_lastTsInternal=0; rlog_prevLatInternal=-1000; rlog_prevLngInternal=-1000; rlog_totalDistInternal = 0; rlog_lastTsExternal=0; rlog_prevLatExternal=-1000; rlog_prevLngExternal=-1000; rlog_totalDistExternal = 0; return new Promise( function(resolve, reject) { var readStream = fs.createReadStream(rLogPath); var reader = Reader(readStream); readStream.on('close', function () { resolve(); }); reader(function (obj) { try { if (obj['LogMonoTime']!==undefined && obj['LogMonoTime']-rlog_lastTsInternal>=1000000*1000*1 && obj['GpsLocation']!==undefined) { logger.info('processSegmentRLog GpsLocation @ '+obj['LogMonoTime']+': '+obj['GpsLocation']['Latitude']+' '+obj['GpsLocation']['Longitude']); if (rlog_prevLatInternal!=-1000) { var lat1=rlog_prevLatInternal; var lat2=obj['GpsLocation']['Latitude']; var lon1=rlog_prevLngInternal; var lon2=obj['GpsLocation']['Longitude']; var p = 0.017453292519943295; // Math.PI / 180 var c = Math.cos; var a = 0.5 - c((lat2 - lat1) * p)/2 + c(lat1 * p) * c(lat2 * p) * (1 - c((lon2 - lon1) * p))/2; var dist_m = 1000 * 12742 * Math.asin(Math.sqrt(a)); // 2 * R; R = 6371 km rlog_totalDistInternal+=dist_m; } rlog_prevLatInternal=obj['GpsLocation']['Latitude']; rlog_prevLngInternal=obj['GpsLocation']['Longitude']; rlog_lastTsInternal = obj['LogMonoTime']; } else if (obj['LogMonoTime']!==undefined && obj['LogMonoTime']-rlog_lastTsExternal>=1000000*1000*1 && obj['GpsLocationExternal']!==undefined) { logger.info('processSegmentRLog GpsLocationExternal @ '+obj['LogMonoTime']+': '+obj['GpsLocationExternal']['Latitude']+' '+obj['GpsLocationExternal']['Longitude']); if (rlog_prevLatExternal!=-1000) { var lat1=rlog_prevLatExternal; var lat2=obj['GpsLocationExternal']['Latitude']; var lon1=rlog_prevLngExternal; var lon2=obj['GpsLocationExternal']['Longitude']; var p = 0.017453292519943295; // Math.PI / 180 var c = Math.cos; var a = 0.5 - c((lat2 - lat1) * p)/2 + c(lat1 * p) * c(lat2 * p) * (1 - c((lon2 - lon1) * p))/2; var dist_m = 1000 * 12742 * Math.asin(Math.sqrt(a)); // 2 * R; R = 6371 km rlog_totalDistExternal+=dist_m; } rlog_prevLatExternal=obj['GpsLocationExternal']['Latitude']; rlog_prevLngExternal=obj['GpsLocationExternal']['Longitude']; rlog_lastTsExternal = obj['LogMonoTime']; } } catch(exception) { } }); } ); } function processSegmentVideo(qcameraPath) { qcamera_duration=0; return new Promise(function(resolve, reject) { ffprobe(qcameraPath, { path: ffprobeStatic.path }) .then(function (info) { if (info['streams']!==undefined && info['streams'][0]!==undefined && info['streams'][0]['duration']!==undefined) qcamera_duration = info['streams'][0]['duration']; logger.info('processSegmentVideo duration: '+qcamera_duration+'s'); resolve(); }) .catch(function (err) { console.error(err); logger.error('processSegmentVideo error: '+err); resolve(); }); }); } function processSegmentsRecursive() { if (segmentProcessQueue.length<=segmentProcessPosition) return updateDrives(); var segmentWrapper = segmentProcessQueue[segmentProcessPosition]; const segment = segmentWrapper.segment; const uploadComplete = segmentWrapper.uploadComplete; const driveIdentifier = segmentWrapper.driveIdentifier; const fileStatus = segmentWrapper.fileStatus; logger.info('processSegmentsRecursive '+segment.dongle_id+' '+segment.drive_identifier+' '+segment.segment_id); var p1 = processSegmentRLog(fileStatus['rlog.bz2']); var p2 = processSegmentVideo(fileStatus['qcamera.ts']); Promise.all([p1, p2]).then((values) => { (async () => { logger.info('processSegmentsRecursive '+segment.dongle_id+' '+segment.drive_identifier+' '+segment.segment_id+' internal gps: '+(Math.round(rlog_totalDistInternal*100)/100)+'m, external gps: '+(Math.round(rlog_totalDistExternal*100)/100)+'m, duration: '+qcamera_duration+'s'); const driveSegmentResult = await db.run( 'UPDATE drive_segments SET duration = ?, distance_meters = ?, is_processed = ?, upload_complete = ?, is_stalled = ? WHERE id = ?', qcamera_duration, Math.round(Math.max(rlog_totalDistInternal, rlog_totalDistExternal)*10)/10, true, uploadComplete, false, segment.id ); affectedDrives[driveIdentifier]=true; segmentProcessPosition++; setTimeout(function() {processSegmentsRecursive();}, 0); })(); }).catch((error) => { logger.error(error); }); } async function updateSegments() { segmentProcessQueue=[]; segmentProcessPosition=0; affectedDrives={}; const drive_segments = await db.all('SELECT * FROM drive_segments WHERE upload_complete = ? AND is_stalled = ? ORDER BY created ASC', false, false); for (var t=0; t10*24*3600*1000) { // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled) logger.info('updateSegments isStalled for '+segment.dongle_id+' '+segment.drive_identifier+' '+segment.segment_id); const driveSegmentResult = await db.run( 'UPDATE drive_segments SET is_stalled = ? WHERE id = ?', true, segment.id); } if (segmentProcessQueue.length>=50) // we process at most 50 segments per batch break; } if (segmentProcessQueue.length>0) processSegmentsRecursive(); else // if no data is to be collected, call updateDrives to update those where eventually just the last segment completed the upload updateDrives(); } async function updateDevices() { // go through all affected devices (with deleted or updated drives) and update them (storage_used) logger.info("updateDevices - affected drives: "+JSON.stringify(affectedDevices)); for (const [key, value] of Object.entries(affectedDevices)) { var dongleId = key; const device = await db.get('SELECT * FROM devices WHERE dongle_id = ?', dongleId); if (device==null) continue; var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(device.dongle_id).digest('hex'); var devicePath=config.storagePath+device.dongle_id+"/"+dongleIdHash; var deviceQuotaMb = Math.round(parseInt(execSync("du -s "+devicePath+" | awk -F'\t' '{print $1;}'").toString())/1024); logger.info("updateDevices device "+dongleId+" has an updated storage_used of: "+deviceQuotaMb+" MB"); const deviceResult = await db.run( 'UPDATE devices SET storage_used = ? WHERE dongle_id = ?', deviceQuotaMb, device.dongle_id); } affectedDevices=[]; } async function updateDrives() { // go through all affected drives and update them / complete and/or build m3u8 logger.info("updateDrives - affected drives: "+JSON.stringify(affectedDrives)); for (const [key, value] of Object.entries(affectedDrives)) { var dongleId, driveIdentifier; [dongleId, driveIdentifier] = key.split('|'); const drive = await db.get('SELECT * FROM drives WHERE identifier = ? AND dongle_id = ?', driveIdentifier, dongleId); if (drive==null) continue; var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(drive.dongle_id).digest('hex'); var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(drive.identifier).digest('hex'); var driveUrl=config.baseDriveDownloadUrl+drive.dongle_id+"/"+dongleIdHash+"/"+driveIdentifierHash+"/"+drive.identifier; var drivePath=config.storagePath+drive.dongle_id+"/"+dongleIdHash+"/"+driveIdentifierHash+"/"+drive.identifier; var uploadComplete=true; var isProcessed=true; var totalDistanceMeters=0; var totalDurationSeconds=0; var playlistSegmentStrings=''; const drive_segments = await db.all('SELECT * FROM drive_segments WHERE drive_identifier = ? AND dongle_id = ? ORDER BY segment_id ASC', driveIdentifier, dongleId); for (var t=0; t ?', config.deviceStorageQuotaMb); for (var t=0; t (a.date < b.date) ? 1 : -1); for (var c=5; c (a.date < b.date) ? 1 : -1); for (var c=5; c60*3600*1000) { logger.info("EXIT WORKER AFTER 1 HOUR TO PREVENT MEMORY LEAKS..."); process.exit(); } try { if (Date.now()-lastCleaningTime>20*3600*1000) { deleteBootAndCrashLogs(); deleteExpiredDrives(); deleteOverQuotaDrives(); removeDeletedDrivesPhysically(); lastCleaningTime=Date.now(); } setTimeout(function() {updateSegments();}, 5000); } catch (e) { logger.error(e); } } lockfile.lock('retropilot_worker.lock', { realpath: false, stale: 90000, update: 2000 }) .then((release) => { logger.info("STARTING WORKER..."); (async () => { try { db = await open({ filename: config.databaseFile, driver: sqlite3.Database, mode: sqlite3.OPEN_READWRITE }); await db.get('SELECT * FROM accounts LIMIT 1') await db.get('SELECT * FROM devices LIMIT 1') await db.get('SELECT * FROM drives LIMIT 1') await db.get('SELECT * FROM drive_segments LIMIT 1') } catch(exception) { logger.error(exception); process.exit(); } initializeStorage(); setTimeout(function() {mainWorkerLoop();}, 0); })(); }).catch((e) => { console.error(e) process.exit(); });