2022-01-08 14:36:33 -07:00
/* eslint-disable */
2021-05-15 20:34:03 -06:00
const config = require ( './config' ) ;
const fs = require ( 'fs' ) ;
const path = require ( 'path' ) ;
const crypto = require ( 'crypto' ) ;
2022-01-08 14:36:33 -07:00
const log4js = require ( 'log4js' ) ;
2021-05-15 20:34:03 -06:00
2022-01-08 14:36:33 -07:00
const sqlite3 = require ( 'sqlite3' ) ;
const { open } = require ( 'sqlite' ) ;
2021-05-17 21:31:02 -06:00
2021-05-15 20:34:03 -06:00
const lockfile = require ( 'proper-lockfile' ) ;
const jwt = require ( 'jsonwebtoken' ) ;
2022-01-08 14:36:33 -07:00
const dirTree = require ( 'directory-tree' ) ;
const { execSync } = require ( 'child_process' ) ;
2021-05-15 20:34:03 -06:00
const Reader = require ( '@commaai/log_reader' ) ;
var ffprobe = require ( 'ffprobe' ) ,
2022-01-08 15:19:08 -07:00
ffprobeStatic = require ( 'ffprobe-static' ) ;
2021-05-15 20:34:03 -06:00
2022-01-08 14:36:33 -07:00
// Open sqlite database handle; assigned during startup (see lockfile.lock bootstrap below).
var db = null;

// Timestamp (ms) of the last cleanup pass (boot/crash logs, expired / over-quota drives).
var lastCleaningTime = 0;
// Worker start time; the process exits after one hour (see mainWorkerLoop).
var startTime = Date.now();
2021-05-15 20:34:03 -06:00
// Route all log output to both 'worker.log' and the console at info level.
log4js.configure({
  appenders: { logfile: { type: 'file', filename: 'worker.log' }, out: { type: 'console' } },
  categories: { default: { appenders: ['out', 'logfile'], level: 'info' } }
});

// Shared logger instance used throughout this worker.
var logger = log4js.getLogger('default');
2021-05-15 20:34:03 -06:00
/**
 * Verifies (and creates, if necessary) the configured storage root.
 * Exits the process when the path cannot be created or verified.
 */
function initializeStorage() {
  const isRelativeToScript = config.storagePath.indexOf('/') !== 0;
  const verifiedPath = mkDirByPathSync(config.storagePath, { isRelativeToScript });
  if (verifiedPath == null) {
    logger.error(`Unable to verify storage path '${config.storagePath}', check filesystem / permissions`);
    process.exit();
    return;
  }
  logger.info(`Verified storage path ${verifiedPath}`);
}
/**
 * Recursively creates `targetDir` (like `mkdir -p`), one path component at a time.
 *
 * @param {string} targetDir - path to create (absolute or relative)
 * @param {object} [opts]
 * @param {boolean} [opts.isRelativeToScript=false] - resolve relative to this file
 *   instead of the current working directory
 * @returns {string|null} the resolved created path on success, null on failure
 */
function mkDirByPathSync(targetDir, { isRelativeToScript = false } = {}) {
  const { sep } = path;
  const initDir = path.isAbsolute(targetDir) ? sep : '';
  const baseDir = isRelativeToScript ? __dirname : '.';

  return targetDir.split(sep)
    .reduce((parentDir, childDir) => {
      // A previous component already failed: propagate the failure instead of
      // calling path.resolve(baseDir, null, childDir), which throws a TypeError.
      if (parentDir === null) return null;

      const curDir = path.resolve(baseDir, parentDir, childDir);
      try {
        fs.mkdirSync(curDir);
      } catch (err) {
        if (err.code === 'EEXIST') { // curDir already exists!
          return curDir;
        }

        // To avoid `EISDIR` error on Mac and `EACCES`-->`ENOENT` and `EPERM` on Windows.
        if (err.code === 'ENOENT') { // parent component missing or not creatable
          // was logged as "EACCES: permission denied" — misleading for an ENOENT failure
          logger.error(`ENOENT: no such file or directory, mkdir '${parentDir}'`);
          return null;
        }

        const caughtErr = ['EACCES', 'EPERM', 'EISDIR'].indexOf(err.code) > -1;
        // `caughtErr &&` was redundant inside the second operand of the ||.
        if (!caughtErr || curDir === path.resolve(targetDir)) {
          logger.error('\'EACCES\', \'EPERM\', \'EISDIR\' during mkdir');
          return null;
        }
      }

      return curDir;
    }, initDir);
}
/**
 * Synchronously writes `buffer` to `path` with the given permission mode.
 * If opening fails (e.g. an existing file with restrictive permissions),
 * forces the mode with chmod and retries once.
 *
 * @param {string} path - destination file path
 * @param {Buffer} buffer - data to write
 * @param {number} permission - file mode, e.g. 0o660
 * @returns {boolean} true when the write succeeded, false otherwise
 */
function writeFileSync(path, buffer, permission) {
  var fileDescriptor;
  try {
    fileDescriptor = fs.openSync(path, 'w', permission);
  } catch (e) {
    // Likely a permission problem on an existing file: fix the mode and retry.
    fs.chmodSync(path, permission);
    fileDescriptor = fs.openSync(path, 'w', permission);
  }
  if (fileDescriptor) {
    fs.writeSync(fileDescriptor, buffer, 0, buffer.length, 0);
    fs.closeSync(fileDescriptor);
    // fixed typo: "wiriting" -> "writing"
    logger.info(`writeFileSync writing to '${path}' successful`);
    return true;
  }
  logger.error(`writeFileSync writing to '${path}' failed`);
  return false;
}
2022-01-08 15:19:08 -07:00
// eslint-disable-next-line
2021-05-15 20:34:03 -06:00
/**
 * Persists an uploaded buffer below the storage root, creating the target
 * directory as needed. Rejects paths containing '..' (traversal protection).
 *
 * @param {Buffer} buffer - uploaded file contents
 * @param {string} directory - destination directory relative to the storage root
 * @param {string} filename - destination file name
 * @returns {string|false} full path of the written file, or false on failure
 */
function moveUploadedFile(buffer, directory, filename) {
  logger.info(`moveUploadedFile called with '${filename}' -> '${directory}'`);

  // Reject any attempt at path traversal.
  if (directory.includes('..') || filename.includes('..')) {
    logger.error('moveUploadedFile failed, .. in directory or filename');
    return false;
  }

  // Normalize separators between the storage root and the target directory.
  if (config.storagePath.lastIndexOf('/') !== config.storagePath.length - 1) {
    directory = `/${directory}`;
  }
  if (directory.lastIndexOf('/') !== directory.length - 1) {
    directory += '/';
  }

  const isRelativeToScript = config.storagePath.indexOf('/') !== 0;
  const finalPath = mkDirByPathSync(config.storagePath + directory, { isRelativeToScript });
  if (!finalPath || finalPath.length === 0) {
    logger.error(`moveUploadedFile invalid final path, check permissions to create / write '${config.storagePath + directory}'`);
    return false;
  }

  if (!writeFileSync(`${finalPath}/${filename}`, buffer, 0o660)) {
    logger.error('moveUploadedFile failed to writeFileSync');
    return false;
  }

  logger.info(`moveUploadedFile successfully written '${finalPath}/${filename}'`);
  return `${finalPath}/${filename}`;
}
2021-05-15 20:34:03 -06:00
2021-05-16 19:53:25 -06:00
/**
 * Recursively deletes a directory and everything inside it.
 * A non-existent path is a no-op. Uses lstat, so symlinks are removed
 * rather than followed.
 *
 * @param {string} directoryPath - directory to remove
 */
function deleteFolderRecursive(directoryPath) {
  if (!fs.existsSync(directoryPath)) return;

  for (const entry of fs.readdirSync(directoryPath)) {
    const entryPath = path.join(directoryPath, entry);
    if (fs.lstatSync(entryPath).isDirectory()) {
      deleteFolderRecursive(entryPath);
    } else {
      fs.unlinkSync(entryPath);
    }
  }
  fs.rmdirSync(directoryPath);
}
2021-05-15 20:34:03 -06:00
2021-06-03 21:03:50 -06:00
/**
 * Executes a write statement via db.run, retrying up to 10 times with a
 * 1 second pause between attempts (sqlite may be locked by another process).
 *
 * @param {...any} args - SQL string followed by bind parameters
 * @returns {Promise<object|null>} the db.run result, or null after exhausting retries
 */
async function dbProtectedRun(...args) {
  let retries = 0;
  while (true) {
    try {
      return await db.run(...args);
    } catch (error) {
      logger.error(error);
      retries++;
      if (retries >= 10) {
        break;
      }
      await new Promise((r) => setTimeout(r, 1000));
    }
  }
  // was `${arguments}`, which logs "[object Arguments]" — now logs the actual call
  logger.error(`unable to complete dbProtectedRun for ${JSON.stringify(args)}`);
  return null;
}
/**
 * Fetches a single row via db.get, retrying up to 10 times with a
 * 1 second pause between attempts (sqlite may be locked by another process).
 *
 * @param {...any} args - SQL string followed by bind parameters
 * @returns {Promise<object|null|undefined>} the row (or undefined when no match),
 *   or null after exhausting retries
 */
async function dbProtectedGet(...args) {
  let retries = 0;
  while (true) {
    try {
      return await db.get(...args);
    } catch (error) {
      logger.error(error);
      retries++;
      if (retries >= 10) {
        break;
      }
      await new Promise((r) => setTimeout(r, 1000));
    }
  }
  // was `${arguments}`, which logs "[object Arguments]" — now logs the actual call
  logger.error(`unable to complete dbProtectedGet for ${JSON.stringify(args)}`);
  return null;
}
/**
 * Fetches all matching rows via db.all, retrying up to 10 times with a
 * 1 second pause between attempts (sqlite may be locked by another process).
 *
 * @param {...any} args - SQL string followed by bind parameters
 * @returns {Promise<object[]|null>} the rows, or null after exhausting retries
 */
async function dbProtectedAll(...args) {
  let retries = 0;
  while (true) {
    try {
      return await db.all(...args);
    } catch (error) {
      logger.error(error);
      retries++;
      if (retries >= 10) {
        break;
      }
      await new Promise((r) => setTimeout(r, 1000));
    }
  }
  // was mislabeled "dbProtectedGet" and logged "[object Arguments]"
  logger.error(`unable to complete dbProtectedAll for ${JSON.stringify(args)}`);
  return null;
}
2022-01-08 14:36:33 -07:00
// Work queue of segment wrappers for the current cycle and the cursor into it.
var segmentProcessQueue = [];
var segmentProcessPosition = 0;

// Keyed by "<dongle_id>|<drive_identifier>": drives touched this cycle, plus
// the InitData / CarParams events captured from their rlogs.
var affectedDrives = {};
var affectedDriveInitData = {};
var affectedDriveCarParams = {};

// Keyed by dongle_id: devices whose storage_used must be recalculated.
var affectedDevices = {};

// Per-segment accumulators filled by processSegmentRLog from GpsLocation
// ("internal") and GpsLocationExternal ("external") rlog events.
var rlog_lastTsInternal = 0;
var rlog_prevLatInternal = -1000;
var rlog_prevLngInternal = -1000;
var rlog_totalDistInternal = 0;
var rlog_lastTsExternal = 0;
var rlog_prevLatExternal = -1000;
var rlog_prevLngExternal = -1000;
var rlog_totalDistExternal = 0;
// Duration (seconds) of the current segment's qcamera.ts, set by processSegmentVideo.
var qcamera_duration = 0;
/**
 * Great-circle distance in meters between two coordinates
 * (haversine formula, earth radius 6371 km).
 */
function haversineDistanceMeters(lat1, lon1, lat2, lon2) {
  var p = 0.017453292519943295; // Math.PI / 180
  var c = Math.cos;
  var a = 0.5 - c((lat2 - lat1) * p) / 2
    + c(lat1 * p) * c(lat2 * p)
    * (1 - c((lon2 - lon1) * p)) / 2;
  return 1000 * 12742 * Math.asin(Math.sqrt(a)); // 2 * R; R = 6371 km
}

/**
 * Decompresses a segment's rlog.bz2 and streams through its events, filling
 * the module-level rlog_* accumulators (distance travelled per GPS source,
 * plus the first CarParams / InitData events seen).
 * Always resolves; decompression failures are logged and skipped.
 *
 * @param {string} rLogPath - path to the segment's rlog.bz2
 * @returns {Promise<void>}
 */
function processSegmentRLog(rLogPath) {
  // Reset the accumulators that processSegmentsRecursive reads afterwards.
  rlog_lastTsInternal = 0;
  rlog_prevLatInternal = -1000;
  rlog_prevLngInternal = -1000;
  rlog_totalDistInternal = 0;
  rlog_lastTsExternal = 0;
  rlog_prevLatExternal = -1000;
  rlog_prevLngExternal = -1000;
  rlog_totalDistExternal = 0;
  // NOTE(review): rlog_CarParams / rlog_InitData are implicit globals shared
  // with processSegmentsRecursive — kept as-is on purpose.
  rlog_CarParams = null;
  rlog_InitData = null;

  return new Promise(
    (resolve, reject) => {
      var temporaryFile = rLogPath.replace('.bz2', '');
      try {
        execSync(`bunzip2 -k -f "${rLogPath}"`);
      } catch (exception) { // if bunzip2 fails, something was wrong with the file (corrupt / missing)
        logger.error(exception);
        try {
          fs.unlinkSync(temporaryFile);
        } catch (ignored) { }
        resolve();
        return;
      }

      let readStream;
      let reader;
      try {
        readStream = fs.createReadStream(temporaryFile);
        reader = Reader(readStream);
      } catch (err) {
        logger.error('314 - logger', err);
      }

      // Once the stream is fully consumed, remove the decompressed copy.
      readStream.on('close', () => {
        logger.info('processSegmentRLog readStream close event triggered, resolving promise');
        try {
          fs.unlinkSync(temporaryFile);
        } catch (ignored) { }
        resolve();
      });

      try {
        reader((obj) => {
          try {
            // Sample each GPS source at most ~once per second (LogMonoTime is in ns).
            if (obj.LogMonoTime !== undefined && obj.LogMonoTime - rlog_lastTsInternal >= 1000000 * 1000 * 0.99 && obj.GpsLocation !== undefined) {
              logger.info(`processSegmentRLog GpsLocation @ ${obj.LogMonoTime}: ${obj.GpsLocation.Latitude} ${obj.GpsLocation.Longitude}`);
              if (rlog_prevLatInternal != -1000) {
                var dist_m = haversineDistanceMeters(rlog_prevLatInternal, rlog_prevLngInternal, obj.GpsLocation.Latitude, obj.GpsLocation.Longitude);
                // each segment is max. 60s. if the calculated speed would exceed ~250km/h for this segment, we assume the coordinates off / defective and skip it
                if (dist_m > 70) dist_m = 0;
                rlog_totalDistInternal += dist_m;
              }
              rlog_prevLatInternal = obj.GpsLocation.Latitude;
              rlog_prevLngInternal = obj.GpsLocation.Longitude;
              rlog_lastTsInternal = obj.LogMonoTime;
            }
            else if (obj.LogMonoTime !== undefined && obj.LogMonoTime - rlog_lastTsExternal >= 1000000 * 1000 * 0.99 && obj.GpsLocationExternal !== undefined) {
              logger.info(`processSegmentRLog GpsLocationExternal @ ${obj.LogMonoTime}: ${obj.GpsLocationExternal.Latitude} ${obj.GpsLocationExternal.Longitude}`);
              if (rlog_prevLatExternal != -1000) {
                var dist_m = haversineDistanceMeters(rlog_prevLatExternal, rlog_prevLngExternal, obj.GpsLocationExternal.Latitude, obj.GpsLocationExternal.Longitude);
                // each segment is max. 60s. if the calculated speed would exceed ~250km/h for this segment, we assume the coordinates off / defective and skip it
                if (dist_m > 70) dist_m = 0;
                rlog_totalDistExternal += dist_m;
              }
              rlog_prevLatExternal = obj.GpsLocationExternal.Latitude;
              rlog_prevLngExternal = obj.GpsLocationExternal.Longitude;
              rlog_lastTsExternal = obj.LogMonoTime;
            }
            else if (obj.LogMonoTime !== undefined && obj.CarParams !== undefined && rlog_CarParams == null) {
              rlog_CarParams = obj.CarParams;
            }
            else if (obj.LogMonoTime !== undefined && obj.InitData !== undefined && rlog_InitData == null) {
              rlog_InitData = obj.InitData;
            }
          } catch (exception) {
            // tolerate malformed events
          }
        });
      } catch (readerErr) {
        // was `readerEEr` (undefined) — threw a ReferenceError instead of
        // reporting the actual reader failure
        throw new Error('reader Err 385', { cause: readerErr });
      }
    }
  );
}
/**
 * Probes a qcamera.ts file with ffprobe and stores its duration (seconds)
 * in the module-level `qcamera_duration` (left at 0 when unavailable).
 * Always resolves — probe failures are logged, never propagated.
 *
 * @param {string} qcameraPath - path to the segment's qcamera.ts
 * @returns {Promise<void>}
 */
function processSegmentVideo(qcameraPath) {
  qcamera_duration = 0;
  return new Promise((resolve) => {
    ffprobe(qcameraPath, { path: ffprobeStatic.path })
      .then((info) => {
        const firstStream = info.streams !== undefined ? info.streams[0] : undefined;
        if (firstStream !== undefined && firstStream.duration !== undefined) {
          qcamera_duration = firstStream.duration;
        }
        logger.info(`processSegmentVideo duration: ${qcamera_duration}s`);
        resolve();
      })
      .catch((err) => {
        console.error(err);
        logger.error(`processSegmentVideo error: ${err}`);
        resolve();
      });
  });
}
/**
 * Processes the segment queue one entry at a time: parses the rlog and probes
 * the video in parallel, then persists duration / distance / completion flags
 * for the segment and records the drive as affected. Re-schedules itself via
 * setTimeout until the queue is exhausted, then hands off to updateDrives().
 */
function processSegmentsRecursive() {
  if (segmentProcessQueue.length <= segmentProcessPosition) {
    return updateDrives();
  }

  const segmentWrapper = segmentProcessQueue[segmentProcessPosition];
  const { segment, uploadComplete, driveIdentifier, fileStatus } = segmentWrapper;

  logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} ${JSON.stringify(segment)}`);

  // Count the attempt up-front so repeatedly-crashing segments get skipped.
  // (was an assignment expression buried in the argument list)
  segment.process_attempts += 1;
  // fire-and-forget: dbProtectedRun handles its own errors and never rejects
  dbProtectedRun(
    'UPDATE drive_segments SET process_attempts = ? WHERE id = ?',
    segment.process_attempts,
    segment.id
  );

  if (segment.process_attempts > 5) {
    logger.error(`FAILING TO PROCESS SEGMENT, ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} JSON: ${JSON.stringify(segment)} SKIPPING`);
    segmentProcessPosition++;
    // NOTE(review): no recursive call here — remaining queue entries only get
    // picked up on the next updateSegments cycle; confirm this is intended.
  }
  else {
    const rlogDone = processSegmentRLog(fileStatus['rlog.bz2']);
    const videoDone = processSegmentVideo(fileStatus['qcamera.ts']);
    Promise.all([rlogDone, videoDone])
      .then(() => {
        (async () => {
          logger.info(`processSegmentsRecursive ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id} internal gps: ${Math.round(rlog_totalDistInternal * 100) / 100}m, external gps: ${Math.round(rlog_totalDistExternal * 100) / 100}m, duration: ${qcamera_duration}s`);

          const driveSegmentResult = await dbProtectedRun(
            'UPDATE drive_segments SET duration = ?, distance_meters = ?, is_processed = ?, upload_complete = ?, is_stalled = ? WHERE id = ?',
            qcamera_duration,
            Math.round(Math.max(rlog_totalDistInternal, rlog_totalDistExternal) * 10) / 10,
            true,
            uploadComplete,
            false,
            segment.id
          );
          // if the update failed, stop right here with segment processing and try to update the drives at least
          if (driveSegmentResult === null) {
            segmentProcessPosition = segmentProcessQueue.length;
          }

          affectedDrives[driveIdentifier] = true;
          if (rlog_CarParams != null) {
            affectedDriveCarParams[driveIdentifier] = rlog_CarParams;
          }
          if (rlog_InitData != null) {
            affectedDriveInitData[driveIdentifier] = rlog_InitData;
          }

          segmentProcessPosition++;
          setTimeout(() => {
            processSegmentsRecursive();
          }, 0);
        })();
      })
      .catch((error) => {
        logger.error(error);
      });
  }
}
2021-05-17 21:31:02 -06:00
// Scans drive_segments that are neither fully uploaded nor stalled, inspects
// which files have arrived on disk, and queues up to 15 processable segments
// per batch. Segments whose files are all present are marked upload_complete;
// segments untouched for 10 days are marked stalled. Kicks off
// processSegmentsRecursive (or updateDrives when there is nothing to process).
async function updateSegments() {
  segmentProcessQueue = [];
  segmentProcessPosition = 0;
  affectedDrives = {};
  affectedDriveCarParams = {};
  affectedDriveInitData = {};

  const drive_segments = await dbProtectedAll('SELECT * FROM drive_segments WHERE upload_complete = ? AND is_stalled = ? ORDER BY created ASC', false, false);
  if (drive_segments != null) {
    for (var t = 0; t < drive_segments.length; t++) {
      var segment = drive_segments[t];

      // On-disk layout: <storagePath><dongle_id>/<dongleIdHash>/<driveIdentifierHash>/<drive_identifier>/<segment_id>
      var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
        .update(segment.dongle_id)
        .digest('hex');
      var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
        .update(segment.drive_identifier)
        .digest('hex');

      const directoryTree = dirTree(`${config.storagePath + segment.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${segment.drive_identifier}/${segment.segment_id}`);
      if (directoryTree == null || directoryTree.children == undefined) continue; // happens if upload in progress (db entity written but directory not yet created)

      // NOTE(review): these five flags are never read — fileStatus below is authoritative.
      var qcamera = false;
      var fcamera = false;
      var dcamera = false;
      var qlog = false;
      var rlog = false;
      // Expected file name -> false (missing) or its absolute path (present).
      var fileStatus = {
        'fcamera.hevc': false,
        'dcamera.hevc': false,
        'qcamera.ts': false,
        'qlog.bz2': false,
        'rlog.bz2': false
      };
      for (var i in directoryTree.children) {
        fileStatus[directoryTree.children[i].name] = directoryTree.children[i].path;
      }

      var uploadComplete = false;
      if (fileStatus['qcamera.ts'] !== false && fileStatus['fcamera.hevc'] !== false && fileStatus['rlog.bz2'] !== false && fileStatus['qlog.bz2'] !== false) // upload complete
      {
        uploadComplete = true;
      }

      // qcamera + rlog are enough to process (distance / duration extraction).
      if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) { // can process
        segmentProcessQueue.push({
          segment,
          fileStatus,
          uploadComplete,
          driveIdentifier: `${segment.dongle_id}|${segment.drive_identifier}`
        });
      }
      else if (uploadComplete) {
        logger.info(`updateSegments uploadComplete for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);

        const driveSegmentResult = await dbProtectedRun(
          'UPDATE drive_segments SET upload_complete = ?, is_stalled = ? WHERE id = ?',
          true,
          false,
          segment.id
        );

        affectedDrives[`${segment.dongle_id}|${segment.drive_identifier}`] = true;
      }
      else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) { // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled)
        logger.info(`updateSegments isStalled for ${segment.dongle_id} ${segment.drive_identifier} ${segment.segment_id}`);

        const driveSegmentResult = await dbProtectedRun(
          'UPDATE drive_segments SET is_stalled = ? WHERE id = ?',
          true,
          segment.id
        );
      }

      if (segmentProcessQueue.length >= 15) // we process at most 15 segments per batch
      {
        break;
      }
    }
  }

  if (segmentProcessQueue.length > 0) {
    processSegmentsRecursive();
  }
  else // if no data is to be collected, call updateDrives to update those where eventually just the last segment completed the upload
  {
    updateDrives();
  }
}
2021-05-17 21:31:02 -06:00
/**
 * Recalculates storage_used (in MB, via `du -s`) for every device touched by
 * the last processing/cleanup cycle, then clears the affected-device set.
 */
async function updateDevices() {
  // go through all affected devices (with deleted or updated drives) and update them (storage_used)
  logger.info(`updateDevices - affected drives: ${JSON.stringify(affectedDevices)}`);
  for (const dongleId of Object.keys(affectedDevices)) {
    const device = await dbProtectedGet('SELECT * FROM devices WHERE dongle_id = ?', dongleId);
    if (device == null) continue;

    const dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
      .update(device.dongle_id)
      .digest('hex');
    const devicePath = `${config.storagePath + device.dongle_id}/${dongleIdHash}`;
    // `du -s` reports kilobytes; convert to whole megabytes.
    const deviceQuotaMb = Math.round(parseInt(execSync(`du -s ${devicePath} | awk -F'\t' '{print $1;}'`)
      .toString()) / 1024);
    logger.info(`updateDevices device ${dongleId} has an updated storage_used of: ${deviceQuotaMb} MB`);

    await dbProtectedRun(
      'UPDATE devices SET storage_used = ? WHERE dongle_id = ?',
      deviceQuotaMb,
      device.dongle_id
    );
  }
  // was reset to [] — affectedDevices is used as a plain object keyed by
  // dongle_id everywhere else in this file
  affectedDevices = {};
}
2021-05-15 20:34:03 -06:00
2021-05-17 21:31:02 -06:00
// For each drive touched by segment processing: recomputes aggregate distance,
// duration and completion flags from its segments, captures the drive's
// filesize once fully uploaded, merges InitData/CarParams into its metadata,
// and writes a qcamera.m3u8 playlist (for cabana) once fully processed.
// Ends the cycle by refreshing devices and re-entering mainWorkerLoop.
async function updateDrives() {
  // go through all affected drives and update them / complete and/or build m3u8
  logger.info(`updateDrives - affected drives: ${JSON.stringify(affectedDrives)}`);
  for (const [key, value] of Object.entries(affectedDrives)) {
    // keys have the form "<dongle_id>|<drive_identifier>"
    var dongleId,
      driveIdentifier;
    [dongleId, driveIdentifier] = key.split('|');

    const drive = await dbProtectedGet('SELECT * FROM drives WHERE identifier = ? AND dongle_id = ?', driveIdentifier, dongleId);
    if (drive == null) continue;

    var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
      .update(drive.dongle_id)
      .digest('hex');
    var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
      .update(drive.identifier)
      .digest('hex');
    var driveUrl = `${config.baseDriveDownloadUrl + drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;
    var drivePath = `${config.storagePath + drive.dongle_id}/${dongleIdHash}/${driveIdentifierHash}/${drive.identifier}`;

    // Aggregates across every segment of this drive.
    var uploadComplete = true;
    var isProcessed = true;
    var totalDistanceMeters = 0;
    var totalDurationSeconds = 0;
    var playlistSegmentStrings = '';

    const drive_segments = await dbProtectedAll('SELECT * FROM drive_segments WHERE drive_identifier = ? AND dongle_id = ? ORDER BY segment_id ASC', driveIdentifier, dongleId);
    if (drive_segments != null) {
      for (var t = 0; t < drive_segments.length; t++) {
        if (!drive_segments[t].upload_complete) uploadComplete = false;
        if (!drive_segments[t].is_processed) {
          isProcessed = false;
        }
        else {
          totalDistanceMeters += parseFloat(drive_segments[t].distance_meters);
          totalDurationSeconds += parseFloat(drive_segments[t].duration);

          // One playlist entry per processed segment.
          playlistSegmentStrings += `#EXTINF:${drive_segments[t].duration},${drive_segments[t].segment_id}\n${driveUrl}/${drive_segments[t].segment_id}/qcamera.ts\n`;
        }
      }
    }

    var { filesize } = drive;
    if (uploadComplete) {
      try {
        // NOTE(review): these two hashes shadow the identical values computed above.
        var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
          .update(dongleId)
          .digest('hex');
        var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
          .update(driveIdentifier)
          .digest('hex');
        filesize = parseInt(execSync(`du -s ${drivePath} | awk -F'\t' '{print $1;}'`)
          .toString()); // in kilobytes
      } catch (exception) { }
    }

    let metadata = {};
    try {
      metadata = JSON.parse(drive.metadata);
    } catch (exception) {
      logger.error(exception);
    }
    if (metadata == null) metadata = {};

    // Persist the InitData / CarParams captured from the rlog, only if not set yet.
    if (affectedDriveInitData[key] != undefined && metadata.InitData == undefined) {
      metadata.InitData = affectedDriveInitData[key];
    }
    if (affectedDriveCarParams[key] != undefined && metadata.CarParams == undefined) {
      metadata.CarParams = affectedDriveCarParams[key];
    }

    logger.info(`updateDrives drive ${dongleId} ${driveIdentifier} uploadComplete: ${uploadComplete}`);

    const driveResult = await dbProtectedRun(
      'UPDATE drives SET distance_meters = ?, duration = ?, upload_complete = ?, is_processed = ?, filesize = ?, metadata = ? WHERE id = ?',
      Math.round(totalDistanceMeters),
      totalDurationSeconds,
      uploadComplete,
      isProcessed,
      filesize,
      JSON.stringify(metadata),
      drive.id
    );

    affectedDevices[dongleId] = true;

    if (isProcessed) {
      // create the playlist file m3u8 for cabana
      var playlist = '#EXTM3U\n'
        + '#EXT-X-VERSION:3\n'
        + '#EXT-X-TARGETDURATION:61\n'
        + '#EXT-X-MEDIA-SEQUENCE:0\n'
        + `#EXT-X-PLAYLIST-TYPE:VOD\n${playlistSegmentStrings}\n`
        + '#EXT-X-ENDLIST';
      fs.writeFileSync(`${drivePath}/qcamera.m3u8`, playlist);
    }
  }

  updateDevices();

  setTimeout(() => {
    mainWorkerLoop();
  }, 0);
}
2021-05-17 21:31:02 -06:00
/**
 * Soft-deletes drives older than config.deviceDriveExpirationDays, unless
 * they are preserved or already deleted.
 */
async function deleteExpiredDrives() {
  const expirationTs = Date.now() - config.deviceDriveExpirationDays * 24 * 3600 * 1000;
  const expiredDrives = await dbProtectedAll('SELECT * FROM drives WHERE is_preserved = ? AND is_deleted = ? AND created < ?', false, false, expirationTs);
  if (expiredDrives == null) return;

  for (const expiredDrive of expiredDrives) {
    logger.info(`deleteExpiredDrives drive ${expiredDrive.dongle_id} ${expiredDrive.identifier} is older than ${config.deviceDriveExpirationDays} days, set is_deleted=true`);
    await dbProtectedRun(
      'UPDATE drives SET is_deleted = ? WHERE id = ?',
      true,
      expiredDrive.id
    );
  }
}
2021-05-17 21:31:02 -06:00
/**
 * Removes on-disk files for drives flagged is_deleted and purges their
 * segment rows. Drives are marked is_physically_removed first so the
 * operation is idempotent across worker restarts.
 */
async function removeDeletedDrivesPhysically() {
  const deletedDrives = await dbProtectedAll('SELECT * FROM drives WHERE is_deleted = ? AND is_physically_removed = ?', true, false);
  if (deletedDrives == null) {
    return;
  }
  for (var t = 0; t < deletedDrives.length; t++) {
    logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} is deleted, remove physical files and clean database`);

    var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
      .update(deletedDrives[t].dongle_id)
      .digest('hex');
    var driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt)
      .update(deletedDrives[t].identifier)
      .digest('hex');
    const drivePath = `${config.storagePath + deletedDrives[t].dongle_id}/${dongleIdHash}/${driveIdentifierHash}`;
    logger.info(`removeDeletedDrivesPhysically drive ${deletedDrives[t].dongle_id} ${deletedDrives[t].identifier} storage path is ${drivePath}`);
    try {
      const driveResult = await dbProtectedRun(
        'UPDATE drives SET is_physically_removed = ? WHERE id = ?',
        true,
        deletedDrives[t].id
      );

      const driveSegmentResult = await dbProtectedRun(
        'DELETE FROM drive_segments WHERE drive_identifier = ? AND dongle_id = ?',
        deletedDrives[t].identifier,
        deletedDrives[t].dongle_id
      );

      // Files are only removed once both DB updates succeeded.
      // was: deleteFolderRecursive(drivePath, { recursive: true }) — the helper
      // takes a single argument, so the options object was silently ignored
      if (driveResult != null && driveSegmentResult != null) deleteFolderRecursive(drivePath);
      affectedDevices[deletedDrives[t].dongle_id] = true;
    } catch (exception) {
      logger.error(exception);
    }
  }
}
2021-05-17 21:31:02 -06:00
/**
 * For every device over its storage quota, soft-deletes its oldest drive.
 * Non-preserved drives are chosen first; a preserved drive is only deleted
 * when no non-preserved drive remains.
 */
async function deleteOverQuotaDrives() {
  const devices = await dbProtectedAll('SELECT * FROM devices WHERE storage_used > ?', config.deviceStorageQuotaMb);
  if (devices == null) {
    return;
  }

  for (const device of devices) {
    let foundDriveToDelete = false;

    const driveNormal = await dbProtectedGet('SELECT * FROM drives WHERE dongle_id = ? AND is_preserved = ? AND is_deleted = ? ORDER BY created ASC LIMIT 1', device.dongle_id, false, false);
    if (driveNormal != null) {
      logger.info(`deleteOverQuotaDrives drive ${driveNormal.dongle_id} ${driveNormal.identifier} (normal) is deleted for over-quota`);
      await dbProtectedRun(
        'UPDATE drives SET is_deleted = ? WHERE id = ?',
        true,
        driveNormal.id
      );
      foundDriveToDelete = true;
    }

    if (!foundDriveToDelete) {
      const drivePreserved = await dbProtectedGet('SELECT * FROM drives WHERE dongle_id = ? AND is_preserved = ? AND is_deleted = ? ORDER BY created ASC LIMIT 1', device.dongle_id, true, false);
      if (drivePreserved != null) {
        logger.info(`deleteOverQuotaDrives drive ${drivePreserved.dongle_id} ${drivePreserved.identifier} (preserved!) is deleted for over-quota`);
        await dbProtectedRun(
          'UPDATE drives SET is_deleted = ? WHERE id = ?',
          true,
          drivePreserved.id
        );
        foundDriveToDelete = true;
      }
    }
  }
}
2021-05-17 21:31:02 -06:00
/**
 * Keeps only the five newest boot logs and five newest crash logs per device;
 * older files are deleted from disk. Devices with deletions are queued in
 * affectedDevices for a storage_used refresh.
 */
async function deleteBootAndCrashLogs() {
  const devices = await dbProtectedAll('SELECT * FROM devices');
  if (devices == null) {
    return;
  }
  for (var t = 0; t < devices.length; t++) {
    var device = devices[t];
    var dongleIdHash = crypto.createHmac('sha256', config.applicationSalt)
      .update(device.dongle_id)
      .digest('hex');
    // Identical retention policy for both directories (was duplicated inline).
    _pruneLogDirectory(`${config.storagePath + device.dongle_id}/${dongleIdHash}/boot/`, device, 'boot');
    _pruneLogDirectory(`${config.storagePath + device.dongle_id}/${dongleIdHash}/crash/`, device, 'crash');
  }
}

// Deletes all but the 5 newest log files in `directory`; `label` ('boot' or
// 'crash') is only used for log messages.
function _pruneLogDirectory(directory, device, label) {
  const tree = dirTree(directory, { attributes: ['size'] });
  if (tree == undefined) return;

  const logFiles = [];
  for (var i = 0; i < tree.children.length; i++) {
    // File names look like "boot-<date>--<time>.bz2"; recover a parseable timestamp.
    var timeSplit = tree.children[i].name.replace('boot-', '')
      .replace('crash-', '')
      .replace('.bz2', '')
      .split('--');
    var timeString = `${timeSplit[0]} ${timeSplit[1].replace(/-/g, ':')}`;
    logFiles.push({
      name: tree.children[i].name,
      size: tree.children[i].size,
      date: Date.parse(timeString),
      path: tree.children[i].path
    });
  }

  logFiles.sort((a, b) => ((a.date < b.date) ? 1 : -1)); // newest first
  for (var c = 5; c < logFiles.length; c++) {
    logger.info(`deleteBootAndCrashLogs deleting ${label} log ${logFiles[c].path}`);
    try {
      fs.unlinkSync(logFiles[c].path);
      affectedDevices[device.dongle_id] = true;
    } catch (exception) {
      logger.error(exception);
    }
  }
}
2021-06-03 21:03:50 -06:00
/**
 * One iteration of the worker: runs the cleanup tasks at most every 20
 * minutes, then schedules the next segment scan 5 seconds out. The process
 * exits entirely after one hour of uptime (guard against memory leaks; an
 * external supervisor is expected to restart it).
 */
async function mainWorkerLoop() {
  const oneHourMs = 60 * 60 * 1000;
  if (Date.now() - startTime > oneHourMs) {
    logger.info('EXIT WORKER AFTER 1 HOUR TO PREVENT MEMORY LEAKS...');
    process.exit();
  }

  try {
    const cleaningDue = Date.now() - lastCleaningTime > 20 * 60 * 1000;
    if (cleaningDue) {
      await deleteBootAndCrashLogs();
      await deleteExpiredDrives();
      await deleteOverQuotaDrives();
      await removeDeletedDrivesPhysically();
      lastCleaningTime = Date.now();
    }

    setTimeout(() => {
      updateSegments();
    }, 5000);
  } catch (e) {
    logger.error(e);
  }
}
2021-05-15 20:34:03 -06:00
2021-05-21 19:43:51 -06:00
// make sure bunzip2 is available
try {
  // NOTE(review): the availability check is commented out, so the catch below
  // is currently unreachable — a missing bunzip2 only surfaces later inside
  // processSegmentRLog. Re-enable the line to fail fast at startup.
  //execSync('bunzip2 --help');
} catch (exception) {
  logger.error('bunzip2 is not installed or not available in environment path');
  process.exit();
}
2021-06-03 21:03:50 -06:00
2022-01-08 15:19:08 -07:00
// Startup: take an exclusive lock file so only one worker instance runs at a
// time, open the sqlite database read-write, sanity-check that the expected
// tables exist, then enter the main loop.
lockfile.lock('retropilot_worker.lock', {
  realpath: false,
  stale: 90000, // consider the lock abandoned after 90s without updates
  update: 2000  // refresh the lock's mtime every 2s while this process runs
})
  .then((release) => {
    logger.info('STARTING WORKER...');
    (async () => {
      try {
        db = await open({
          filename: config.databaseFile,
          driver: sqlite3.Database,
          mode: sqlite3.OPEN_READWRITE
        });
        // Fail fast when the schema is missing any table this worker touches.
        await db.get('SELECT * FROM accounts LIMIT 1');
        await db.get('SELECT * FROM devices LIMIT 1');
        await db.get('SELECT * FROM drives LIMIT 1');
        await db.get('SELECT * FROM drive_segments LIMIT 1');
      } catch (exception) {
        logger.error(exception);
        process.exit();
      }
      initializeStorage();
      setTimeout(() => {
        mainWorkerLoop();
      }, 0);
    })();
  })
  .catch((e) => {
    console.error(e);
    process.exit();
  });