From f2a1ee7cf1694fd20588d91ec8f4c2a5fad4c988 Mon Sep 17 00:00:00 2001
From: Cameron Clough
Date: Thu, 24 Mar 2022 19:30:26 +0000
Subject: [PATCH] worker refactoring

---
 src/worker/worker.js | 134 +++++++++++++++++++++++++++----------------
 1 file changed, 83 insertions(+), 51 deletions(-)

diff --git a/src/worker/worker.js b/src/worker/worker.js
index 95067b4..e5a5d8d 100644
--- a/src/worker/worker.js
+++ b/src/worker/worker.js
@@ -176,7 +176,10 @@ function processSegmentVideo(qcameraPath) {
 }
 
 async function processSegmentsRecursive() {
+  logger.info('processSegmentsRecursive');
+
   if (segmentProcessQueue.length <= segmentProcessPosition) {
+    logger.info('processSegmentsRecursive segmentProcessQueue empty');
     await updateDrives();
     return;
   }
@@ -188,18 +191,23 @@ async function processSegmentsRecursive() {
     fileStatus,
   } = segmentProcessQueue[segmentProcessPosition];
 
-  logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} ${JSON.stringify(segment)}`);
+  const {
+    dongle_id: dongleId,
+    segment_id: segmentId,
+  } = segment;
+
+  logger.info(`processSegmentsRecursive ${dongleId} ${driveIdentifier} ${segmentId} ${JSON.stringify(segment)}`);
 
   segment.process_attempts += 1;
 
-  DriveSegments.update({
+  await DriveSegments.update({
    process_attempts: segment.process_attempts,
   }, {
     where: { id: segment.id },
   });
 
   if (segment.process_attempts > 5) {
-    logger.error(`FAILING TO PROCESS SEGMENT,${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} JSON: ${JSON.stringify(segment)} SKIPPING `);
+    logger.error(`FAILING TO PROCESS SEGMENT, ${dongleId} ${driveIdentifier} ${segmentId} JSON: ${JSON.stringify(segment)} SKIPPING`);
     segmentProcessPosition += 1;
   } else {
     Promise.all([
@@ -207,7 +215,7 @@
       processSegmentVideo(fileStatus['qcamera.ts']),
     ])
       .then(async () => {
-        logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} internal gps: ${Math.round(rlogTotalDistInternal * 100) / 100}m, external gps: ${Math.round(rlogTotalDistExternal * 100) / 100}m, duration: ${qcameraDuration}s`);
+        logger.info(`processSegmentsRecursive ${dongleId} ${driveIdentifier} ${segmentId} internal gps: ${Math.round(rlogTotalDistInternal * 100) / 100}m, external gps: ${Math.round(rlogTotalDistExternal * 100) / 100}m, duration: ${qcameraDuration}s`);
 
         const driveSegmentResult = await DriveSegments.update({
           duration: Math.round(qcameraDuration),
@@ -261,85 +269,105 @@ async function updateSegments() {
   });
   logger.info('updateSegments - total segments', segments.length);
 
-  if (segments) {
-    for (let t = 0; t < segments.length; t++) {
-      const segment = segments[t];
+  if (segments.length > 0) {
+    // we process at most 15 segments per batch
+    segments.length = Math.min(segments.length, 15);
+    logger.debug(`updateSegments - processing ${segments.length} segments`);
+
+    await Promise.all(segments.map(async (segment) => {
+      logger.debug('updateSegments - segment', segment);
+      const {
+        id,
+        created,
+        dongle_id: dongleId,
+        drive_identifier: driveIdentifier,
+        is_processed: isProcessed,
+        segment_id: segmentId,
+      } = segment;
 
       const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
-        .update(segment.dongle_id)
+        .update(dongleId)
         .digest('hex');
       const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT)
-        .update(segment.drive_identifier)
+        .update(driveIdentifier)
         .digest('hex');
 
-      const directoryTreePath = `${process.env.STORAGE_PATH}${segment.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${segment.drive_identifier}/${segment.segment_id}`;
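+      // layout: <STORAGE_PATH><dongleId>/<dongleIdHash>/<driveIdentifierHash>/<driveIdentifier>/<segmentId>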
+      const directoryTreePath = `${process.env.STORAGE_PATH}${dongleId}/${dongleIdHash}/${driveIdentifierHash}/${driveIdentifier}/${segmentId}`;
 
       const directoryTree = dirTree(directoryTreePath);
       if (!directoryTree || !directoryTree.children) {
-        console.log('missing directory', directoryTreePath);
-        continue; // happens if upload in progress (db entity written but directory not yet created)
+        logger.warn('updateSegments - missing directory', directoryTreePath);
+        return; // happens if upload in progress (db entity written but directory not yet created)
       }
 
+      // TODO: abstract this out
+      const SegmentFiles = {
+        fcamera: 'fcamera.hevc',
+        dcamera: 'dcamera.hevc',
+        qcamera: 'qcamera.ts',
+        qlog: 'qlog.bz2',
+        rlog: 'rlog.bz2',
+      };
       const fileStatus = {
-        'fcamera.hevc': false,
-        'dcamera.hevc': false,
-        'qcamera.ts': false,
-        'qlog.bz2': false,
-        'rlog.bz2': false,
+        [SegmentFiles.fcamera]: false,
+        [SegmentFiles.dcamera]: false,
+        [SegmentFiles.qcamera]: false,
+        [SegmentFiles.qlog]: false,
+        [SegmentFiles.rlog]: false,
       };
 
       directoryTree.children.forEach((file) => {
         if (file.name in fileStatus) {
+          logger.debug('updateSegments - found file', file.name);
           fileStatus[file.name] = true;
         }
       });
 
       const uploadComplete = Object.keys(fileStatus).every((key) => fileStatus[key]);
+      logger.debug('updateSegments - uploadComplete', uploadComplete);
 
-      if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) {
+      if (fileStatus[SegmentFiles.qcamera] && fileStatus[SegmentFiles.rlog] && !isProcessed) {
         // can process
+        logger.debug('updateSegments - can process', id);
         segmentProcessQueue.push({
           segment,
           fileStatus,
           uploadComplete,
-          driveIdentifier: `${segment.dongle_id}|${segment.drive_identifier}`,
+          driveIdentifier: `${dongleId}|${driveIdentifier}`,
         });
       } else if (uploadComplete) {
-        logger.info(`updateSegments uploadComplete for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);
+        logger.info(`updateSegments uploadComplete for ${dongleId} ${driveIdentifier} ${segmentId}`);
 
         await DriveSegments.update({
           upload_complete: true,
           is_stalled: false,
-        }, { where: { id: segment.id } });
+        }, { where: { id } });
 
-        affectedDrives[`${segment.dongle_id}|${segment.drive_identifier}`] = true;
-      } else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) {
+        affectedDrives[`${dongleId}|${driveIdentifier}`] = true;
+      } else if (Date.now() - created > 10 * 24 * 3600 * 1000) {
         // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled)
-        logger.info(`updateSegments isStalled for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);
+        logger.warn(`updateSegments isStalled for ${dongleId} ${driveIdentifier} ${segmentId}`);
 
         await DriveSegments.update({
           is_stalled: true,
-        }, { where: { id: segment.id } });
+        }, { where: { id } });
       }
-
-      // we process at most 15 segments per batch
-      if (segmentProcessQueue.length >= 15) {
-        break;
-      }
-    }
+    }));
   }
 
   if (segmentProcessQueue.length > 0) {
+    logger.info('updateSegments - processing', segmentProcessQueue.length);
     await processSegmentsRecursive();
   } else {
     // if no data is to be collected, call updateDrives to update those where eventually just the last segment completed the upload
+    logger.info('updateSegments - no segments to process, updating drives...');
     await updateDrives();
   }
 }
 
 async function updateDevices() {
   // go through all affected devices (with deleted or updated drives) and update them (storage_used)
-  logger.info(`updateDevices - affected drives: ${JSON.stringify(affectedDevices)}`);
+  logger.info(`updateDevices - affected devices: ${JSON.stringify(affectedDevices)}`);
 
   await Promise.all(Object.keys(affectedDevices).map(async (dongleId) => {
     const device = await Devices.findOne({ where: { dongle_id: dongleId } });
@@ -349,16 +377,16 @@
     }
 
     const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
-      .update(device.dongle_id)
+      .update(dongleId)
       .digest('hex');
-    const devicePath = `${process.env.STORAGE_PATH}${device.dongle_id}/${dongleIdHash}`;
+    const devicePath = `${process.env.STORAGE_PATH}${dongleId}/${dongleIdHash}`;
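+    // du -s reports usage in 1 KiB blocks (GNU default), so the division by 1024 below yields MB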
     const deviceQuotaMb = Math.round(parseInt(execSync(`du -s ${devicePath} | awk -F'\t' '{print $1;}'`)
       .toString(), 10) / 1024);
     logger.info(`updateDevices device ${dongleId} has an updated storage_used of: ${deviceQuotaMb} MB`);
 
     await Devices.update(
       { storage_used: deviceQuotaMb },
-      { where: { dongle_id: device.dongle_id } },
+      { where: { dongle_id: dongleId } },
     );
 
     delete affectedDevices[dongleId];
@@ -370,11 +398,11 @@ async function updateDrives() {
   logger.info(`updateDrives - affected drives: ${JSON.stringify(affectedDrives)}`);
 
   await Promise.all(Object.keys(affectedDrives).map(async (key) => {
-    const [dongleId, driveIdentifier] = key.split('|');
+    const [dongleId, identifier] = key.split('|');
 
     const drive = await Drives.findOne({
       where: {
-        identifier: driveIdentifier,
+        identifier,
         dongle_id: dongleId,
       },
     });
@@ -384,13 +412,13 @@
     }
 
     const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT)
-      .update(drive.dongle_id)
+      .update(dongleId)
       .digest('hex');
     const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT)
-      .update(drive.identifier)
+      .update(identifier)
       .digest('hex');
-    const driveUrl = `${process.env.BASE_DRIVE_DOWNLOAD_URL}${drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;
-    const drivePath = `${process.env.STORAGE_PATH}${drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;
+    const driveUrl = `${process.env.BASE_DRIVE_DOWNLOAD_URL}${dongleId}/${dongleIdHash}/${driveIdentifierHash}/${identifier}`;
+    const drivePath = `${process.env.STORAGE_PATH}${dongleId}/${dongleIdHash}/${driveIdentifierHash}/${identifier}`;
 
     let uploadComplete = true;
     let isProcessed = true;
@@ -401,24 +429,26 @@
 
     const driveSegments = await DriveSegments.findAll({
       where: {
-        drive_identifier: driveIdentifier,
+        drive_identifier: identifier,
         dongle_id: dongleId,
       },
       order: [['segment_id', 'ASC']],
     });
 
     if (driveSegments) {
-      for (let t = 0; t < driveSegments.length; t++) {
-        if (!driveSegments[t].upload_complete) uploadComplete = false;
-        if (!driveSegments[t].is_processed) {
+      driveSegments.forEach((driveSegment) => {
+        if (!driveSegment.upload_complete) {
+          uploadComplete = false;
+        }
+        if (!driveSegment.is_processed) {
          isProcessed = false;
         } else {
-          totalDistanceMeters += parseFloat(driveSegments[t].distance_meters);
-          totalDurationSeconds += parseFloat(driveSegments[t].duration);
 
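+          // one HLS playlist entry per segment: an #EXTINF tag carrying the duration and segment_id, then the qcamera.ts URL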
-          playlistSegmentStrings += `#EXTINF:${driveSegments[t].duration},${driveSegments[t].segment_id}\n${driveUrl}/${driveSegments[t].segment_id}/qcamera.ts\n`;
+          totalDistanceMeters += parseFloat(driveSegment.distance_meters);
+          totalDurationSeconds += parseFloat(driveSegment.duration);
+
+          playlistSegmentStrings += `#EXTINF:${driveSegment.duration},${driveSegment.segment_id}\n${driveUrl}/${driveSegment.segment_id}/qcamera.ts\n`;
         }
-      }
+      });
     } else {
       logger.warn('updateDrives driveSegments not found', key);
     }
@@ -439,7 +469,9 @@
     } catch (exception) {
       logger.error(exception);
     }
-    if (metadata == null) metadata = {};
+    if (metadata == null) {
+      metadata = {};
+    }
 
     if (affectedDriveInitData[key] && !metadata.InitData) {
       metadata.InitData = affectedDriveInitData[key];
@@ -448,7 +480,7 @@
       metadata.CarParams = affectedDriveCarParams[key];
     }
 
-    logger.info(`updateDrives drive ${dongleId} ${driveIdentifier} uploadComplete: ${uploadComplete}`);
+    logger.info(`updateDrives drive ${dongleId} ${identifier} uploadComplete: ${uploadComplete}`);
 
     await Drives.update(
       {