// Worker process for retropilot: processes uploaded drive segments (rlog GPS
// distance + qcamera duration), maintains aggregate drive stats and enforces
// storage quotas / retention. (Stray VCS timestamp artifact lines removed —
// they were syntax errors left over from a bad merge/extraction.)
const config = require('./config');
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const log4js = require('log4js');

import sqlite3 from 'sqlite3'
import { open } from 'sqlite'

const lockfile = require('proper-lockfile');
var http = require('http');
var https = require('https');
const express = require('express');
const cors = require('cors');
const bodyParser = require('body-parser');
const cookieParser = require('cookie-parser');
const jwt = require('jsonwebtoken');
const sendmail = require('sendmail')();
const htmlspecialchars = require('htmlspecialchars');
const dirTree = require("directory-tree");
const { resolve } = require('path');
const execSync = require('child_process').execSync;
const Reader = require('@commaai/log_reader');
var ffprobe = require('ffprobe'),
    ffprobeStatic = require('ffprobe-static');
const { exception } = require('console');

// sqlite database handle; opened in the bootstrap block at the bottom of this file
var db = null;

// the worker exits after 1h (see mainWorkerLoop); these track uptime and the
// time of the last cleanup pass
var lastCleaningTime = 0;
var startTime = Date.now();

log4js.configure({
    appenders: { logfile: { type: "file", filename: "worker.log" }, out: { type: "console" } },
    categories: { default: { appenders: ['out', 'logfile'], level: 'info' } }
});
var logger = log4js.getLogger('default');
function initializeStorage() {
    // Ensure the configured storage root exists; terminate the worker if it
    // cannot be created or verified.
    const isRelative = config.storagePath.indexOf("/") !== 0;
    const verifiedPath = mkDirByPathSync(config.storagePath, { isRelativeToScript: isRelative });
    if (verifiedPath == null) {
        logger.error("Unable to verify storage path '" + config.storagePath + "', check filesystem / permissions");
        process.exit();
    }
    logger.info("Verified storage path " + verifiedPath);
}
function validateJWTToken(token, publicKey) {
    // Verify an RS256-signed JWT (the "JWT " scheme prefix is stripped first).
    // Returns the decoded payload, or null when verification fails.
    try {
        return jwt.verify(token.replace("JWT ", ""), publicKey, { algorithms: ['RS256'] });
    } catch (exception) {
        // fix: use the worker's logger instead of console.log, consistent with
        // the rest of this file
        logger.error("validateJWTToken failed: " + exception);
    }
    return null;
}
function formatDate(timestampMs) {
    // Render a millisecond epoch timestamp as "YYYY-MM-DD HH:MM:SS" (UTC).
    const iso = new Date(timestampMs).toISOString();
    return iso.replace(/T/, ' ').replace(/\..+/, '');
}
function formatDuration(durationSeconds) {
    // Format seconds as e.g. "1h 2m 3s"; larger units are omitted while zero.
    const secs = durationSeconds % 60;
    const totalMins = Math.floor(durationSeconds / 60);
    const hours = Math.floor(totalMins / 60);
    const mins = totalMins % 60;
    let response = '';
    if (hours > 0) response += hours + 'h ';
    if (hours > 0 || mins > 0) response += mins + 'm ';
    return response + secs + 's';
}
function mkDirByPathSync(targetDir, { isRelativeToScript = false } = {}) {
    // Recursively create targetDir (like `mkdir -p`).
    // Returns the resolved directory path on success, or null on failure.
    const sep = path.sep;
    const initDir = path.isAbsolute(targetDir) ? sep : '';
    const baseDir = isRelativeToScript ? __dirname : '.';
    return targetDir.split(sep).reduce((parentDir, childDir) => {
        // fix: once a level failed (null), short-circuit the remaining levels;
        // previously path.resolve(baseDir, null, childDir) threw a TypeError
        if (parentDir === null) return null;
        const curDir = path.resolve(baseDir, parentDir, childDir);
        try {
            fs.mkdirSync(curDir);
        } catch (err) {
            if (err.code === 'EEXIST') { // curDir already exists!
                return curDir;
            }
            // To avoid `EISDIR` error on Mac and `EACCES`-->`ENOENT` and `EPERM` on Windows.
            if (err.code === 'ENOENT') { // parent of curDir is missing / not creatable
                // fix: previously logged a misleading hard-coded "EACCES" message
                logger.error("ENOENT: unable to mkdir '" + curDir + "'");
                return null;
            }
            const caughtErr = ['EACCES', 'EPERM', 'EISDIR'].indexOf(err.code) > -1;
            if (!caughtErr || (caughtErr && curDir === path.resolve(targetDir))) {
                logger.error("'EACCES', 'EPERM', 'EISDIR' during mkdir");
                return null;
            }
        }
        return curDir;
    }, initDir);
}
function simpleStringify(object) {
    // JSON-serialize only the primitive own properties of object, skipping
    // nested objects (including null) and functions — safe for logging.
    const simpleObject = {};
    for (const prop in object) {
        if (!Object.prototype.hasOwnProperty.call(object, prop)) continue;
        const value = object[prop];
        if (typeof value === 'object' || typeof value === 'function') continue;
        simpleObject[prop] = value;
    }
    return JSON.stringify(simpleObject); // returns cleaned up JSON
}
function writeFileSync(path, buffer, permission) {
    // Write buffer to the file at `path` with the given mode; if opening fails
    // (e.g. existing file with restrictive mode), chmod and retry once.
    // NOTE: the `path` parameter intentionally shadows the path module (unused here).
    // Returns true on success, false if no descriptor could be obtained.
    var fileDescriptor;
    try {
        fileDescriptor = fs.openSync(path, 'w', permission);
    } catch (e) {
        fs.chmodSync(path, permission);
        fileDescriptor = fs.openSync(path, 'w', permission);
    }
    if (fileDescriptor) {
        try {
            fs.writeSync(fileDescriptor, buffer, 0, buffer.length, 0);
        } finally {
            // fix: always release the descriptor, even if writeSync throws
            fs.closeSync(fileDescriptor);
        }
        logger.info("writeFileSync writing to '" + path + "' successful"); // fix: typo "wiriting"
        return true;
    }
    logger.error("writeFileSync writing to '" + path + "' failed");
    return false;
}
function moveUploadedFile(buffer, directory, filename) {
    // Persist an uploaded buffer under storagePath/<directory>/<filename>.
    // Returns the final file path on success, false on any failure.
    logger.info("moveUploadedFile called with '" + filename + "' -> '" + directory + "'");

    // reject path-traversal attempts
    if (directory.indexOf("..") >= 0 || filename.indexOf("..") >= 0) {
        logger.error("moveUploadedFile failed, .. in directory or filename");
        return false;
    }

    // normalize: ensure a joining slash after storagePath and a trailing slash
    if (config.storagePath.lastIndexOf("/") !== config.storagePath.length - 1) {
        directory = '/' + directory;
    }
    if (directory.lastIndexOf("/") !== directory.length - 1) {
        directory = directory + '/';
    }

    const isRelative = config.storagePath.indexOf("/") !== 0;
    const finalPath = mkDirByPathSync(config.storagePath + directory, { isRelativeToScript: isRelative });
    if (!(finalPath && finalPath.length > 0)) {
        logger.error("moveUploadedFile invalid final path, check permissions to create / write '" + (config.storagePath + directory) + "'");
        return false;
    }
    if (!writeFileSync(finalPath + "/" + filename, buffer, 0o660)) {
        logger.error("moveUploadedFile failed to writeFileSync");
        return false;
    }
    logger.info("moveUploadedFile successfully written '" + (finalPath + "/" + filename) + "'");
    return finalPath + "/" + filename;
}
function deleteFolderRecursive(directoryPath) {
    // Remove a directory tree (files first, then the directory itself).
    // A non-existent path is a silent no-op.
    if (!fs.existsSync(directoryPath)) return;
    for (const entry of fs.readdirSync(directoryPath)) {
        const entryPath = path.join(directoryPath, entry);
        if (fs.lstatSync(entryPath).isDirectory()) {
            deleteFolderRecursive(entryPath);
        } else {
            fs.unlinkSync(entryPath);
        }
    }
    fs.rmdirSync(directoryPath);
}
// Work-queue state for one updateSegments() batch.
var segmentProcessQueue = [];
var segmentProcessPosition = 0;
var affectedDrives = {};   // keyed "dongle_id|drive_identifier" -> true
var affectedDevices = {};  // keyed dongle_id -> true

// GPS accumulators filled by processSegmentRLog() — one set per receiver
// (internal vs external); -1000 marks "no previous fix yet".
var rlog_lastTsInternal = 0;
var rlog_prevLatInternal = -1000;
var rlog_prevLngInternal = -1000;
var rlog_totalDistInternal = 0;
var rlog_lastTsExternal = 0;
var rlog_prevLatExternal = -1000;
var rlog_prevLngExternal = -1000;
var rlog_totalDistExternal = 0;

// duration (seconds) of the last probed qcamera.ts, set by processSegmentVideo()
var qcamera_duration = 0;
// Haversine great-circle distance in meters between two GPS fixes.
// Returns 0 for implausible jumps: each segment is max. 60s, so a distance
// implying > 250 km/h is assumed to be defective coordinates and skipped.
function rlogCoordDistanceMeters(lat1, lon1, lat2, lon2) {
    var p = 0.017453292519943295; // Math.PI / 180
    var c = Math.cos;
    var a = 0.5 - c((lat2 - lat1) * p) / 2 +
        c(lat1 * p) * c(lat2 * p) *
        (1 - c((lon2 - lon1) * p)) / 2;
    var dist_m = 1000 * 12742 * Math.asin(Math.sqrt(a)); // 2 * R; R = 6371 km
    if (dist_m > 4200) dist_m = 0;
    return dist_m;
}

// Stream-parse an rlog and accumulate travelled distance into the module-level
// rlog_* accumulators, sampling GPS fixes at most once per second of log time.
// fix: the identical haversine computation was duplicated verbatim for the
// internal and external GPS branches — extracted into rlogCoordDistanceMeters.
function processSegmentRLog(rLogPath) {
    rlog_lastTsInternal = 0;
    rlog_prevLatInternal = -1000;
    rlog_prevLngInternal = -1000;
    rlog_totalDistInternal = 0;
    rlog_lastTsExternal = 0;
    rlog_prevLatExternal = -1000;
    rlog_prevLngExternal = -1000;
    rlog_totalDistExternal = 0;

    return new Promise(
        function (resolve, reject) {
            var readStream = fs.createReadStream(rLogPath);
            var reader = Reader(readStream);
            readStream.on('close', function () {
                resolve();
            });
            reader(function (obj) {
                try {
                    // sample at most one fix per 1s of log time (LogMonoTime is in ns)
                    if (obj['LogMonoTime'] !== undefined && obj['LogMonoTime'] - rlog_lastTsInternal >= 1000000 * 1000 * 1 && obj['GpsLocation'] !== undefined) {
                        logger.info('processSegmentRLog GpsLocation @ ' + obj['LogMonoTime'] + ': ' + obj['GpsLocation']['Latitude'] + ' ' + obj['GpsLocation']['Longitude']);
                        if (rlog_prevLatInternal != -1000) {
                            rlog_totalDistInternal += rlogCoordDistanceMeters(
                                rlog_prevLatInternal, rlog_prevLngInternal,
                                obj['GpsLocation']['Latitude'], obj['GpsLocation']['Longitude']);
                        }
                        rlog_prevLatInternal = obj['GpsLocation']['Latitude'];
                        rlog_prevLngInternal = obj['GpsLocation']['Longitude'];
                        rlog_lastTsInternal = obj['LogMonoTime'];
                    }
                    else if (obj['LogMonoTime'] !== undefined && obj['LogMonoTime'] - rlog_lastTsExternal >= 1000000 * 1000 * 1 && obj['GpsLocationExternal'] !== undefined) {
                        logger.info('processSegmentRLog GpsLocationExternal @ ' + obj['LogMonoTime'] + ': ' + obj['GpsLocationExternal']['Latitude'] + ' ' + obj['GpsLocationExternal']['Longitude']);
                        if (rlog_prevLatExternal != -1000) {
                            rlog_totalDistExternal += rlogCoordDistanceMeters(
                                rlog_prevLatExternal, rlog_prevLngExternal,
                                obj['GpsLocationExternal']['Latitude'], obj['GpsLocationExternal']['Longitude']);
                        }
                        rlog_prevLatExternal = obj['GpsLocationExternal']['Latitude'];
                        rlog_prevLngExternal = obj['GpsLocationExternal']['Longitude'];
                        rlog_lastTsExternal = obj['LogMonoTime'];
                    }
                } catch (exception) {
                    // ignore malformed log events (best-effort parsing)
                }
            });
        }
    );
}
function processSegmentVideo(qcameraPath) {
    // Probe qcamera.ts with ffprobe and store its duration (seconds) in the
    // module-level qcamera_duration. Always resolves, even on probe errors.
    qcamera_duration = 0;
    return new Promise(function (resolve, reject) {
        ffprobe(qcameraPath, { path: ffprobeStatic.path })
            .then(function (info) {
                var streams = info['streams'];
                if (streams !== undefined && streams[0] !== undefined && streams[0]['duration'] !== undefined) {
                    qcamera_duration = streams[0]['duration'];
                }
                logger.info('processSegmentVideo duration: ' + qcamera_duration + 's');
                resolve();
            })
            .catch(function (err) {
                console.error(err);
                logger.error('processSegmentVideo error: ' + err);
                resolve();
            });
    });
}
function processSegmentsRecursive() {
    // Pop the next queued segment, process its rlog and qcamera in parallel,
    // persist the results, then re-schedule itself until the queue is drained
    // (at which point updateDrives() takes over).
    if (segmentProcessQueue.length <= segmentProcessPosition)
        return updateDrives();

    const { segment, uploadComplete, driveIdentifier, fileStatus } = segmentProcessQueue[segmentProcessPosition];

    logger.info('processSegmentsRecursive ' + segment.dongle_id + ' ' + segment.drive_identifier + ' ' + segment.segment_id);

    const rlogDone = processSegmentRLog(fileStatus['rlog.bz2']);
    const videoDone = processSegmentVideo(fileStatus['qcamera.ts']);
    Promise.all([rlogDone, videoDone]).then(() => {
        (async () => {
            logger.info('processSegmentsRecursive ' + segment.dongle_id + ' ' + segment.drive_identifier + ' ' + segment.segment_id + ' internal gps: ' + (Math.round(rlog_totalDistInternal * 100) / 100) + 'm, external gps: ' + (Math.round(rlog_totalDistExternal * 100) / 100) + 'm, duration: ' + qcamera_duration + 's');
            await db.run(
                'UPDATE drive_segments SET duration = ?, distance_meters = ?, is_processed = ?, upload_complete = ?, is_stalled = ? WHERE id = ?',
                qcamera_duration, Math.round(Math.max(rlog_totalDistInternal, rlog_totalDistExternal) * 10) / 10, true, uploadComplete, false,
                segment.id
            );
            affectedDrives[driveIdentifier] = true;
            segmentProcessPosition++;
            setTimeout(function () { processSegmentsRecursive(); }, 0);
        })();
    }).catch((error) => {
        logger.error(error);
    });
}
async function updateSegments() {
    // Scan all not-yet-complete segments: queue those ready for processing
    // (max 50 per batch), mark fully-uploaded ones complete, flag stale ones
    // as stalled, then kick off the processing chain.
    segmentProcessQueue = [];
    segmentProcessPosition = 0;
    affectedDrives = {};

    const drive_segments = await db.all('SELECT * FROM drive_segments WHERE upload_complete = ? AND is_stalled = ? ORDER BY created ASC', false, false);
    for (let t = 0; t < drive_segments.length; t++) {
        const segment = drive_segments[t];
        const dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(segment.dongle_id).digest('hex');
        const driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(segment.drive_identifier).digest('hex');
        const directoryTree = dirTree(config.storagePath + segment.dongle_id + "/" + dongleIdHash + "/" + driveIdentifierHash + "/" + segment.drive_identifier + "/" + segment.segment_id);
        // happens if upload in progress (db entity written but directory not yet created)
        if (directoryTree == null || directoryTree.children == undefined) continue;

        // map expected file names to their on-disk paths (false = missing)
        const fileStatus = { 'fcamera.hevc': false, 'dcamera.hevc': false, 'qcamera.ts': false, 'qlog.bz2': false, 'rlog.bz2': false };
        for (const i in directoryTree.children) {
            fileStatus[directoryTree.children[i].name] = directoryTree.children[i].path;
        }

        const uploadComplete = (fileStatus['qcamera.ts'] !== false && fileStatus['fcamera.hevc'] !== false && fileStatus['rlog.bz2'] !== false && fileStatus['qlog.bz2'] !== false);

        if (fileStatus['qcamera.ts'] !== false && fileStatus['rlog.bz2'] !== false && !segment.is_processed) { // can process
            segmentProcessQueue.push({ segment: segment, fileStatus: fileStatus, uploadComplete: uploadComplete, driveIdentifier: segment.dongle_id + "|" + segment.drive_identifier });
        }
        else if (uploadComplete) {
            logger.info('updateSegments uploadComplete for ' + segment.dongle_id + ' ' + segment.drive_identifier + ' ' + segment.segment_id);
            await db.run(
                'UPDATE drive_segments SET upload_complete = ?, is_stalled = ? WHERE id = ?',
                true, false, segment.id);
            affectedDrives[segment.dongle_id + "|" + segment.drive_identifier] = true;
        }
        else if (Date.now() - segment.created > 10 * 24 * 3600 * 1000) {
            // ignore non-uploaded segments after 10 days until a new upload_url is requested (which resets is_stalled)
            logger.info('updateSegments isStalled for ' + segment.dongle_id + ' ' + segment.drive_identifier + ' ' + segment.segment_id);
            await db.run(
                'UPDATE drive_segments SET is_stalled = ? WHERE id = ?',
                true, segment.id);
        }

        if (segmentProcessQueue.length >= 50) // we process at most 50 segments per batch
            break;
    }

    if (segmentProcessQueue.length > 0)
        processSegmentsRecursive();
    else // if no data is to be collected, call updateDrives to update those where eventually just the last segment completed the upload
        updateDrives();
}
async function updateDevices() {
    // go through all affected devices (with deleted or updated drives) and update them (storage_used)
    logger.info("updateDevices - affected drives: " + JSON.stringify(affectedDevices));
    for (const [key, value] of Object.entries(affectedDevices)) {
        const dongleId = key;
        const device = await db.get('SELECT * FROM devices WHERE dongle_id = ?', dongleId);
        if (device == null) continue;

        const dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(device.dongle_id).digest('hex');
        const devicePath = config.storagePath + device.dongle_id + "/" + dongleIdHash;
        // NOTE(review): devicePath is interpolated into a shell command; dongle_id is
        // assumed to always be a safe identifier — verify it can never contain shell metacharacters
        const deviceQuotaMb = Math.round(parseInt(execSync("du -s " + devicePath + " | awk -F'\t' '{print $1;}'").toString(), 10) / 1024);
        logger.info("updateDevices device " + dongleId + " has an updated storage_used of: " + deviceQuotaMb + " MB");

        await db.run(
            'UPDATE devices SET storage_used = ? WHERE dongle_id = ?',
            deviceQuotaMb, device.dongle_id);
    }
    // fix: reset to an object, not an array — affectedDevices is keyed by
    // dongle_id and initialized as {} everywhere else in this file
    affectedDevices = {};
}
async function updateDrives() {
    // Rebuild aggregate stats (distance, duration, upload state, filesize) for
    // every drive touched this cycle, write the cabana m3u8 playlist once all
    // segments are processed, then cascade into updateDevices and the main loop.
    logger.info("updateDrives - affected drives: " + JSON.stringify(affectedDrives));
    for (const [key, value] of Object.entries(affectedDrives)) {
        const [dongleId, driveIdentifier] = key.split('|');
        const drive = await db.get('SELECT * FROM drives WHERE identifier = ? AND dongle_id = ?', driveIdentifier, dongleId);
        if (drive == null) continue;

        const dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(drive.dongle_id).digest('hex');
        const driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(drive.identifier).digest('hex');
        const driveUrl = config.baseDriveDownloadUrl + drive.dongle_id + "/" + dongleIdHash + "/" + driveIdentifierHash + "/" + drive.identifier;
        const drivePath = config.storagePath + drive.dongle_id + "/" + dongleIdHash + "/" + driveIdentifierHash + "/" + drive.identifier;

        let uploadComplete = true;
        let isProcessed = true;
        let totalDistanceMeters = 0;
        let totalDurationSeconds = 0;
        let playlistSegmentStrings = '';

        const drive_segments = await db.all('SELECT * FROM drive_segments WHERE drive_identifier = ? AND dongle_id = ? ORDER BY segment_id ASC', driveIdentifier, dongleId);
        for (const seg of drive_segments) {
            if (!seg.upload_complete) uploadComplete = false;
            if (!seg.is_processed) {
                isProcessed = false;
            } else {
                totalDistanceMeters += parseFloat(seg.distance_meters);
                totalDurationSeconds += parseFloat(seg.duration);
                playlistSegmentStrings += `#EXTINF:` + seg.duration + `,` + seg.segment_id + `\n` +
                    driveUrl + `/` + seg.segment_id + `/qcamera.ts\n`;
            }
        }

        let filesize = drive.filesize;
        if (uploadComplete) {
            try {
                filesize = parseInt(execSync("du -s " + drivePath + " | awk -F'\t' '{print $1;}'").toString()); // in kilobytes
            }
            catch (exception) { }
        }

        logger.info("updateDrives drive " + dongleId + " " + driveIdentifier + " uploadComplete: " + uploadComplete);
        await db.run(
            'UPDATE drives SET distance_meters = ?, duration = ?, upload_complete = ?, is_processed = ?, filesize = ? WHERE id = ?',
            Math.round(totalDistanceMeters), totalDurationSeconds, uploadComplete, isProcessed, filesize, drive.id);

        affectedDevices[dongleId] = true;

        if (isProcessed) {
            // create the playlist file m3u8 for cabana
            const playlist = `#EXTM3U\n` +
                `#EXT-X-VERSION:3\n` +
                `#EXT-X-TARGETDURATION:61\n` +
                `#EXT-X-MEDIA-SEQUENCE:0\n` +
                `#EXT-X-PLAYLIST-TYPE:VOD\n` +
                playlistSegmentStrings + `\n` +
                `#EXT-X-ENDLIST`;
            fs.writeFileSync(drivePath + '/qcamera.m3u8', playlist);
        }
    }

    updateDevices();
    setTimeout(function () { mainWorkerLoop(); }, 0);
}
async function deleteExpiredDrives() {
    // Flag non-preserved drives older than the configured retention window as
    // deleted; physical removal happens later in removeDeletedDrivesPhysically.
    const expirationTs = Date.now() - config.deviceDriveExpirationDays * 24 * 3600 * 1000;
    const expiredDrives = await db.all('SELECT * FROM drives WHERE is_preserved = ? AND is_deleted = ? AND created < ?', false, false, expirationTs);
    for (const expired of expiredDrives) {
        logger.info("deleteExpiredDrives drive " + expired.dongle_id + " " + expired.identifier + " is older than " + config.deviceDriveExpirationDays + " days, set is_deleted=true");
        await db.run(
            'UPDATE drives SET is_deleted = ? WHERE id = ?',
            true, expired.id);
    }
}
async function removeDeletedDrivesPhysically() {
    // For drives flagged is_deleted: remove their files from disk, mark them
    // physically removed and purge their segment rows from the database.
    const deletedDrives = await db.all('SELECT * FROM drives WHERE is_deleted = ? AND is_physically_removed = ?', true, false);
    for (const deletedDrive of deletedDrives) {
        logger.info("removeDeletedDrivesPhysically drive " + deletedDrive.dongle_id + " " + deletedDrive.identifier + " is deleted, remove physical files and clean database");
        const dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(deletedDrive.dongle_id).digest('hex');
        const driveIdentifierHash = crypto.createHmac('sha256', config.applicationSalt).update(deletedDrive.identifier).digest('hex');

        const drivePath = config.storagePath + deletedDrive.dongle_id + "/" + dongleIdHash + "/" + driveIdentifierHash;
        logger.info("removeDeletedDrivesPhysically drive " + deletedDrive.dongle_id + " " + deletedDrive.identifier + " storage path is " + drivePath);
        try {
            deleteFolderRecursive(drivePath, { recursive: true });
            await db.run(
                'UPDATE drives SET is_physically_removed = ? WHERE id = ?',
                true, deletedDrive.id);
            await db.run(
                'DELETE FROM drive_segments WHERE drive_identifier = ? AND dongle_id = ?',
                deletedDrive.identifier, deletedDrive.dongle_id);
            affectedDevices[deletedDrive.dongle_id] = true;
        } catch (exception) {
            logger.error(exception);
        }
    }
}
async function deleteOverQuotaDrives() {
    // For each device over its storage quota, flag its oldest drive as deleted:
    // normal drives are sacrificed first; a preserved drive only when nothing
    // else is left.
    const devices = await db.all('SELECT * FROM devices WHERE storage_used > ?', config.deviceStorageQuotaMb);
    for (const device of devices) {
        const driveNormal = await db.get('SELECT * FROM drives WHERE dongle_id = ? AND is_preserved = ? AND is_deleted = ? ORDER BY created ASC LIMIT 1', device.dongle_id, false, false);
        if (driveNormal != null) {
            logger.info("deleteOverQuotaDrives drive " + driveNormal.dongle_id + " " + driveNormal.identifier + " (normal) is deleted for over-quota");
            await db.run(
                'UPDATE drives SET is_deleted = ? WHERE id = ?',
                true, driveNormal.id);
            continue; // found one — do not touch preserved drives for this device
        }

        const drivePreserved = await db.get('SELECT * FROM drives WHERE dongle_id = ? AND is_preserved = ? AND is_deleted = ? ORDER BY created ASC LIMIT 1', device.dongle_id, true, false);
        if (drivePreserved != null) {
            logger.info("deleteOverQuotaDrives drive " + drivePreserved.dongle_id + " " + drivePreserved.identifier + " (preserved!) is deleted for over-quota");
            await db.run(
                'UPDATE drives SET is_deleted = ? WHERE id = ?',
                true, drivePreserved.id);
        }
    }
}
// Cleanup for one device's boot or crash log directory (logKind 'boot'/'crash'):
// keeps the five newest log files and deletes the rest from disk.
// File names look like boot-YYYY-MM-DD--HH-MM-SS.bz2 / crash-...
function cleanOldLogFiles(device, dongleIdHash, logKind) {
    const tree = dirTree(config.storagePath + device.dongle_id + "/" + dongleIdHash + "/" + logKind + "/", { attributes: ['size'] });
    if (tree == undefined) return;
    const logFiles = [];
    for (let i = 0; i < tree.children.length; i++) {
        const timeSplit = tree.children[i].name.replace('boot-', '').replace('crash-', '').replace('.bz2', '').split('--');
        const timeString = timeSplit[0] + ' ' + timeSplit[1].replace(/-/g, ':');
        logFiles.push({ 'name': tree.children[i].name, 'size': tree.children[i].size, 'date': Date.parse(timeString), 'path': tree.children[i].path });
    }
    logFiles.sort((a, b) => (a.date < b.date) ? 1 : -1); // newest first
    for (let c = 5; c < logFiles.length; c++) {
        logger.info("deleteBootAndCrashLogs deleting " + logKind + " log " + logFiles[c]['path'] + "");
        try {
            fs.unlinkSync(logFiles[c]['path']);
            affectedDevices[device.dongle_id] = true;
        } catch (exception) {
            logger.error(exception);
        }
    }
}

async function deleteBootAndCrashLogs() {
    // Keep only the 5 most recent boot and crash logs per device.
    // fix: the crash-log branch previously pushed file entries WITHOUT a 'path'
    // property, so fs.unlinkSync(undefined) always threw and crash logs were
    // never actually deleted; the duplicated halves are now a shared helper.
    const devices = await db.all('SELECT * FROM devices');
    for (let t = 0; t < devices.length; t++) {
        const device = devices[t];
        const dongleIdHash = crypto.createHmac('sha256', config.applicationSalt).update(device.dongle_id).digest('hex');
        cleanOldLogFiles(device, dongleIdHash, 'boot');
        cleanOldLogFiles(device, dongleIdHash, 'crash');
    }
}
function mainWorkerLoop() {
    // The worker intentionally self-terminates after one hour (a supervisor
    // restarts it) to avoid memory-leak buildup.
    if (Date.now() - startTime > 60 * 60 * 1000) {
        logger.info("EXIT WORKER AFTER 1 HOUR TO PREVENT MEMORY LEAKS...");
        process.exit();
    }
    try {
        // run the cleanup passes at most once every 20 minutes
        const cleaningDue = Date.now() - lastCleaningTime > 20 * 60 * 1000;
        if (cleaningDue) {
            deleteBootAndCrashLogs();
            deleteExpiredDrives();
            deleteOverQuotaDrives();
            removeDeletedDrivesPhysically();
            lastCleaningTime = Date.now();
        }
        // updateSegments re-enters this loop via updateDrives when it finishes
        setTimeout(function () { updateSegments(); }, 5000);
    } catch (e) {
        logger.error(e);
    }
}
// Open the sqlite database, sanity-check that all required tables are readable,
// then initialize storage and enter the main worker loop.
async function startWorker(release) {
    logger.info("STARTING WORKER...");
    try {
        db = await open({
            filename: config.databaseFile,
            driver: sqlite3.Database,
            mode: sqlite3.OPEN_READWRITE
        });
        // fail fast when the schema is missing or unreadable
        await db.get('SELECT * FROM accounts LIMIT 1');
        await db.get('SELECT * FROM devices LIMIT 1');
        await db.get('SELECT * FROM drives LIMIT 1');
        await db.get('SELECT * FROM drive_segments LIMIT 1');
    } catch (exception) {
        logger.error(exception);
        process.exit();
    }
    initializeStorage();
    setTimeout(function () { mainWorkerLoop(); }, 0);
}

// The lock file guarantees only a single worker instance runs at a time.
lockfile.lock('retropilot_worker.lock', { realpath: false, stale: 90000, update: 2000 })
    .then((release) => {
        startWorker(release);
    }).catch((e) => {
        console.error(e);
        process.exit();
    });