diff --git a/src/workers/dbc-csv-downloader.worker.js b/src/workers/dbc-csv-downloader.worker.js index 042f717..f743fed 100644 --- a/src/workers/dbc-csv-downloader.worker.js +++ b/src/workers/dbc-csv-downloader.worker.js @@ -5,6 +5,8 @@ import NumpyLoader from "../utils/loadnpy"; import * as CanApi from "../api/can"; +const MAX_CONNECTIONS = 8; + var window = self; const Int64LE = require("int64-buffer").Int64LE; @@ -16,30 +18,51 @@ async function fetchAndPostData( [minPart, maxPart], canStartTime ) { - console.log("\n\nfetchAndPostData", `${currentPart} of ${maxPart}`); + console.log("starting fetchAndPostData process"); - // if we've exhausted the parts, close up shop - if (currentPart > maxPart) { - self.postMessage({ - progress: 100, - shouldClose: true - }); - self.close(); - return; - } - - let awaitedData = null; - try { - awaitedData = await CanApi.fetchCanPart(base, currentPart); - } catch (e) { - console.log("fetchCanPart missing part", e); - return fetchAndPostData( + var partList = []; + var minPart = currentPart; + var prevPart = Promise.resolve(); + var promiseBuffer = []; + var totalParts = maxPart - minPart; + var currentProcess = 0; + while (currentPart <= maxPart) { + // post inc, pass the previous value then inc + console.log("Starting download", currentPart - minPart, "of", totalParts); + prevPart = downloadNextPart( base, - currentPart + 1, - [minPart, maxPart], - canStartTime + canStartTime, + currentProcess, + prevPart, + currentPart++ ); + currentProcess = ~~(100 * ((currentPart - minPart) / totalParts)); + + promiseBuffer.push(prevPart); + if (promiseBuffer.length > MAX_CONNECTIONS) { + await promiseBuffer.shift(); + } } + + await prevPart; + // processing is done! + self.postMessage({ + progress: 100, + shouldClose: true + }); + self.close(); +} + +async function downloadNextPart( + base, + canStartTime, + currentProcess, + prevPartPromise, + currentPart +) { + var awaitedData = await CanApi.fetchCanPart(base, currentPart); + await prevPartPromise; + const { times, sources, addresses, datas } = awaitedData; // times is a float64array, which we want to be a normal array for now @@ -62,12 +85,12 @@ async function fetchAndPostData( console.log("posting message"); self.postMessage({ - progress: 10, + progress: currentProcess, logData, shouldClose: false }); - fetchAndPostData(base, currentPart + 1, [minPart, maxPart], canStartTime); + return awaitedData; } function transformAndSend(rawData) {