diff --git a/src/worker/worker.js b/src/worker/worker.js index b7a899a..d68ac55 100644 --- a/src/worker/worker.js +++ b/src/worker/worker.js @@ -210,7 +210,7 @@ async function processSegmentsRecursive() { logger.error(`FAILING TO PROCESS SEGMENT,${dongleId} ${driveIdentifier} ${segmentId} JSON: ${JSON.stringify(segment)} SKIPPING `); segmentProcessPosition += 1; } else { - Promise.all([ + await Promise.all([ processSegmentRLog(fileStatus['rlog.bz2']), processSegmentVideo(fileStatus['qcamera.ts']), ]) @@ -269,24 +269,24 @@ async function updateSegments() { }); logger.info('updateSegments - total segments', segments.length); - if (segments.length > 0) { + await Promise.all(segments.map(async (segment) => { // we process at most 15 segments per batch - segments.length = Math.min(segments.length, 15); - logger.debug(`updateSegments - processing ${segments.length} segments`); + if (segmentProcessQueue.length >= 15) { + return; + } - await Promise.all(segments.map(async (segment) => { - logger.debug('updateSegments - segment', segment); - const { - id, - created, + const { + id, + created, dongle_id: dongleId, drive_identifier: driveIdentifier, - is_processed: isProcessed, - segment_id: segmentId, - } = segment; + is_processed: isProcessed, + segment_id: segmentId, + } = segment; + logger.debug('updateSegments - segment', driveIdentifier, segmentId); - const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) - .update(dongleId) + const dongleIdHash = crypto.createHmac('sha256', process.env.APP_SALT) + .update(dongleId) .digest('hex'); const driveIdentifierHash = crypto.createHmac('sha256', process.env.APP_SALT) .update(driveIdentifier) @@ -306,27 +306,27 @@ async function updateSegments() { dcamera: 'dcamera.hevc', qcamera: 'qcamera.ts', qlog: 'qlog.bz2', - rlog: 'rlog.bz2', - }; - const fileStatus = { - [SegmentFiles.fcamera]: false, - [SegmentFiles.dcamera]: false, - [SegmentFiles.qcamera]: false, - [SegmentFiles.qlog]: false, - [SegmentFiles.rlog]: false, - }; + rlog: 'rlog.bz2', + }; + const fileStatus = { + [SegmentFiles.fcamera]: undefined, + [SegmentFiles.dcamera]: undefined, + [SegmentFiles.qcamera]: undefined, + [SegmentFiles.qlog]: undefined, + [SegmentFiles.rlog]: undefined, + }; - directoryTree.children.forEach((file) => { - if (file.name in fileStatus) { - logger.debug('updateSegments - found file', file.name); - fileStatus[file.name] = true; - } - }); + directoryTree.children.forEach((file) => { + if (file.name in fileStatus) { + logger.debug('updateSegments - found file', file.name); + fileStatus[file.name] = file.path; + } + }); - const uploadComplete = Object.keys(fileStatus).every((key) => fileStatus[key]); - logger.debug('updateSegments - uploadComplete', uploadComplete); + const uploadComplete = Object.keys(fileStatus).every((key) => !!fileStatus[key]); + logger.debug('updateSegments - uploadComplete', uploadComplete); - if (fileStatus[SegmentFiles.qcamera] && fileStatus[SegmentFiles.rlog] && !isProcessed) { + if (fileStatus[SegmentFiles.qcamera] && fileStatus[SegmentFiles.rlog] && !isProcessed) { // can process logger.debug('updateSegments - can process', id); segmentProcessQueue.push({ @@ -350,10 +350,9 @@ async function updateSegments() { await DriveSegments.update({ is_stalled: true, - }, { where: { id } }); - } - })); - } + }, { where: { id } }); + } + })); if (segmentProcessQueue.length > 0) { logger.info('updateSegments - processing', segmentProcessQueue.length);