worker refactoring

pull/4/head
Cameron Clough 2022-03-24 19:30:26 +00:00
parent ca94b1c1a0
commit f2a1ee7cf1
No known key found for this signature in database
GPG Key ID: BFB3B74B026ED43F
1 changed file with 83 additions and 51 deletions

@@ -176,7 +176,10 @@ function processSegmentVideo(qcameraPath) {
 }

 async function processSegmentsRecursive() {
+  logger.info('processSegmentsRecursive');
+
   if (segmentProcessQueue.length <= segmentProcessPosition) {
+    logger.info('processSegmentsRecursive segmentProcessQueue empty');
     await updateDrives();
     return;
   }
@@ -188,18 +191,23 @@ async function processSegmentsRecursive() {
     fileStatus,
   } = segmentProcessQueue[segmentProcessPosition];
-  logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} ${JSON.stringify(segment)}`);
+  const {
+    dongle_id: dongleId,
+    segment_id: segmentId,
+  } = segment;
+  logger.info(`processSegmentsRecursive ${dongleId} ${driveIdentifier} ${segmentId} ${JSON.stringify(segment)}`);

   segment.process_attempts += 1;
-  DriveSegments.update({
+  await DriveSegments.update({
     process_attempts: segment.process_attempts,
   }, {
     where: { id: segment.id },
   });

   if (segment.process_attempts > 5) {
-    logger.error(`FAILING TO PROCESS SEGMENT,${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} JSON: ${JSON.stringify(segment)} SKIPPING `);
+    logger.error(`FAILING TO PROCESS SEGMENT,${dongleId} ${driveIdentifier} ${segmentId} JSON: ${JSON.stringify(segment)} SKIPPING `);
     segmentProcessPosition += 1;
   } else {
     Promise.all([
@@ -207,7 +215,7 @@ async function processSegmentsRecursive() {
       processSegmentVideo(fileStatus['qcamera.ts']),
     ])
       .then(async () => {
-        logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} internal gps: ${Math.round(rlogTotalDistInternal * 100) / 100}m, external gps: ${Math.round(rlogTotalDistExternal * 100) / 100}m, duration: ${qcameraDuration}s`);
+        logger.info(`processSegmentsRecursive ${dongleId} ${driveIdentifier} ${segmentId} internal gps: ${Math.round(rlogTotalDistInternal * 100) / 100}m, external gps: ${Math.round(rlogTotalDistExternal * 100) / 100}m, duration: ${qcameraDuration}s`);

         const driveSegmentResult = await DriveSegments.update({
           duration: Math.round(qcameraDuration),
@@ -261,85 +269,105 @@ async function updateSegments() {
   });

   logger.info('updateSegments - total segments', segments.length);
-  if (segments) {
-    for (let t = 0; t < segments.length; t++) {
-      const segment = segments[t];
+  if (segments.length > 0) {
+    // we process at most 15 segments per batch
+    segments.length = Math.min(segments.length, 15);
+    logger.debug(`updateSegments - processing ${segments.length} segments`);
+    await Promise.all(segments.map(async (segment) => {
+      logger.debug('updateSegments - segment', segment);
+      const {
+        id,
+        created,
+        dongle_id: dongleId,
+        drive_identifier: driveIdentifier,
+        is_processed: isProcessed,
+        segment_id: segmentId,
+      } = segment;

       const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
-        .update(segment.dongle_id)
+        .update(dongleId)
         .digest('hex');
       const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT)
-        .update(segment.drive_identifier)
+        .update(driveIdentifier)
         .digest('hex');

-      const directoryTreePath = `${process.env.STORAGE_PATH}${segment.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${segment.drive_identifier}/${segment.segment_id}`;
+      const directoryTreePath = `${process.env.STORAGE_PATH}${dongleId}/${dongleIdHash}/${driveIdentifierHash}/${driveIdentifier}/${segmentId}`;
       const directoryTree = dirTree(directoryTreePath);
       if (!directoryTree || !directoryTree.children) {
-        console.log('missing directory', directoryTreePath);
-        continue; // happens if upload in progress (db entity written but directory not yet created)
+        logger.warn('updateSegments - missing directory', directoryTreePath);
+        return; // happens if upload in progress (db entity written but directory not yet created)
       }

+      // TODO: abstract this out
+      const SegmentFiles = {
+        fcamera: 'fcamera.hevc',
+        dcamera: 'dcamera.hevc',
+        qcamera: 'qcamera.ts',
+        qlog: 'qlog.bz2',
+        rlog: 'rlog.bz2',
+      };
       const fileStatus = {
-        'fcamera.hevc': false,
-        'dcamera.hevc': false,
-        'qcamera.ts': false,
-        'qlog.bz2': false,
-        'rlog.bz2': false,
+        [SegmentFiles.fcamera]: false,
+        [SegmentFiles.dcamera]: false,
+        [SegmentFiles.qcamera]: false,
+        [SegmentFiles.qlog]: false,
+        [SegmentFiles.rlog]: false,
       };

       directoryTree.children.forEach((file) => {
         if (file.name in fileStatus) {
+          logger.debug('updateSegments - found file', file.name);
           fileStatus[file.name] = true;
         }
       });

       const uploadComplete = Object.keys(fileStatus).every((key) => fileStatus[key]);
+      logger.debug('updateSegments - uploadComplete', uploadComplete);

-      if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) {
+      if (fileStatus[SegmentFiles.qcamera] && fileStatus[SegmentFiles.rlog] && !isProcessed) {
         // can process
+        logger.debug('updateSegments - can process', id);
         segmentProcessQueue.push({
           segment,
           fileStatus,
           uploadComplete,
-          driveIdentifier: `${segment.dongle_id}|${segment.drive_identifier}`,
+          driveIdentifier: `${dongleId}|${driveIdentifier}`,
         });
       } else if (uploadComplete) {
-        logger.info(`updateSegments uploadComplete for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);
+        logger.info(`updateSegments uploadComplete for ${dongleId} ${driveIdentifier} ${segmentId}`);
         await DriveSegments.update({
           upload_complete: true,
           is_stalled: false,
-        }, { where: { id: segment.id } });
+        }, { where: { id } });

-        affectedDrives[`${segment.dongle_id}|${segment.drive_identifier}`] = true;
-      } else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) {
+        affectedDrives[`${dongleId}|${driveIdentifier}`] = true;
+      } else if (Date.now() - created > 10 * 24 * 3600 * 1000) {
         // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled)
-        logger.info(`updateSegments isStalled for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);
+        logger.warn(`updateSegments isStalled for ${dongleId} ${driveIdentifier} ${segmentId}`);
         await DriveSegments.update({
           is_stalled: true,
-        }, { where: { id: segment.id } });
+        }, { where: { id } });
       }
-
-      // we process at most 15 segments per batch
-      if (segmentProcessQueue.length >= 15) {
-        break;
-      }
-    }
+    }));
   }

   if (segmentProcessQueue.length > 0) {
+    logger.info('updateSegments - processing', segmentProcessQueue.length);
     await processSegmentsRecursive();
   } else {
     // if no data is to be collected, call updateDrives to update those where eventually just the last segment completed the upload
+    logger.info('updateSegments - no segments to process, updating drives...');
     await updateDrives();
   }
 }

 async function updateDevices() {
   // go through all affected devices (with deleted or updated drives) and update them (storage_used)
-  logger.info(`updateDevices - affected drives: ${JSON.stringify(affectedDevices)}`);
+  logger.info(`updateDevices - affected devices: ${JSON.stringify(affectedDevices)}`);
   await Promise.all(Object.keys(affectedDevices).map(async (dongleId) => {
     const device = await Devices.findOne({ where: { dongle_id: dongleId } });
@@ -349,16 +377,16 @@ async function updateDevices() {
     }

     const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
-      .update(device.dongle_id)
+      .update(dongleId)
       .digest('hex');
-    const devicePath = `${process.env.STORAGE_PATH}${device.dongle_id}/${dongleIdHash}`;
+    const devicePath = `${process.env.STORAGE_PATH}${dongleId}/${dongleIdHash}`;

     const deviceQuotaMb = Math.round(parseInt(execSync(`du -s ${devicePath} | awk -F'\t' '{print $1;}'`)
       .toString(), 10) / 1024);
     logger.info(`updateDevices device ${dongleId} has an updated storage_used of: ${deviceQuotaMb} MB`);
     await Devices.update(
       { storage_used: deviceQuotaMb },
-      { where: { dongle_id: device.dongle_id } },
+      { where: { dongle_id: dongleId } },
     );

     delete affectedDevices[dongleId];
@@ -370,11 +398,11 @@ async function updateDrives() {
   logger.info(`updateDrives - affected drives: ${JSON.stringify(affectedDrives)}`);

   await Promise.all(Object.keys(affectedDrives).map(async (key) => {
-    const [dongleId, driveIdentifier] = key.split('|');
+    const [dongleId, identifier] = key.split('|');
     const drive = await Drives.findOne({
       where: {
-        identifier: driveIdentifier,
+        identifier,
         dongle_id: dongleId,
       },
     });
@@ -384,13 +412,13 @@ async function updateDrives() {
     }

     const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
-      .update(drive.dongle_id)
+      .update(dongleId)
       .digest('hex');
     const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT)
-      .update(drive.identifier)
+      .update(identifier)
       .digest('hex');

-    const driveUrl = `${process.env.BASE_DRIVE_DOWNLOAD_URL}${drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;
-    const drivePath = `${process.env.STORAGE_PATH}${drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;
+    const driveUrl = `${process.env.BASE_DRIVE_DOWNLOAD_URL}${dongleId}/${dongleIdHash}/${driveIdentifierHash}/${identifier}`;
+    const drivePath = `${process.env.STORAGE_PATH}${dongleId}/${dongleIdHash}/${driveIdentifierHash}/${identifier}`;
     let uploadComplete = true;
     let isProcessed = true;
@@ -401,24 +429,26 @@ async function updateDrives() {

     const driveSegments = await DriveSegments.findAll({
       where: {
-        drive_identifier: driveIdentifier,
+        drive_identifier: identifier,
         dongle_id: dongleId,
       },
       order: [['segment_id', 'ASC']],
     });

     if (driveSegments) {
-      for (let t = 0; t < driveSegments.length; t++) {
-        if (!driveSegments[t].upload_complete) uploadComplete = false;
-        if (!driveSegments[t].is_processed) {
+      driveSegments.forEach((driveSegment) => {
+        if (!driveSegment.upload_complete) {
+          uploadComplete = false;
+        }
+        if (!driveSegment.is_processed) {
           isProcessed = false;
         } else {
-          totalDistanceMeters += parseFloat(driveSegments[t].distance_meters);
-          totalDurationSeconds += parseFloat(driveSegments[t].duration);
-          playlistSegmentStrings += `#EXTINF:${driveSegments[t].duration},${driveSegments[t].segment_id}\n${driveUrl}/${driveSegments[t].segment_id}/qcamera.ts\n`;
+          totalDistanceMeters += parseFloat(driveSegment.distance_meters);
+          totalDurationSeconds += parseFloat(driveSegment.duration);
+          playlistSegmentStrings += `#EXTINF:${driveSegment.duration},${driveSegment.segment_id}\n${driveUrl}/${driveSegment.segment_id}/qcamera.ts\n`;
         }
-      }
+      });
     } else {
       logger.warn('updateDrives driveSegments not found', key);
     }
@@ -439,7 +469,9 @@ async function updateDrives() {
     } catch (exception) {
       logger.error(exception);
     }
-    if (metadata == null) metadata = {};
+    if (metadata == null) {
+      metadata = {};
+    }

     if (affectedDriveInitData[key] && !metadata.InitData) {
       metadata.InitData = affectedDriveInitData[key];
@@ -448,7 +480,7 @@ async function updateDrives() {
       metadata.CarParams = affectedDriveCarParams[key];
     }

-    logger.info(`updateDrives drive ${dongleId} ${driveIdentifier} uploadComplete: ${uploadComplete}`);
+    logger.info(`updateDrives drive ${dongleId} ${identifier} uploadComplete: ${uploadComplete}`);

     await Drives.update(
       {