
I need to parse a CSV document from Node.js, performing database operations for each record (i.e. each line). However, I'm having trouble finding a suitable CSV parser with a pull approach, or at least a push approach that waits for my record operations to finish before parsing the next row.

I've looked at csv-parse, csvtojson, and csv-streamify, but they all seem to push events in a continuous stream without any flow control. When parsing a 1000-line CSV document, I basically get all 1000 callbacks in quick succession. For each record, I perform an operation that returns a promise. Currently I've had to resort to pushing all my promises into an array, and after getting the done/end event I also wait for Promise.all(myOperations) to know when the document has been fully processed (see the sketch below). But this is not very nice, and I'd also prefer to parse one line at a time and fully process it before getting the next record, instead of processing all records concurrently - it's hard to debug and uses a lot of memory compared to simply dealing with each record sequentially.
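
Roughly, that workaround looks like the following sketch (details depend on the csvtojson version; performDatabaseOperation is a placeholder for my promise-returning per-record operation, and is is the input stream):

const csvtojson = require('csvtojson');

const operations = [];

csvtojson()
    .fromStream(is) // `is`: readable stream over the CSV document
    .on('data', data => {
        // Each parsed record arrives immediately; nothing waits for the
        // previous operation to finish.
        const record = JSON.parse(data.toString());
        operations.push(performDatabaseOperation(record));
    })
    .on('end', () => {
        // Only here do I know that every record has been handled.
        Promise.all(operations)
            .then(() => console.log('Fully processed'))
            .catch(err => console.error(err));
    });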

So, is there a CSV parser that supports pull mode, or a way to get any stream-based CSV parser (preferably csvtojson as that's the one I'm using at the moment) to only produce events for new records when my handler for the previous record is finished (using promises)?

  • Take a look at scramjet and the StringStream.CSVParse method. You'll need to set the maxParallel option there to make sure you're running just one operation at a time, but it'll get your job done. Oh, and it takes promises and/or async functions into account natively. Commented Apr 25, 2018 at 20:53

2 Answers


I solved this myself by creating my own Writable stream and piping the CSV parser into it. My _write method does its work and hooks a promise onto the node-style callback passed to _write() (here implemented using Q.nodeify):

const stream = require('stream');
const Q = require('q');
const csvtojson = require('csvtojson');

class CsvConsumer extends stream.Writable {
    _write(data, encoding, cb) {
        // `data` is one parsed record pushed by the CSV parser.
        console.log('Got data: ', data);

        // Simulate asynchronous per-record work; the stream callback is only
        // invoked once the promise settles, so the parser has to wait.
        Q.delay(1000).then(() => {
            console.log('Waited 1 s');
        }).nodeify(cb);
    }
}

csvtojson()
    .fromStream(is) // `is` is the readable stream over the CSV document
    .pipe(new CsvConsumer())
    .on('error', err => {
        console.log('Error!', err);
    })
    .on('finish', () => {
        console.log('Done!');
    });

This will process lines one by one:

Got data: {"a": "1"}
Waited 1 s
Got data: {"a": "2"}
Waited 1 s
Got data: {"a": "3"}
Waited 1 s
Done!
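
The same back-pressure works without Q as well, by resolving the _write callback from a plain promise. A minimal sketch, where handleRecord is a placeholder for the promise-returning per-record operation:

class CsvConsumer extends stream.Writable {
    _write(data, encoding, cb) {
        // handleRecord: placeholder for the promise-returning per-record work.
        handleRecord(data)
            .then(() => cb())
            .catch(cb);
    }
}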



If you want to process each line asynchronously you can do that with Node's native readline module.

const fs = require('fs');
const readline = require('readline');

const lineStream = readline.createInterface({
  input: fs.createReadStream('data/test.csv'),
});

lineStream.on('line', (eachLine) => {
    // process each line
});
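
On recent Node versions the readline interface is also async-iterable, which lets you process lines strictly one after another with await. A minimal sketch, where handleLine is a placeholder for a promise-returning per-line operation:

const fs = require('fs');
const readline = require('readline');

async function processFile() {
    const rl = readline.createInterface({
        input: fs.createReadStream('data/test.csv'),
        crlfDelay: Infinity,
    });

    // Each iteration awaits the previous line's work before taking the next line.
    for await (const line of rl) {
        await handleLine(line); // handleLine: placeholder per-line operation
    }
}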

If you want to process lines sequentially you can use line-by-line. It doesn't buffer the entire file into memory, and it lets you pause and resume emission of 'line' events.

const LineByLineReader = require('line-by-line');

// The reader reads the file line by line without buffering it all in memory.
const lr = new LineByLineReader('data/test.csv');

lr.on('line', function (line) {
    // pause emitting of lines...
    lr.pause();

    // ...do your asynchronous line processing...
    setTimeout(function () {

        // ...and continue emitting lines (1 s delay).
        lr.resume();
    }, 1000);
});

1 Comment

Does this prevent further line events from being emitted until the code I want to run for the previous line is finished (promise is resolved)? Also, there is no CSV parser involved. I don't see how this is any different from using one of the CSV libraries I mentioned and getting events for each (parsed) line as JSON - the problem I'm having is that I don't want any more events until I'm finished handling the previous one.
