Using Node.js, I am trying to "replay" a simple CSV file that contains a time point in float seconds in one column and a numeric value in the other. The idea is to output each value at the time specified in the first column, measured from when the script was started.
This is the code I wrote to do that.
const NS_PER_SEC = 1e9;
const {chain} = require('stream-chain');
const {parser} = require('stream-csv-as-json');
const {streamValues} = require('stream-json/streamers/StreamValues');
const fs = require('fs');
const zlib = require('zlib');
// CSV file streaming pipeline: read the file, parse each row, and map it
// to {t, pitch}. NOTE(review): the header row ("time,val") also flows
// through here, so the first emitted object has t = 'time' — which later
// yields the "Setting timeout to NaNms" line in the output.
const pipeline = chain([
fs.createReadStream('timed-data.csv'),
parser({separator: ','}),
streamValues(),
data => {
const value = data.value;
return {t: value[0], pitch: value[1]}
}
]);
// Reference point for all elapsed-time measurements below
var startTime = process.hrtime();
// Output a line every second for testing
setInterval(()=>{
let t = process.hrtime(startTime)[0]
console.log('----------------' + t + '-----------------------')
}, 1000)
// A setTimeout is scheduled for every row as fast as the stream can be
// consumed — nothing here applies backpressure, so thousands of timers
// pile up before any of them fires (the problem described above).
pipeline.on('data', (d) => {
// get elapsed hrtime since script start
let hrtDiff = process.hrtime(startTime);
// convert hrtime [seconds, nanoseconds] array to float sec
let hrtDiffFloat = (hrtDiff[0] + (hrtDiff[1] / NS_PER_SEC));
// calculate time difference from now
var diff = (d.t - hrtDiffFloat)*1000;
console.log('Setting timeout to '+ diff + 'ms');
setTimeout((d)=>console.log('data: '+d.t+', '+d.pitch), diff, d);
});
Currently, thousands of executions are scheduled as Node flies through the file. Later, events start firing.
The question is: How do I schedule the executions one (or a few) at a time, so that new items are pulled from the stream only after the previously scheduled executions have finished?
(apart from war crimes like while (Date.now() < executionTime) ;)
Note that I am quite new to Node so chances are streams are not the right tool for this task.
Example data: (real data contains hundreds of values per second)
time,val
1.886621315,0
2.757369614,186.920
3.848707482,178.005
4.440816326,0
4.992290249,154.440
5.932698412,0
7.845260770,0
9.027936507,240.235
10.164172335,264.044
11.625487528,198.861
13.526439909,249.802
14.841088435,0
15.243628117,173.235
15.847346938,198.861
17.250612244,223.481
18.521541950,218.313
20.495238095,264.044
21.796371882,348.087
22.134240362,278.755
26.769705215,249.083
Console output for example data:
Mikulass-MBP:experiments mikulas$ node strem-csv.js
Setting timeout to NaNms
Setting timeout to 1882.043799ms
Setting timeout to 2752.6014609999997ms
Setting timeout to 3843.824312ms
Setting timeout to 4435.851658ms
Setting timeout to 4987.248032ms
Setting timeout to 5927.584522ms
Setting timeout to 7840.075599000001ms
Setting timeout to 9022.677742ms
Setting timeout to 10158.839757ms
Setting timeout to 11620.070245ms
Setting timeout to 13520.945921ms
Setting timeout to 14835.519640999999ms
Setting timeout to 15237.984088ms
Setting timeout to 15841.62652ms
Setting timeout to 17244.816256ms
Setting timeout to 18515.670276999997ms
Setting timeout to 20489.291373ms
Setting timeout to 21790.351713ms
Setting timeout to 22128.146255ms
Setting timeout to 26763.537879999996ms
data: time, val
----------------1-----------------------
data: 1.886621315, 0
----------------2-----------------------
data: 2.757369614, 186.920
----------------3-----------------------
data: 3.848707482, 178.005
----------------4-----------------------
data: 4.440816326, 0
data: 4.992290249, 154.440
----------------5-----------------------
data: 5.932698412, 0
----------------6-----------------------
----------------7-----------------------
data: 7.845260770, 0
----------------8-----------------------
data: 9.027936507, 240.235
----------------9-----------------------
----------------10-----------------------
data: 10.164172335, 264.044
----------------11-----------------------
data: 11.625487528, 198.861
----------------12-----------------------
----------------13-----------------------
data: 13.526439909, 249.802
----------------14-----------------------
data: 14.841088435, 0
----------------15-----------------------
data: 15.243628117, 173.235
data: 15.847346938, 198.861
----------------16-----------------------
----------------17-----------------------
data: 17.250612244, 223.481
----------------18-----------------------
data: 18.521541950, 218.313
----------------19-----------------------
----------------20-----------------------
data: 20.495238095, 264.044
----------------21-----------------------
data: 21.796371882, 348.087
----------------22-----------------------
data: 22.134240362, 278.755
----------------23-----------------------
----------------24-----------------------
----------------25-----------------------
----------------26-----------------------
data: 26.769705215, 249.083
----------------27-----------------------
----------------28-----------------------
You could add an async function to the chain:
// Promise-based sleep, so the async transform can pause the pipeline.
const timer = ms => new Promise(res => setTimeout(res, ms));
// Reference point: "t seconds" in the CSV means t seconds after this.
const startHrt = process.hrtime();
const pipeline = chain([
fs.createReadStream('timed-data.csv'),
parser({separator: ','}),
streamValues(),
async data => {
const [time, pitch] = data.value;
// Elapsed float seconds since the pipeline was created.
const [sec, ns] = process.hrtime(startHrt);
const elapsedMs = (sec + ns / 1e9) * 1000;
// Sleep only until this row's ABSOLUTE timestamp. Sleeping the full
// `time * 1000` would accumulate across rows (each row is processed
// sequentially), so row t=2.757 would fire ~4.6s in instead of at 2.757s.
// The header row yields NaN here; setTimeout treats NaN as 0, so it
// passes straight through.
await timer(Math.max(0, time * 1000 - elapsedMs));
return {time, pitch};
}
]);
That applies backpressure, which is actually the most elegant option — otherwise you would have to load all the data into a buffer first.