refactor(worker): extract cleanup functions

pull/4/head
Cameron Clough 2022-03-22 22:34:28 +00:00
parent 4a39c36818
commit 930665f3c6
No known key found for this signature in database
GPG Key ID: BFB3B74B026ED43F
2 changed files with 241 additions and 211 deletions

View File

@ -0,0 +1,170 @@
import crypto from 'crypto';
import dirTree from 'directory-tree';
import fs from 'fs';
import log4js from 'log4js';
import { Drives } from '../models';
import orm from '../models/orm';
import { deleteFolderRecursive } from './storage';
const logger = log4js.getLogger('cleanup');

// Set of dongle_ids whose on-disk files changed during cleanup.
// Used as a set: keys are dongle_ids, values are always `true`.
// Exported so the worker can recompute storage_used for these devices.
export let affectedDevices = {};
// How many of the newest boot logs and crash logs to retain per device.
const KEEP_NEWEST_LOGS = 5;

/**
 * Parse a directory-tree of boot/crash logs into sortable entries.
 * File names look like `boot-YYYY-MM-DD--HH-MM-SS.bz2` (or `crash-...`);
 * the prefix and `.bz2` extension are stripped and the remainder is parsed
 * into a timestamp.
 *
 * @param {object|null|undefined} directoryTree result of dirTree(), may be falsy
 * @returns {Array<{name: string, size: number, date: number, path: string}>}
 */
function collectLogFiles(directoryTree) {
  if (!directoryTree) {
    return [];
  }
  return directoryTree.children.map((file) => {
    const [datePart, timePart] = file.name
      .replace('boot-', '')
      .replace('crash-', '')
      .replace('.bz2', '')
      .split('--');
    return {
      name: file.name,
      size: file.size,
      date: Date.parse(`${datePart} ${timePart.replace(/-/g, ':')}`),
      path: file.path,
    };
  });
}

/**
 * Unlink all but the KEEP_NEWEST_LOGS newest entries, flagging the device in
 * affectedDevices so its storage_used gets recalculated. Deletion failures are
 * logged and skipped (best-effort, matches previous behavior).
 *
 * @param {Array<{date: number, path: string}>} logFiles from collectLogFiles()
 * @param {string} dongleId owning device id
 * @param {string} label 'boot' or 'crash' — log-message text only
 */
function deleteOldLogFiles(logFiles, dongleId, label) {
  // Sort newest first; everything from index KEEP_NEWEST_LOGS on is removed.
  logFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1));
  for (let i = KEEP_NEWEST_LOGS; i < logFiles.length; i++) {
    logger.info(`deleteBootAndCrashLogs deleting ${label} log ${logFiles[i].path}`);
    try {
      fs.unlinkSync(logFiles[i].path);
      affectedDevices[dongleId] = true;
    } catch (exception) {
      logger.error(exception);
    }
  }
}

/**
 * For every device, keep only the newest 5 boot logs and newest 5 crash logs
 * on disk; older files are unlinked. The boot and crash passes were previously
 * copy-pasted; they now share collectLogFiles/deleteOldLogFiles.
 */
async function deleteBootAndCrashLogs() {
  const [devices] = await orm.query('SELECT * FROM devices');
  if (devices == null) {
    return;
  }
  for (const device of devices) {
    const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
      .update(device.dongle_id)
      .digest('hex');
    const deviceDir = `${process.env.STORAGE_PATH + device.dongle_id}/${dongleIdHash}`;

    const bootlogFiles = collectLogFiles(dirTree(`${deviceDir}/boot/`, { attributes: ['size'] }));
    deleteOldLogFiles(bootlogFiles, device.dongle_id, 'boot');

    const crashlogFiles = collectLogFiles(dirTree(`${deviceDir}/crash/`, { attributes: ['size'] }));
    deleteOldLogFiles(crashlogFiles, device.dongle_id, 'crash');
  }
}
/**
 * Soft-delete drives older than DEVICE_EXPIRATION_DAYS that are neither
 * preserved nor already deleted. Physical file removal happens later in
 * removeDeletedDrivesPhysically().
 */
async function deleteExpiredDrives() {
  // env vars are strings; coerce explicitly instead of relying on `*` coercion
  const expirationTs = Date.now()
    - Number(process.env.DEVICE_EXPIRATION_DAYS) * 24 * 3600 * 1000;
  // parameterized query instead of string-interpolated SQL
  const [expiredDrives] = await orm.query(
    'SELECT * FROM drives WHERE is_preserved = false AND is_deleted = false AND created < ?',
    { replacements: [expirationTs] },
  );
  if (!expiredDrives) {
    return;
  }
  for (const drive of expiredDrives) {
    logger.info(`deleteExpiredDrives drive ${drive.dongle_id} ${drive.identifier} is older than ${process.env.DEVICE_EXPIRATION_DAYS} days, set is_deleted=true`);
    await Drives.update(
      { is_deleted: true },
      { where: { id: drive.id } },
    );
  }
}
/**
 * For each device over its storage quota, soft-delete its oldest drive:
 * non-preserved drives first; a preserved drive is only deleted when no
 * non-preserved one remains. At most one drive per device per pass — repeated
 * cleanup passes shrink usage until the device is back under quota.
 *
 * Fixes vs. previous version:
 *  - the preserved-drive query was a plain string containing the literal text
 *    `devices[t].dongle_id` (never interpolated), so it could never match;
 *  - the preserved-drive UPDATE used a `?` placeholder with no bound value;
 *  - dongle_id (a string) was interpolated unquoted into SQL;
 *  - `const [x] = orm.query(...)` bound the rows *array*, not the first row.
 */
async function deleteOverQuotaDrives() {
  const [devices] = await orm.query(
    'SELECT * FROM devices WHERE storage_used > ?',
    { replacements: [Number(process.env.DEVICE_STORAGE_QUOTA_MB)] },
  );
  if (devices == null) {
    return;
  }
  for (const device of devices) {
    // oldest deletable non-preserved drive, if any
    // orm.query resolves to [rows, metadata]; take rows[0] for LIMIT 1
    const [normalRows] = await orm.query(
      'SELECT * FROM drives WHERE dongle_id = ? AND is_preserved = false AND is_deleted = false ORDER BY created ASC LIMIT 1',
      { replacements: [device.dongle_id] },
    );
    const driveNormal = normalRows ? normalRows[0] : null;
    if (driveNormal != null) {
      logger.info(`deleteOverQuotaDrives drive ${driveNormal.dongle_id} ${driveNormal.identifier} (normal) is deleted for over-quota`);
      await orm.query(
        'UPDATE drives SET is_deleted = true WHERE id = ?',
        { replacements: [driveNormal.id] },
      );
      continue; // one drive deleted for this device; preserved drives untouched
    }
    // no non-preserved drive left: fall back to the oldest preserved drive
    const [preservedRows] = await orm.query(
      'SELECT * FROM drives WHERE dongle_id = ? AND is_preserved = true AND is_deleted = false ORDER BY created ASC LIMIT 1',
      { replacements: [device.dongle_id] },
    );
    const drivePreserved = preservedRows ? preservedRows[0] : null;
    if (drivePreserved != null) {
      logger.info(`deleteOverQuotaDrives drive ${drivePreserved.dongle_id} ${drivePreserved.identifier} (preserved!) is deleted for over-quota`);
      await orm.query(
        'UPDATE drives SET is_deleted = true WHERE id = ?',
        { replacements: [drivePreserved.id] },
      );
    }
  }
}
/**
 * Physically remove the files of drives already flagged is_deleted, mark them
 * is_physically_removed, and purge their drive_segments rows. Each affected
 * device is flagged in affectedDevices so storage_used can be recomputed.
 * Per-drive failures are logged and the loop continues (best-effort).
 */
async function removeDeletedDrivesPhysically() {
  const [deletedDrives] = await orm.query('SELECT * FROM drives WHERE is_deleted = true AND is_physically_removed = false');
  if (!deletedDrives) {
    return;
  }
  for (const drive of deletedDrives) {
    logger.info(`removeDeletedDrivesPhysically drive ${drive.dongle_id} ${drive.identifier} is deleted, remove physical files and clean database`);
    const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
      .update(drive.dongle_id)
      .digest('hex');
    const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT)
      .update(drive.identifier)
      .digest('hex');
    const drivePath = `${process.env.STORAGE_PATH + drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}`;
    logger.info(`removeDeletedDrivesPhysically drive ${drive.dongle_id} ${drive.identifier} storage path is ${drivePath}`);
    try {
      const driveResult = await orm.query(
        'UPDATE drives SET is_physically_removed = true WHERE id = ?',
        { replacements: [drive.id] },
      );
      // parameterized: identifier/dongle_id are strings, and the previous
      // unquoted interpolation was a SQL syntax error (and injection risk)
      const driveSegmentResult = await orm.query(
        'DELETE FROM drive_segments WHERE drive_identifier = ? AND dongle_id = ?',
        { replacements: [drive.identifier, drive.dongle_id] },
      );
      // only delete files once the database rows were updated successfully
      if (driveResult != null && driveSegmentResult != null) {
        deleteFolderRecursive(drivePath, { recursive: true });
      }
      affectedDevices[drive.dongle_id] = true;
    } catch (exception) {
      logger.error(exception);
    }
  }
}
/**
 * Run every cleanup stage once. Stages run sequentially because they are
 * order-dependent: the soft-delete stages flag drives that the final
 * physical-removal stage then acts upon.
 */
export async function doCleanup() {
  const stages = [
    deleteBootAndCrashLogs,
    deleteExpiredDrives,
    deleteOverQuotaDrives,
    removeDeletedDrivesPhysically,
  ];
  for (const stage of stages) {
    await stage();
  }
}

View File

@ -1,18 +1,18 @@
import 'dotenv/config';
import crypto from 'crypto';
import fs from 'fs';
import log4js from 'log4js';
import dirTree from 'directory-tree';
import { execSync } from 'child_process';
import Reader from '@commaai/log_reader';
import { execSync } from 'child_process';
import crypto from 'crypto';
import dirTree from 'directory-tree';
import ffprobe from 'ffprobe';
import ffprobeStatic from 'ffprobe-static';
import fs from 'fs';
import log4js from 'log4js';
import { Op } from 'sequelize';
import orm from '../models/orm';
import {
initializeStorage,
deleteFolderRecursive,
} from './storage';
import { Devices, Drives, DriveSegments } from '../models';
import { affectedDevices, doCleanup } from './cleanup';
import { initializeStorage } from './storage';
const logger = log4js.getLogger();
@ -26,8 +26,6 @@ let affectedDrives = {};
let affectedDriveInitData = {};
let affectedDriveCarParams = {};
let affectedDevices = {};
let rlogLastTsInternal = 0;
let rlogPrevLatInternal = -1000;
let rlogPrevLngInternal = -1000;
@ -194,9 +192,11 @@ async function processSegmentsRecursive() {
segment.process_attempts += 1;
await orm.query(
`UPDATE drive_segments SET process_attempts = ${segment.process_attempts} WHERE id = ${segment.id}`,
);
DriveSegments.update({
process_attempts: segment.process_attempts,
}, {
where: { id: segment.id },
});
if (segment.process_attempts > 5) {
logger.error(`FAILING TO PROCESS SEGMENT,${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} JSON: ${JSON.stringify(segment)} SKIPPING `);
@ -209,7 +209,7 @@ async function processSegmentsRecursive() {
.then(async () => {
logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} internal gps: ${Math.round(rlogTotalDistInternal * 100) / 100}m, external gps: ${Math.round(rlogTotalDistExternal * 100) / 100}m, duration: ${qcameraDuration}s`);
const driveSegmentResult = await orm.models.drive_segments.update({
const driveSegmentResult = await DriveSegments.update({
duration: Math.round(qcameraDuration),
distance_meters: Math.round(
Math.max(rlogTotalDistInternal, rlogTotalDistExternal) * 10,
@ -248,12 +248,21 @@ async function updateSegments() {
affectedDriveCarParams = {};
affectedDriveInitData = {};
const [driveSegments] = await orm.query('SELECT * FROM drive_segments WHERE upload_complete = false AND is_stalled = false AND process_attempts < 5 ORDER BY created ASC');
logger.info('updateSegments - total drive_segments', driveSegments.length);
const segments = await DriveSegments.findAll({
where: {
upload_complete: false,
is_stalled: false,
process_attempts: {
[Op.lt]: 5,
},
},
order: [['created', 'ASC']],
});
logger.info('updateSegments - total segments', segments.length);
if (driveSegments) {
for (let t = 0; t < driveSegments.length; t++) {
const segment = driveSegments[t];
if (segments) {
for (let t = 0; t < segments.length; t++) {
const segment = segments[t];
const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
.update(segment.dongle_id)
@ -286,7 +295,8 @@ async function updateSegments() {
const uploadComplete = Object.keys(fileStatus).every((key) => fileStatus[key]);
if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) { // can process
if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) {
// can process
segmentProcessQueue.push({
segment,
fileStatus,
@ -296,17 +306,19 @@ async function updateSegments() {
} else if (uploadComplete) {
logger.info(`updateSegments uploadComplete for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);
await orm.query(
`UPDATE drive_segments SET upload_complete = true, is_stalled = false WHERE id = ${segment.id}`,
);
await DriveSegments.update({
upload_complete: true,
is_stalled: false,
}, { where: { id: segment.id } });
affectedDrives[`${segment.dongle_id}|${segment.drive_identifier}`] = true;
} else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) { // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled)
} else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) {
// ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled)
logger.info(`updateSegments isStalled for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);
await orm.query(
`UPDATE drive_segments SET is_stalled = true WHERE id = ${segment.id}`,
);
await DriveSegments.update({
is_stalled: true,
}, { where: { id: segment.id } });
}
// we process at most 15 segments per batch
@ -317,19 +329,23 @@ async function updateSegments() {
}
if (segmentProcessQueue.length > 0) {
processSegmentsRecursive();
await processSegmentsRecursive();
} else {
// if no data is to be collected, call updateDrives to update those where eventually just the last segment completed the upload
updateDrives();
await updateDrives();
}
}
async function updateDevices() {
// go through all affected devices (with deleted or updated drives) and update them (storage_used)
logger.info(`updateDevices - affected drives: ${JSON.stringify(affectedDevices)}`);
for (const dongleId of Object.keys(affectedDevices)) {
const [device] = await orm.query(`SELECT * FROM devices WHERE dongle_id = ${dongleId}`);
if (device == null) continue;
await Promise.all(Object.keys(affectedDevices).map(async (dongleId) => {
const device = await Devices.findOne({ where: { dongle_id: dongleId } });
if (!device) {
logger.warn(`updateDevices - device not found for dongle_id ${dongleId}`);
return;
}
const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
.update(device.dongle_id)
@ -339,35 +355,33 @@ async function updateDevices() {
.toString(), 10) / 1024);
logger.info(`updateDevices device ${dongleId} has an updated storage_used of: ${deviceQuotaMb} MB`);
await orm.models.drives.update(
{
storage_used: deviceQuotaMb,
},
{
where: {
dongle_id: device.dongle_id,
},
},
await Devices.update(
{ storage_used: deviceQuotaMb },
{ where: { dongle_id: device.dongle_id } },
);
}
affectedDevices = [];
delete affectedDevices[dongleId];
}));
}
async function updateDrives() {
// go through all affected drives and update them / complete and/or build m3u8
logger.info(`updateDrives - affected drives: ${JSON.stringify(affectedDrives)}`);
for (const key of Object.keys(affectedDrives)) {
await Promise.all(Object.keys(affectedDrives).map(async (key) => {
const [dongleId, driveIdentifier] = key.split('|');
let drive = await orm.models.drives.findOne({
where: { identifier: driveIdentifier, dongle_id: dongleId },
const drive = await Drives.findOne({
where: {
identifier: driveIdentifier,
dongle_id: dongleId,
},
});
if (!drive) {
continue;
logger.warn('updateDrives drive not found', key);
return;
}
drive = drive.dataValues;
const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
.update(drive.dongle_id)
.digest('hex');
@ -384,14 +398,12 @@ async function updateDrives() {
let totalDurationSeconds = 0;
let playlistSegmentStrings = '';
const driveSegments = await orm.models.drive_segments.findAll({
const driveSegments = await DriveSegments.findAll({
where: {
drive_identifier: driveIdentifier,
dongle_id: dongleId,
},
order: [
orm.fn('ASC', orm.col('segment_id')),
],
order: [['segment_id', 'ASC']],
});
if (driveSegments) {
@ -406,6 +418,8 @@ async function updateDrives() {
playlistSegmentStrings += `#EXTINF:${driveSegments[t].duration},${driveSegments[t].segment_id}\n${driveUrl}/${driveSegments[t].segment_id}/qcamera.ts\n`;
}
}
} else {
logger.warn('updateDrives driveSegments not found', key);
}
let { filesize } = drive;
@ -435,7 +449,7 @@ async function updateDrives() {
logger.info(`updateDrives drive ${dongleId} ${driveIdentifier} uploadComplete: ${uploadComplete}`);
await orm.models.drives.update(
await Drives.update(
{
distance_meters: Math.round(totalDistanceMeters),
duration: Math.round(totalDurationSeconds),
@ -460,164 +474,13 @@ async function updateDrives() {
fs.writeFileSync(`${drivePath}/qcamera.m3u8`, playlist);
}
}
}));
await updateDevices();
setTimeout(mainWorkerLoop);
}
// NOTE(review): this is the pre-refactor copy of deleteExpiredDrives that the
// commit moves into cleanup.js (removed lines of the diff).
// Marks drives older than DEVICE_EXPIRATION_DAYS — and neither preserved nor
// already deleted — as is_deleted; physical removal happens later.
async function deleteExpiredDrives() {
  const expirationTs = Date.now() - process.env.DEVICE_EXPIRATION_DAYS * 24 * 3600 * 1000;
  // NOTE(review): interpolating values into SQL is fragile — prefer query
  // replacements, even for numeric values like this timestamp.
  const [expiredDrives] = await orm.query(`SELECT * FROM drives WHERE is_preserved = false AND is_deleted = false AND created < ${expirationTs}`);
  if (!expiredDrives) {
    return;
  }
  for (let t = 0; t < expiredDrives.length; t++) {
    logger.info(`deleteExpiredDrives drive ${expiredDrives[t].dongle_id} ${expiredDrives[t].identifier} is older than ${process.env.DEVICE_EXPIRATION_DAYS} days, set is_deleted=true`);
    await orm.models.drives.update(
      {
        is_deleted: true,
      },
      { where: { id: expiredDrives[t].id } },
    );
  }
}
// NOTE(review): pre-refactor copy moved into cleanup.js by this commit.
// Removes on-disk files of drives flagged is_deleted, marks them
// is_physically_removed, and deletes their drive_segments rows.
async function removeDeletedDrivesPhysically() {
  const [deletedDrives] = await orm.query('SELECT * FROM drives WHERE is_deleted = true AND is_physically_removed = false');
  if (!deletedDrives) {
    return;
  }
  for (let t = 0; t < deletedDrives.length; t++) {
    logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} is deleted, remove physical files and clean database`);
    const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
      .update(deletedDrives[t].dongle_id)
      .digest('hex');
    const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT)
      .update(deletedDrives[t].identifier)
      .digest('hex');
    const drivePath = `${process.env.STORAGE_PATH + deletedDrives[t].dongle_id}/${dongleIdHash}/${driveIdentifierHash}`;
    logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} storage path is ${drivePath}`);
    try {
      const driveResult = await orm.query(`UPDATE drives SET is_physically_removed = true WHERE id = ${deletedDrives[t].id}`);
      // FIXME: identifier/dongle_id are strings interpolated unquoted into
      // SQL — this is a syntax error at runtime and an injection risk; use
      // parameterized replacements instead.
      const driveSegmentResult = await orm.query(
        `DELETE FROM drive_segments WHERE drive_identifier = ${deletedDrives[t].identifier} AND dongle_id = ${deletedDrives[t].dongle_id}`,
      );
      if (driveResult != null && driveSegmentResult != null) {
        deleteFolderRecursive(drivePath, { recursive: true });
      }
      affectedDevices[deletedDrives[t].dongle_id] = true;
    } catch (exception) {
      logger.error(exception);
    }
  }
}
// NOTE(review): pre-refactor copy moved into cleanup.js by this commit.
// Soft-deletes the oldest drive of each over-quota device, preferring
// non-preserved drives.
async function deleteOverQuotaDrives() {
  const [devices] = await orm.query(`SELECT * FROM devices WHERE storage_used > ${process.env.DEVICE_STORAGE_QUOTA_MB}`);
  if (devices == null) {
    return;
  }
  for (let t = 0; t < devices.length; t++) {
    let foundDriveToDelete = false;
    // FIXME: dongle_id is a string interpolated unquoted into SQL; also,
    // `const [driveNormal] =` binds the rows array from [rows, metadata],
    // not the first row of this LIMIT 1 query.
    const [driveNormal] = await orm.query(`SELECT * FROM drives WHERE dongle_id = ${devices[t].dongle_id} AND is_preserved = false AND is_deleted = false ORDER BY created ASC LIMIT 1`);
    if (driveNormal != null) {
      logger.info(`deleteOverQuotaDrives drive ${driveNormal.dongle_id} ${driveNormal.identifier} (normal) is deleted for over-quota`);
      await orm.query(`UPDATE drives SET is_deleted = true WHERE id = ${driveNormal.id}`);
      foundDriveToDelete = true;
    }
    if (!foundDriveToDelete) {
      // FIXME: plain string, not a template literal — the SQL contains the
      // literal text "devices[t].dongle_id" and can never match a row.
      const [drivePreserved] = await orm.query('SELECT * FROM drives WHERE dongle_id = devices[t].dongle_id AND is_preserved = true AND is_deleted = false ORDER BY created ASC LIMIT 1');
      if (drivePreserved != null) {
        logger.info(`deleteOverQuotaDrives drive ${drivePreserved.dongle_id} ${drivePreserved.identifier} (preserved!) is deleted for over-quota`);
        // FIXME: `?` placeholder has no bound replacement value.
        await orm.query(`UPDATE drives SET is_deleted = ? WHERE id = ${drivePreserved.id}`);
        foundDriveToDelete = true;
      }
    }
  }
}
// NOTE(review): pre-refactor copy moved into cleanup.js by this commit.
// Keeps only the 5 newest boot logs and 5 newest crash logs per device;
// older files are unlinked. The boot and crash passes are copy-pasted —
// a shared helper would remove the duplication.
async function deleteBootAndCrashLogs() {
  const [devices] = await orm.query('SELECT * FROM devices');
  if (devices == null) {
    return;
  }
  for (let t = 0; t < devices.length; t++) {
    const device = devices[t];
    const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
      .update(device.dongle_id)
      .digest('hex');
    const bootlogDirectoryTree = dirTree(`${process.env.STORAGE_PATH + device.dongle_id}/${dongleIdHash}/boot/`, { attributes: ['size'] });
    const bootlogFiles = [];
    if (bootlogDirectoryTree) {
      for (let i = 0; i < bootlogDirectoryTree.children.length; i++) {
        // file names look like boot-YYYY-MM-DD--HH-MM-SS.bz2; strip the
        // prefix/extension and parse the remainder into a timestamp
        const timeSplit = bootlogDirectoryTree.children[i].name.replace('boot-', '')
          .replace('crash-', '')
          .replace('.bz2', '')
          .split('--');
        const timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`;
        bootlogFiles.push({
          name: bootlogDirectoryTree.children[i].name,
          size: bootlogDirectoryTree.children[i].size,
          date: Date.parse(timeString),
          path: bootlogDirectoryTree.children[i].path,
        });
      }
      // newest first; indexes 5+ are deleted
      bootlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1));
      for (let c = 5; c < bootlogFiles.length; c++) {
        logger.info(`deleteBootAndCrashLogs deleting boot log ${bootlogFiles[c].path}`);
        try {
          fs.unlinkSync(bootlogFiles[c].path);
          affectedDevices[device.dongle_id] = true;
        } catch (exception) {
          logger.error(exception);
        }
      }
    }
    const crashlogDirectoryTree = dirTree(`${process.env.STORAGE_PATH + device.dongle_id}/${dongleIdHash}/crash/`, { attributes: ['size'] });
    const crashlogFiles = [];
    if (crashlogDirectoryTree) {
      for (let i = 0; i < crashlogDirectoryTree.children.length; i++) {
        const timeSplit = crashlogDirectoryTree.children[i].name.replace('boot-', '')
          .replace('crash-', '')
          .replace('.bz2', '')
          .split('--');
        const timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`;
        crashlogFiles.push({
          name: crashlogDirectoryTree.children[i].name,
          size: crashlogDirectoryTree.children[i].size,
          date: Date.parse(timeString),
          path: crashlogDirectoryTree.children[i].path,
        });
      }
      // newest first; indexes 5+ are deleted
      crashlogFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1));
      for (let c = 5; c < crashlogFiles.length; c++) {
        logger.info(`deleteBootAndCrashLogs deleting crash log ${crashlogFiles[c].path}`);
        try {
          fs.unlinkSync(crashlogFiles[c].path);
          affectedDevices[device.dongle_id] = true;
        } catch (exception) {
          logger.error(exception);
        }
      }
    }
  }
}
async function mainWorkerLoop() {
if (Date.now() - startTime > 60 * 60 * 1000) {
logger.info('EXIT WORKER AFTER 1 HOUR TO PREVENT MEMORY LEAKS...');
@ -627,10 +490,7 @@ async function mainWorkerLoop() {
try {
if (Date.now() - lastCleaningTime > 20 * 60 * 1000) {
await deleteBootAndCrashLogs();
await deleteExpiredDrives();
await deleteOverQuotaDrives();
await removeDeletedDrivesPhysically();
await doCleanup();
lastCleaningTime = Date.now();
}