From 930665f3c6dba0a023bfee7c63574000a8f19607 Mon Sep 17 00:00:00 2001 From: Cameron Clough Date: Tue, 22 Mar 2022 22:34:28 +0000 Subject: [PATCH] refactor(worker): extract cleanup functions --- src/worker/cleanup.js | 170 +++++++++++++++++++++++++ src/worker/worker.js | 282 +++++++++++------------------------------- 2 files changed, 241 insertions(+), 211 deletions(-) create mode 100644 src/worker/cleanup.js diff --git a/src/worker/cleanup.js b/src/worker/cleanup.js new file mode 100644 index 0000000..6bc7e76 --- /dev/null +++ b/src/worker/cleanup.js @@ -0,0 +1,170 @@ +import crypto from 'crypto'; +import dirTree from 'directory-tree'; +import fs from 'fs'; +import log4js from 'log4js'; + +import { Drives } from '../models'; +import orm from '../models/orm'; +import { deleteFolderRecursive } from './storage'; + +const logger = log4js.getLogger('cleanup'); + +export let affectedDevices = {}; + +async function deleteBootAndCrashLogs() { + const [devices] = await orm.query('SELECT * FROM devices'); + if (devices == null) { + return; + } + + for (let t = 0; t < devices.length; t++) { + const device = devices[t]; + const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) + .update(device.dongle_id) + .digest('hex'); + + const bootlogDirectoryTree = dirTree(`${process.env.STORAGE_PATH + device.dongle_id}/${dongleIdHash}/boot/`, { attributes: ['size'] }); + const bootlogFiles = []; + if (bootlogDirectoryTree) { + for (let i = 0; i < bootlogDirectoryTree.children.length; i++) { + const timeSplit = bootlogDirectoryTree.children[i].name.replace('boot-', '') + .replace('crash-', '') + .replace('.bz2', '') + .split('--'); + const timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`; + bootlogFiles.push({ + name: bootlogDirectoryTree.children[i].name, + size: bootlogDirectoryTree.children[i].size, + date: Date.parse(timeString), + path: bootlogDirectoryTree.children[i].path, + }); + } + bootlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1)); + for (let c = 5; c < bootlogFiles.length; c++) { + logger.info(`deleteBootAndCrashLogs deleting boot log ${bootlogFiles[c].path}`); + try { + fs.unlinkSync(bootlogFiles[c].path); + affectedDevices[device.dongle_id] = true; + } catch (exception) { + logger.error(exception); + } + } + } + + const crashlogDirectoryTree = dirTree(`${process.env.STORAGE_PATH + device.dongle_id}/${dongleIdHash}/crash/`, { attributes: ['size'] }); + const crashlogFiles = []; + if (crashlogDirectoryTree) { + for (let i = 0; i < crashlogDirectoryTree.children.length; i++) { + const timeSplit = crashlogDirectoryTree.children[i].name.replace('boot-', '') + .replace('crash-', '') + .replace('.bz2', '') + .split('--'); + const timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`; + crashlogFiles.push({ + name: crashlogDirectoryTree.children[i].name, + size: crashlogDirectoryTree.children[i].size, + date: Date.parse(timeString), + path: crashlogDirectoryTree.children[i].path, + }); + } + crashlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1)); + for (let c = 5; c < crashlogFiles.length; c++) { + logger.info(`deleteBootAndCrashLogs deleting crash log ${crashlogFiles[c].path}`); + try { + fs.unlinkSync(crashlogFiles[c].path); + affectedDevices[device.dongle_id] = true; + } catch (exception) { + logger.error(exception); + } + } + } + } +} + +async function deleteExpiredDrives() { + const expirationTs = Date.now() - process.env.DEVICE_EXPIRATION_DAYS * 24 * 3600 * 1000; + + const [expiredDrives] = await orm.query(`SELECT * FROM drives WHERE is_preserved = false AND is_deleted = false AND created < ${expirationTs}`); + if (!expiredDrives) { + return; + } + + for (let t = 0; t < expiredDrives.length; t++) { + logger.info(`deleteExpiredDrives drive ${expiredDrives[t].dongle_id} ${expiredDrives[t].identifier} is older than ${process.env.DEVICE_EXPIRATION_DAYS} days, set is_deleted=true`); + await Drives.update( + { + is_deleted: true, + }, + { where: { id: expiredDrives[t].id } }, + ); + } +} + +async function deleteOverQuotaDrives() { + const [devices] = await orm.query(`SELECT * FROM devices WHERE storage_used > ${process.env.DEVICE_STORAGE_QUOTA_MB}`); + if (devices == null) { + return; + } + + for (let t = 0; t < devices.length; t++) { + let foundDriveToDelete = false; + + const [driveNormal] = await orm.query(`SELECT * FROM drives WHERE dongle_id = ${devices[t].dongle_id} AND is_preserved = false AND is_deleted = false ORDER BY created ASC LIMIT 1`); + if (driveNormal != null) { + logger.info(`deleteOverQuotaDrives drive ${driveNormal.dongle_id} ${driveNormal.identifier} (normal) is deleted for over-quota`); + await orm.query(`UPDATE drives SET is_deleted = true WHERE id = ${driveNormal.id}`); + foundDriveToDelete = true; + } + + if (!foundDriveToDelete) { + const [drivePreserved] = await orm.query('SELECT * FROM drives WHERE dongle_id = devices[t].dongle_id AND is_preserved = true AND is_deleted = false ORDER BY created ASC LIMIT 1'); + if (drivePreserved != null) { + logger.info(`deleteOverQuotaDrives drive ${drivePreserved.dongle_id} ${drivePreserved.identifier} (preserved!) is deleted for over-quota`); + await orm.query(`UPDATE drives SET is_deleted = ? WHERE id = ${drivePreserved.id}`); + foundDriveToDelete = true; + } + } + } +} + +async function removeDeletedDrivesPhysically() { + const [deletedDrives] = await orm.query('SELECT * FROM drives WHERE is_deleted = true AND is_physically_removed = false'); + if (!deletedDrives) { + return; + } + + for (let t = 0; t < deletedDrives.length; t++) { + logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} is deleted, remove physical files and clean database`); + + const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) + .update(deletedDrives[t].dongle_id) + .digest('hex'); + const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT) + .update(deletedDrives[t].identifier) + .digest('hex'); + + const drivePath = `${process.env.STORAGE_PATH + deletedDrives[t].dongle_id}/${dongleIdHash}/${driveIdentifierHash}`; + logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} storage path is ${drivePath}`); + try { + const driveResult = await orm.query(`UPDATE drives SET is_physically_removed = true WHERE id = ${deletedDrives[t].id}`); + + const driveSegmentResult = await orm.query( + `DELETE FROM drive_segments WHERE drive_identifier = ${deletedDrives[t].identifier} AND dongle_id = ${deletedDrives[t].dongle_id}`, + ); + + if (driveResult != null && driveSegmentResult != null) { + deleteFolderRecursive(drivePath, { recursive: true }); + } + affectedDevices[deletedDrives[t].dongle_id] = true; + } catch (exception) { + logger.error(exception); + } + } +} + +export async function doCleanup() { + await deleteBootAndCrashLogs(); + await deleteExpiredDrives(); + await deleteOverQuotaDrives(); + await removeDeletedDrivesPhysically(); +} diff --git a/src/worker/worker.js b/src/worker/worker.js index fa53426..2bd2b7a 100644 --- a/src/worker/worker.js +++ b/src/worker/worker.js @@ -1,18 +1,18 @@ import 'dotenv/config'; -import crypto from 'crypto'; -import fs from 'fs'; -import log4js from 'log4js'; -import dirTree from 'directory-tree'; -import { execSync } from 'child_process'; + import Reader from '@commaai/log_reader'; +import { execSync } from 'child_process'; +import crypto from 'crypto'; +import dirTree from 'directory-tree'; import ffprobe from 'ffprobe'; import ffprobeStatic from 'ffprobe-static'; +import fs from 'fs'; +import log4js from 'log4js'; +import { Op } from 'sequelize'; -import orm from '../models/orm'; -import { - initializeStorage, - deleteFolderRecursive, -} from './storage'; +import { Devices, Drives, DriveSegments } from '../models'; +import { affectedDevices, doCleanup } from './cleanup'; +import { initializeStorage } from './storage'; const logger = log4js.getLogger(); @@ -26,8 +26,6 @@ let affectedDrives = {}; let affectedDriveInitData = {}; let affectedDriveCarParams = {}; -let affectedDevices = {}; - let rlogLastTsInternal = 0; let rlogPrevLatInternal = -1000; let rlogPrevLngInternal = -1000; @@ -194,9 +192,11 @@ async function processSegmentsRecursive() { segment.process_attempts += 1; - await orm.query( - `UPDATE drive_segments SET process_attempts = ${segment.process_attempts} WHERE id = ${segment.id}`, - ); + DriveSegments.update({ + process_attempts: segment.process_attempts, + }, { + where: { id: segment.id }, + }); if (segment.process_attempts > 5) { logger.error(`FAILING TO PROCESS SEGMENT,${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} JSON: ${JSON.stringify(segment)} SKIPPING `); @@ -209,7 +209,7 @@ async function processSegmentsRecursive() { .then(async () => { logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} internal gps: ${Math.round(rlogTotalDistInternal * 100) / 100}m, external gps: ${Math.round(rlogTotalDistExternal * 100) / 100}m, duration: ${qcameraDuration}s`); - const driveSegmentResult = await orm.models.drive_segments.update({ + const driveSegmentResult = await DriveSegments.update({ duration: Math.round(qcameraDuration), distance_meters: Math.round( Math.max(rlogTotalDistInternal, rlogTotalDistExternal) * 10, @@ -248,12 +248,21 @@ async function updateSegments() { affectedDriveCarParams = {}; affectedDriveInitData = {}; - const [driveSegments] = await orm.query('SELECT * FROM drive_segments WHERE upload_complete = false AND is_stalled = false AND process_attempts < 5 ORDER BY created ASC'); - logger.info('updateSegments - total drive_segments', driveSegments.length); + const segments = await DriveSegments.findAll({ + where: { + upload_complete: false, + is_stalled: false, + process_attempts: { + [Op.lt]: 5, + }, + }, + order: [['created', 'ASC']], + }); + logger.info('updateSegments - total segments', segments.length); - if (driveSegments) { - for (let t = 0; t < driveSegments.length; t++) { - const segment = driveSegments[t]; + if (segments) { + for (let t = 0; t < segments.length; t++) { + const segment = segments[t]; const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) .update(segment.dongle_id) @@ -286,7 +295,8 @@ async function updateSegments() { const uploadComplete = Object.keys(fileStatus).every((key) => fileStatus[key]); - if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) { // can process + if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) { + // can process segmentProcessQueue.push({ segment, fileStatus, @@ -296,17 +306,19 @@ async function updateSegments() { } else if (uploadComplete) { logger.info(`updateSegments uploadComplete for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`); - await orm.query( - `UPDATE drive_segments SET upload_complete = true, is_stalled = false WHERE id = ${segment.id}`, - ); + await DriveSegments.update({ + upload_complete: true, + is_stalled: false, + }, { where: { id: segment.id } }); affectedDrives[`${segment.dongle_id}|${segment.drive_identifier}`] = true; - } else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) { // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled) + } else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) { + // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled) logger.info(`updateSegments isStalled for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`); - await orm.query( - `UPDATE drive_segments SET is_stalled = true WHERE id = ${segment.id}`, - ); + await DriveSegments.update({ + is_stalled: true, + }, { where: { id: segment.id } }); } // we process at most 15 segments per batch @@ -317,19 +329,23 @@ async function updateSegments() { } if (segmentProcessQueue.length > 0) { - processSegmentsRecursive(); + await processSegmentsRecursive(); } else { // if no data is to be collected, call updateDrives to update those where eventually just the last segment completed the upload - updateDrives(); + await updateDrives(); } } async function updateDevices() { // go through all affected devices (with deleted or updated drives) and update them (storage_used) logger.info(`updateDevices - affected drives: ${JSON.stringify(affectedDevices)}`); - for (const dongleId of Object.keys(affectedDevices)) { - const [device] = await orm.query(`SELECT * FROM devices WHERE dongle_id = ${dongleId}`); - if (device == null) continue; + + await Promise.all(Object.keys(affectedDevices).map(async (dongleId) => { + const device = await Devices.findOne({ where: { dongle_id: dongleId } }); + if (!device) { + logger.warn(`updateDevices - device not found for dongle_id ${dongleId}`); + return; + } const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) .update(device.dongle_id) @@ -339,35 +355,33 @@ async function updateDevices() { .toString(), 10) / 1024); logger.info(`updateDevices device ${dongleId} has an updated storage_used of: ${deviceQuotaMb} MB`); - await orm.models.drives.update( - { - storage_used: deviceQuotaMb, - }, - { - where: { - dongle_id: device.dongle_id, - }, - }, + await Devices.update( + { storage_used: deviceQuotaMb }, + { where: { dongle_id: device.dongle_id } }, ); - } - affectedDevices = []; + delete affectedDevices[dongleId]; + })); } async function updateDrives() { // go through all affected drives and update them / complete and/or build m3u8 logger.info(`updateDrives - affected drives: ${JSON.stringify(affectedDrives)}`); - for (const key of Object.keys(affectedDrives)) { + + await Promise.all(Object.keys(affectedDrives).map(async (key) => { const [dongleId, driveIdentifier] = key.split('|'); - let drive = await orm.models.drives.findOne({ - where: { identifier: driveIdentifier, dongle_id: dongleId }, + const drive = await Drives.findOne({ + where: { + identifier: driveIdentifier, + dongle_id: dongleId, + }, }); if (!drive) { - continue; + logger.warn('updateDrives drive not found', key); + return; } - drive = drive.dataValues; const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) .update(drive.dongle_id) .digest('hex'); @@ -384,14 +398,12 @@ async function updateDrives() { let totalDurationSeconds = 0; let playlistSegmentStrings = ''; - const driveSegments = await orm.models.drive_segments.findAll({ + const driveSegments = await DriveSegments.findAll({ where: { drive_identifier: driveIdentifier, dongle_id: dongleId, }, - order: [ - orm.fn('ASC', orm.col('segment_id')), - ], + order: [['segment_id', 'ASC']], }); if (driveSegments) { @@ -406,6 +418,8 @@ async function updateDrives() { playlistSegmentStrings += `#EXTINF:${driveSegments[t].duration},${driveSegments[t].segment_id}\n${driveUrl}/${driveSegments[t].segment_id}/qcamera.ts\n`; } } + } else { + logger.warn('updateDrives driveSegments not found', key); } let { filesize } = drive; @@ -435,7 +449,7 @@ async function updateDrives() { logger.info(`updateDrives drive ${dongleId} ${driveIdentifier} uploadComplete: ${uploadComplete}`); - await orm.models.drives.update( + await Drives.update( { distance_meters: Math.round(totalDistanceMeters), duration: Math.round(totalDurationSeconds), @@ -460,164 +474,13 @@ async function updateDrives() { fs.writeFileSync(`${drivePath}/qcamera.m3u8`, playlist); } - } + })); await updateDevices(); setTimeout(mainWorkerLoop); } -async function deleteExpiredDrives() { - const expirationTs = Date.now() - process.env.DEVICE_EXPIRATION_DAYS * 24 * 3600 * 1000; - - const [expiredDrives] = await orm.query(`SELECT * FROM drives WHERE is_preserved = false AND is_deleted = false AND created < ${expirationTs}`); - if (!expiredDrives) { - return; - } - - for (let t = 0; t < expiredDrives.length; t++) { - logger.info(`deleteExpiredDrives drive ${expiredDrives[t].dongle_id} ${expiredDrives[t].identifier} is older than ${process.env.DEVICE_EXPIRATION_DAYS} days, set is_deleted=true`); - await orm.models.drives.update( - { - is_deleted: true, - }, - { where: { id: expiredDrives[t].id } }, - ); - } -} - -async function removeDeletedDrivesPhysically() { - const [deletedDrives] = await orm.query('SELECT * FROM drives WHERE is_deleted = true AND is_physically_removed = false'); - if (!deletedDrives) { - return; - } - - for (let t = 0; t < deletedDrives.length; t++) { - logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} is deleted, remove physical files and clean database`); - - const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) - .update(deletedDrives[t].dongle_id) - .digest('hex'); - const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT) - .update(deletedDrives[t].identifier) - .digest('hex'); - - const drivePath = `${process.env.STORAGE_PATH + deletedDrives[t].dongle_id}/${dongleIdHash}/${driveIdentifierHash}`; - logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} storage path is ${drivePath}`); - try { - const driveResult = await orm.query(`UPDATE drives SET is_physically_removed = true WHERE id = ${deletedDrives[t].id}`); - - const driveSegmentResult = await orm.query( - `DELETE FROM drive_segments WHERE drive_identifier = ${deletedDrives[t].identifier} AND dongle_id = ${deletedDrives[t].dongle_id}`, - ); - - if (driveResult != null && driveSegmentResult != null) { - deleteFolderRecursive(drivePath, { recursive: true }); - } - affectedDevices[deletedDrives[t].dongle_id] = true; - } catch (exception) { - logger.error(exception); - } - } -} - -async function deleteOverQuotaDrives() { - const [devices] = await orm.query(`SELECT * FROM devices WHERE storage_used > ${process.env.DEVICE_STORAGE_QUOTA_MB}`); - if (devices == null) { - return; - } - - for (let t = 0; t < devices.length; t++) { - let foundDriveToDelete = false; - - const [driveNormal] = await orm.query(`SELECT * FROM drives WHERE dongle_id = ${devices[t].dongle_id} AND is_preserved = false AND is_deleted = false ORDER BY created ASC LIMIT 1`); - if (driveNormal != null) { - logger.info(`deleteOverQuotaDrives drive ${driveNormal.dongle_id} ${driveNormal.identifier} (normal) is deleted for over-quota`); - await orm.query(`UPDATE drives SET is_deleted = true WHERE id = ${driveNormal.id}`); - foundDriveToDelete = true; - } - - if (!foundDriveToDelete) { - const [drivePreserved] = await orm.query('SELECT * FROM drives WHERE dongle_id = devices[t].dongle_id AND is_preserved = true AND is_deleted = false ORDER BY created ASC LIMIT 1'); - if (drivePreserved != null) { - logger.info(`deleteOverQuotaDrives drive ${drivePreserved.dongle_id} ${drivePreserved.identifier} (preserved!) is deleted for over-quota`); - await orm.query(`UPDATE drives SET is_deleted = ? WHERE id = ${drivePreserved.id}`); - foundDriveToDelete = true; - } - } - } -} - -async function deleteBootAndCrashLogs() { - const [devices] = await orm.query('SELECT * FROM devices'); - if (devices == null) { - return; - } - - for (let t = 0; t < devices.length; t++) { - const device = devices[t]; - const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) - .update(device.dongle_id) - .digest('hex'); - - const bootlogDirectoryTree = dirTree(`${process.env.STORAGE_PATH + device.dongle_id}/${dongleIdHash}/boot/`, { attributes: ['size'] }); - const bootlogFiles = []; - if (bootlogDirectoryTree) { - for (let i = 0; i < bootlogDirectoryTree.children.length; i++) { - const timeSplit = bootlogDirectoryTree.children[i].name.replace('boot-', '') - .replace('crash-', '') - .replace('.bz2', '') - .split('--'); - const timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`; - bootlogFiles.push({ - name: bootlogDirectoryTree.children[i].name, - size: bootlogDirectoryTree.children[i].size, - date: Date.parse(timeString), - path: bootlogDirectoryTree.children[i].path, - }); - } - bootlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1)); - for (let c = 5; c < bootlogFiles.length; c++) { - logger.info(`deleteBootAndCrashLogs deleting boot log ${bootlogFiles[c].path}`); - try { - fs.unlinkSync(bootlogFiles[c].path); - affectedDevices[device.dongle_id] = true; - } catch (exception) { - logger.error(exception); - } - } - } - - const crashlogDirectoryTree = dirTree(`${process.env.STORAGE_PATH + device.dongle_id}/${dongleIdHash}/crash/`, { attributes: ['size'] }); - const crashlogFiles = []; - if (crashlogDirectoryTree) { - for (let i = 0; i < crashlogDirectoryTree.children.length; i++) { - const timeSplit = crashlogDirectoryTree.children[i].name.replace('boot-', '') - .replace('crash-', '') - .replace('.bz2', '') - .split('--'); - const timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`; - crashlogFiles.push({ - name: crashlogDirectoryTree.children[i].name, - size: crashlogDirectoryTree.children[i].size, - date: Date.parse(timeString), - path: crashlogDirectoryTree.children[i].path, - }); - } - crashlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1)); - for (let c = 5; c < crashlogFiles.length; c++) { - logger.info(`deleteBootAndCrashLogs deleting crash log ${crashlogFiles[c].path}`); - try { - fs.unlinkSync(crashlogFiles[c].path); - affectedDevices[device.dongle_id] = true; - } catch (exception) { - logger.error(exception); - } - } - } - } -} - async function mainWorkerLoop() { if (Date.now() - startTime > 60 * 60 * 1000) { logger.info('EXIT WORKER AFTER 1 HOUR TO PREVENT MEMORY LEAKS...'); @@ -627,10 +490,7 @@ async function mainWorkerLoop() { try { if (Date.now() - lastCleaningTime > 20 * 60 * 1000) { - await deleteBootAndCrashLogs(); - await deleteExpiredDrives(); - await deleteOverQuotaDrives(); - await removeDeletedDrivesPhysically(); + await doCleanup(); lastCleaningTime = Date.now(); }