fix worker.js indentation

pull/4/head
Cameron Clough 2022-01-08 22:19:08 +00:00
parent 3323b19d85
commit 450c05501e
2 changed files with 685 additions and 714 deletions

View File

@ -225,7 +225,7 @@ async function getBootlogs(dongle_id) {
date: dateObj,
permalink: `${config.baseDriveDownloadUrl}${dongle_id}/${dongleIdHash}/boot/${file.name}`,
};
})
});
bootlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1));
return bootlogFiles;
}

239
worker.js
View File

@ -10,25 +10,14 @@ const { open } = require('sqlite');
const lockfile = require('proper-lockfile');
const http = require('http');
const https = require('https');
const express = require('express');
const cors = require('cors');
const bodyParser = require('body-parser');
const cookieParser = require('cookie-parser');
const jwt = require('jsonwebtoken');
const htmlspecialchars = require('htmlspecialchars');
const dirTree = require('directory-tree');
const { resolve } = require('path');
const { execSync } = require('child_process');
const Reader = require('@commaai/log_reader');
var ffprobe = require('ffprobe'),
ffprobeStatic = require('ffprobe-static');
const { exception } = require('console');
var db = null;
@ -53,45 +42,17 @@ function initializeStorage() {
}
}
function validateJWTToken(token, publicKey) {
try {
var decoded = jwt.verify(token.replace('JWT ', ''), publicKey, { algorithms: ['RS256'] });
return decoded;
}
catch (exception) {
console.log(exception);
}
return null;
}
function formatDate(timestampMs) {
return new Date(timestampMs).toISOString().replace(/T/, ' ').replace(/\..+/, '');
}
function formatDuration(durationSeconds) {
var secs = durationSeconds % 60;
var mins = Math.floor(durationSeconds / 60);
var hours = Math.floor(mins / 60);
mins %= 60;
var response = '';
if (hours > 0) response += `${hours}h `;
if (hours > 0 || mins > 0) response += `${mins}m `;
response += `${secs}s`;
return response;
}
function mkDirByPathSync(targetDir, { isRelativeToScript = false } = {}) {
const { sep } = path;
const initDir = path.isAbsolute(targetDir) ? sep : '';
const baseDir = isRelativeToScript ? __dirname : '.';
return targetDir.split(sep).reduce((parentDir, childDir) => {
return targetDir.split(sep)
.reduce((parentDir, childDir) => {
const curDir = path.resolve(baseDir, parentDir, childDir);
try {
fs.mkdirSync(curDir);
}
catch (err) {
} catch (err) {
if (err.code === 'EEXIST') { // curDir already exists!
return curDir;
}
@ -104,7 +65,7 @@ function mkDirByPathSync(targetDir, { isRelativeToScript = false } = {}) {
const caughtErr = ['EACCES', 'EPERM', 'EISDIR'].indexOf(err.code) > -1;
if (!caughtErr || (caughtErr && curDir === path.resolve(targetDir))) {
logger.error("'EACCES', 'EPERM', 'EISDIR' during mkdir");
logger.error('\'EACCES\', \'EPERM\', \'EISDIR\' during mkdir');
return null;
}
}
@ -113,29 +74,11 @@ function mkDirByPathSync(targetDir, { isRelativeToScript = false } = {}) {
}, initDir);
}
function simpleStringify(object) {
var simpleObject = {};
for (var prop in object) {
if (!object.hasOwnProperty(prop)) {
continue;
}
if (typeof (object[prop]) === 'object') {
continue;
}
if (typeof (object[prop]) === 'function') {
continue;
}
simpleObject[prop] = object[prop];
}
return JSON.stringify(simpleObject); // returns cleaned up JSON
}
function writeFileSync(path, buffer, permission) {
var fileDescriptor;
try {
fileDescriptor = fs.openSync(path, 'w', permission);
}
catch (e) {
} catch (e) {
fs.chmodSync(path, permission);
fileDescriptor = fs.openSync(path, 'w', permission);
}
@ -150,6 +93,7 @@ function writeFileSync(path, buffer, permission) {
return false;
}
// eslint-disable-next-line
function moveUploadedFile(buffer, directory, filename) {
logger.info(`moveUploadedFile called with '${filename}' -> '${directory}'`);
@ -178,9 +122,11 @@ function moveUploadedFile(buffer, directory, filename) {
function deleteFolderRecursive(directoryPath) {
if (fs.existsSync(directoryPath)) {
fs.readdirSync(directoryPath).forEach((file, index) => {
fs.readdirSync(directoryPath)
.forEach((file, index) => {
const curPath = path.join(directoryPath, file);
if (fs.lstatSync(curPath).isDirectory()) {
if (fs.lstatSync(curPath)
.isDirectory()) {
deleteFolderRecursive(curPath);
}
else {
@ -195,10 +141,8 @@ async function dbProtectedRun() {
let retries = 0;
while (true) {
try {
const result = await db.run(...arguments);
return result;
}
catch (error) {
return await db.run(...arguments);
} catch (error) {
logger.error(error);
retries++;
if (retries >= 10) {
@ -207,7 +151,6 @@ async function dbProtectedRun() {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
break;
}
logger.error(`unable to complete dbProtectedRun for ${arguments}`);
return null;
@ -217,10 +160,8 @@ async function dbProtectedGet() {
let retries = 0;
while (true) {
try {
const result = await db.get(...arguments);
return result;
}
catch (error) {
return await db.get(...arguments);
} catch (error) {
logger.error(error);
retries++;
if (retries >= 10) {
@ -229,7 +170,6 @@ async function dbProtectedGet() {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
break;
}
logger.error(`unable to complete dbProtectedGet for ${arguments}`);
return null;
@ -239,10 +179,8 @@ async function dbProtectedAll() {
let retries = 0;
while (true) {
try {
const result = await db.all(...arguments);
return result;
}
catch (error) {
return await db.all(...arguments);
} catch (error) {
logger.error(error);
retries++;
if (retries >= 10) {
@ -251,7 +189,6 @@ async function dbProtectedAll() {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
break;
}
logger.error(`unable to complete dbProtectedGet for ${arguments}`);
return null;
@ -294,13 +231,11 @@ function processSegmentRLog(rLogPath) {
try {
execSync(`bunzip2 -k -f "${rLogPath}"`);
}
catch (exception) { // if bunzip2 fails, something was wrong with the file (corrupt / missing)
} catch (exception) { // if bunzip2 fails, something was wrong with the file (corrupt / missing)
logger.error(exception);
try {
fs.unlinkSync(temporaryFile);
}
catch (exception) { }
} catch (exception) { }
resolve();
return;
}
@ -311,16 +246,14 @@ function processSegmentRLog(rLogPath) {
readStream = fs.createReadStream(temporaryFile);
reader = Reader(readStream);
} catch (err) {
logger.error("314 - logger", err)
logger.error('314 - logger', err);
}
readStream.on('close', () => {
logger.info('processSegmentRLog readStream close event triggered, resolving promise');
try {
fs.unlinkSync(temporaryFile);
}
catch (exception) { }
} catch (exception) { }
resolve();
});
try {
@ -376,13 +309,12 @@ function processSegmentRLog(rLogPath) {
else if (obj.LogMonoTime !== undefined && obj.InitData !== undefined && rlog_InitData == null) {
rlog_InitData = obj.InitData;
}
}
catch (exception) {
} catch (exception) {
}
});
} catch (readerERr) {
throw new Error("reader Err 385", readerEEr)
throw new Error('reader Err 385', readerEEr);
}
}
);
@ -431,11 +363,13 @@ function processSegmentsRecursive() {
if (segment.process_attempts > 5) {
logger.error(`FAILING TO PROCESS SEGMENT,${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} JSON: ${JSON.stringify(segment)} SKIPPING `);
segmentProcessPosition++;
} else {
}
else {
var p1 = processSegmentRLog(fileStatus['rlog.bz2']);
var p2 = processSegmentVideo(fileStatus['qcamera.ts']);
Promise.all([p1, p2]).then((values) => {
Promise.all([p1, p2])
.then((values) => {
(async () => {
logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} internal gps: ${Math.round(rlog_totalDistInternal * 100) / 100}m, external gps: ${Math.round(rlog_totalDistExternal * 100) / 100}m, duration: ${qcamera_duration}s`);
@ -466,16 +400,13 @@ function processSegmentsRecursive() {
affectedDriveInitData[driveIdentifier] = rlog_InitData;
}
segmentProcessPosition++;
setTimeout(() => {
processSegmentsRecursive();
}, 0);
})();
}).catch((error) => {
})
.catch((error) => {
logger.error(error);
});
}
@ -494,8 +425,12 @@ async function updateSegments() {
for (var t = 0; t < drive_segments.length; t++) {
var segment = drive_segments[t];
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(segment.dongle_id).digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(segment.drive_identifier).digest('hex');
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
.update(segment.dongle_id)
.digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
.update(segment.drive_identifier)
.digest('hex');
const directoryTree = dirTree(`${config.storagePath + segment.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${segment.drive_identifier}/${segment.segment_id}`);
if (directoryTree == null || directoryTree.children == undefined) continue; // happens if upload in progress (db entity written but directory not yet created)
@ -506,7 +441,11 @@ async function updateSegments() {
var qlog = false;
var rlog = false;
var fileStatus = {
'fcamera.hevc': false, 'dcamera.hevc': false, 'qcamera.ts': false, 'qlog.bz2': false, 'rlog.bz2': false
'fcamera.hevc': false,
'dcamera.hevc': false,
'qcamera.ts': false,
'qlog.bz2': false,
'rlog.bz2': false
};
for (var i in directoryTree.children) {
@ -521,7 +460,10 @@ async function updateSegments() {
if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) { // can process
segmentProcessQueue.push({
segment, fileStatus, uploadComplete, driveIdentifier: `${segment.dongle_id}|${segment.drive_identifier}`
segment,
fileStatus,
uploadComplete,
driveIdentifier: `${segment.dongle_id}|${segment.drive_identifier}`
});
}
else if (uploadComplete) {
@ -574,9 +516,12 @@ async function updateDevices() {
const device = await dbProtectedGet('SELECT * FROM devices WHERE dongle_id = ?', dongleId);
if (device == null) continue;
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(device.dongle_id).digest('hex');
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
.update(device.dongle_id)
.digest('hex');
var devicePath = `${config.storagePath + device.dongle_id}/${dongleIdHash}`;
var deviceQuotaMb = Math.round(parseInt(execSync(`du -s ${devicePath} | awk -F'\t' '{print $1;}'`).toString()) / 1024);
var deviceQuotaMb = Math.round(parseInt(execSync(`du -s ${devicePath} | awk -F'\t' '{print $1;}'`)
.toString()) / 1024);
logger.info(`updateDevices device ${dongleId} has an updated storage_used of: ${deviceQuotaMb} MB`);
const deviceResult = await dbProtectedRun(
@ -599,8 +544,12 @@ async function updateDrives() {
const drive = await dbProtectedGet('SELECT * FROM drives WHERE identifier = ? AND dongle_id = ?', driveIdentifier, dongleId);
if (drive == null) continue;
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(drive.dongle_id).digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(drive.identifier).digest('hex');
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
.update(drive.dongle_id)
.digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
.update(drive.identifier)
.digest('hex');
var driveUrl = `${config.baseDriveDownloadUrl + drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;
var drivePath = `${config.storagePath + drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;
@ -615,7 +564,9 @@ async function updateDrives() {
if (drive_segments != null) {
for (var t = 0; t < drive_segments.length; t++) {
if (!drive_segments[t].upload_complete) uploadComplete = false;
if (!drive_segments[t].is_processed) isProcessed = false;
if (!drive_segments[t].is_processed) {
isProcessed = false;
}
else {
totalDistanceMeters += parseFloat(drive_segments[t].distance_meters);
totalDurationSeconds += parseFloat(drive_segments[t].duration);
@ -628,18 +579,21 @@ async function updateDrives() {
var { filesize } = drive;
if (uploadComplete) {
try {
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(dongleId).digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(driveIdentifier).digest('hex');
filesize = parseInt(execSync(`du -s ${drivePath} | awk -F'\t' '{print $1;}'`).toString()); // in kilobytes
}
catch (exception) { }
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
.update(dongleId)
.digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
.update(driveIdentifier)
.digest('hex');
filesize = parseInt(execSync(`du -s ${drivePath} | awk -F'\t' '{print $1;}'`)
.toString()); // in kilobytes
} catch (exception) { }
}
let metadata = {};
try {
metadata = JSON.parse(drive.metadata);
}
catch (exception) {
} catch (exception) {
logger.error(exception);
}
if (metadata == null) metadata = {};
@ -717,8 +671,12 @@ async function removeDeletedDrivesPhysically() {
for (var t = 0; t < deletedDrives.length; t++) {
logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} is deleted, remove physical files and clean database`);
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(deletedDrives[t].dongle_id).digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(deletedDrives[t].identifier).digest('hex');
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
.update(deletedDrives[t].dongle_id)
.digest('hex');
var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
.update(deletedDrives[t].identifier)
.digest('hex');
const drivePath = `${config.storagePath + deletedDrives[t].dongle_id}/${dongleIdHash}/${driveIdentifierHash}`;
logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} storage path is ${drivePath}`);
@ -739,8 +697,7 @@ async function removeDeletedDrivesPhysically() {
if (driveResult != null && driveSegmentResult != null) deleteFolderRecursive(drivePath, { recursive: true });
affectedDevices[deletedDrives[t].dongle_id] = true;
}
catch (exception) {
} catch (exception) {
logger.error(exception);
}
}
@ -791,16 +748,24 @@ async function deleteBootAndCrashLogs() {
for (var t = 0; t < devices.length; t++) {
var device = devices[t];
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(device.dongle_id).digest('hex');
var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
.update(device.dongle_id)
.digest('hex');
const bootlogDirectoryTree = dirTree(`${config.storagePath + device.dongle_id}/${dongleIdHash}/boot/`, { attributes: ['size'] });
var bootlogFiles = [];
if (bootlogDirectoryTree != undefined) {
for (var i = 0; i < bootlogDirectoryTree.children.length; i++) {
var timeSplit = bootlogDirectoryTree.children[i].name.replace('boot-', '').replace('crash-', '').replace('\.bz2', '').split('--');
var timeSplit = bootlogDirectoryTree.children[i].name.replace('boot-', '')
.replace('crash-', '')
.replace('\.bz2', '')
.split('--');
var timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`;
bootlogFiles.push({
name: bootlogDirectoryTree.children[i].name, size: bootlogDirectoryTree.children[i].size, date: Date.parse(timeString), path: bootlogDirectoryTree.children[i].path
name: bootlogDirectoryTree.children[i].name,
size: bootlogDirectoryTree.children[i].size,
date: Date.parse(timeString),
path: bootlogDirectoryTree.children[i].path
});
}
bootlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1));
@ -809,8 +774,7 @@ async function deleteBootAndCrashLogs() {
try {
fs.unlinkSync(bootlogFiles[c].path);
affectedDevices[device.dongle_id] = true;
}
catch (exception) {
} catch (exception) {
logger.error(exception);
}
}
@ -820,10 +784,16 @@ async function deleteBootAndCrashLogs() {
var crashlogFiles = [];
if (crashlogDirectoryTree != undefined) {
for (var i = 0; i < crashlogDirectoryTree.children.length; i++) {
var timeSplit = crashlogDirectoryTree.children[i].name.replace('boot-', '').replace('crash-', '').replace('\.bz2', '').split('--');
var timeSplit = crashlogDirectoryTree.children[i].name.replace('boot-', '')
.replace('crash-', '')
.replace('\.bz2', '')
.split('--');
var timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`;
crashlogFiles.push({
name: crashlogDirectoryTree.children[i].name, size: crashlogDirectoryTree.children[i].size, date: Date.parse(timeString), path: crashlogDirectoryTree.children[i].path
name: crashlogDirectoryTree.children[i].name,
size: crashlogDirectoryTree.children[i].size,
date: Date.parse(timeString),
path: crashlogDirectoryTree.children[i].path
});
}
crashlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1));
@ -832,8 +802,7 @@ async function deleteBootAndCrashLogs() {
try {
fs.unlinkSync(crashlogFiles[c].path);
affectedDevices[device.dongle_id] = true;
}
catch (exception) {
} catch (exception) {
logger.error(exception);
}
}
@ -859,8 +828,7 @@ async function mainWorkerLoop() {
setTimeout(() => {
updateSegments();
}, 5000);
}
catch (e) {
} catch (e) {
logger.error(e);
}
}
@ -868,13 +836,16 @@ async function mainWorkerLoop() {
// make sure bunzip2 is available
try {
//execSync('bunzip2 --help');
}
catch (exception) {
} catch (exception) {
logger.error('bunzip2 is not installed or not available in environment path');
process.exit();
}
lockfile.lock('retropilot_worker.lock', { realpath: false, stale: 90000, update: 2000 })
lockfile.lock('retropilot_worker.lock', {
realpath: false,
stale: 90000,
update: 2000
})
.then((release) => {
logger.info('STARTING WORKER...');
(async () => {
@ -888,8 +859,7 @@ lockfile.lock('retropilot_worker.lock', { realpath: false, stale: 90000, update:
await db.get('SELECT * FROM devices LIMIT 1');
await db.get('SELECT * FROM drives LIMIT 1');
await db.get('SELECT * FROM drive_segments LIMIT 1');
}
catch (exception) {
} catch (exception) {
logger.error(exception);
process.exit();
}
@ -899,7 +869,8 @@ lockfile.lock('retropilot_worker.lock', { realpath: false, stale: 90000, update:
mainWorkerLoop();
}, 0);
})();
}).catch((e) => {
})
.catch((e) => {
console.error(e);
process.exit();
});