8

Per a prior thread here:

Node async loop - how to make this code run in sequential order?

...I'm looking for broader advice on processing large data upload files.

Scenario:

User uploads a very large CSV file with hundreds of thousands to millions of rows. It's streamed into an endpoint using multer:

const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

router.post("/", upload.single("upload"), (req, res) => {
    //...
});

Each row is transformed into a JSON object. That object is then mapped into several smaller ones, which need to be inserted into several different tables, spread out across, and accessed by, various microservice containers.

async.forEachOfSeries(data, (line, key, callback) => {
    let model = splitData(line);
    // save model.record1, model.record2, etc. sequentially,
    // then call callback() to move on to the next line
});

It's obvious I'm going to run into memory limitations with this approach. What is the most efficient way to do this?

1
  • You need to use streams throughout the pipeline. I recommend using the event-stream package: npmjs.com/package/event-stream Commented Dec 27, 2017 at 23:55

3 Answers

13

To avoid memory issues you need to process the file using streams - in plain words, incrementally. Instead of loading the whole file into memory, you read each row, process it, and immediately afterwards it becomes eligible for garbage collection.

In Node, you can do this with a combination of a CSV stream parser, to parse the binary contents into CSV rows, and through2, a stream utility that lets you control the flow of the stream - in this case pausing it momentarily so each row can be saved to the DB.

The Process

The process goes as follows:

  • You acquire a stream to the data
  • You pipe it through the CSV parser
  • You pipe it through a through2 transform
  • You save each row in your database
  • When you're done saving, call cb() to move on to the next item.

I'm not familiar with multer, but here's an example that uses a stream from a file.

const fs = require('fs')
const csv = require('csv-stream')
const through2 = require('through2')

const stream = fs.createReadStream('foo.csv')
  .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
  }))
  .pipe(through2({ objectMode: true }, (row, enc, cb) => {
    // - `row` holds the current parsed row of the CSV,
    //   e.g.: `{ Year: '1997', Make: 'Ford', Model: 'E350' }`
    // - The stream won't process the *next* item unless you call the callback
    //  `cb` on it.
    // - This allows us to save the row in our database/microservice and when
    //   we're done, we call `cb()` to move on to the *next* row.
    saveIntoDatabase(row).then(() => {
      cb(null, true)
    })
    .catch(err => {
      cb(err, null)
    })
  }))
  .on('data', data => {
    console.log('saved a row')
  })
  .on('end', () => {
    console.log('end')
  })
  .on('error', err => {
    console.error(err)
  })

// Mock function that emulates saving the row into a database,
// asynchronously in ~500 ms
const saveIntoDatabase = row =>
  new Promise((resolve, reject) =>
    setTimeout(() => resolve(), 500))

The example foo.csv file contains:

1997,Ford,E350
2000,Mercury,Cougar
1998,Ford,Focus
2005,Jaguar,XKR
1991,Yugo,LLS
2006,Mercedes,SLK
2009,Porsche,Boxter
2001,Dodge,Viper

Why?

This approach avoids having to load the entire CSV in memory. As soon as a row is processed it goes out of scope/becomes unreachable, hence it's eligible for garbage collection. This is what makes this approach so memory-efficient. In theory this allows you to process files of any size. Read the Streams Handbook for more info on streams.

Some Tips

  • You probably want to save/process more than 1 row per cycle (in equal-sized chunks). In that case, push some rows into an Array, process/save the entire Array (the chunk) and then call cb to move on to the next chunk - repeating the process. A rough batching sketch follows after this list.
  • Streams emit events that you can listen to. The end/error events are particularly useful for reporting back whether the operation was a success or a failure.
  • Express works with streams by default - I'm almost certain you don't need multer at all. See the endpoint sketch after this list.
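
A minimal sketch of the batching tip, assuming a hypothetical saveBatch() bulk-save function (insertMany, a bulk HTTP endpoint, etc.) and an arbitrary batch size of 5000 - tune both to whatever your database/microservices can handle:

const through2 = require('through2')

const BATCH_SIZE = 5000   // arbitrary; tune to your DB/service
let batch = []

// Drop-in replacement for the per-row through2 stream in the example above.
const batcher = through2.obj(
  function (row, enc, cb) {
    batch.push(row)
    if (batch.length < BATCH_SIZE) return cb()
    const chunk = batch
    batch = []
    // saveBatch() is hypothetical - your bulk insert goes here
    saveBatch(chunk).then(() => cb()).catch(cb)
  },
  function (cb) {
    // flush: save whatever is left over when the stream ends
    if (batch.length === 0) return cb()
    saveBatch(batch).then(() => cb()).catch(cb)
  }
)

As for dropping multer: req itself is a readable stream, so whether that works depends on how the client sends the file. If it POSTs the raw CSV body (e.g. Content-Type: text/csv) you can pipe req straight into the parser; a multipart/form-data upload still needs a multipart parser. A rough sketch under the raw-body assumption, reusing saveIntoDatabase and the csv-stream options from the example above:

router.post('/', (req, res) => {
  req
    .pipe(csv.createStream({
      endLine: '\n',
      columns: ['Year', 'Make', 'Model'],
      escapeChar: '"',
      enclosedChar: '"'
    }))
    .pipe(through2({ objectMode: true }, (row, enc, cb) => {
      saveIntoDatabase(row).then(() => cb()).catch(cb)
    }))
    .on('data', () => {})                 // keeps the stream flowing
    .on('end', () => res.sendStatus(201))
    .on('error', err => res.status(500).json({ error: err.message }))
})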

4 Comments

When reading the other question, Tsar seems to want to save to the database using an HTTP request. Making these requests serial may be too slow, but calling the callback without waiting for the request to finish may be too quick. Tsar can use the code with throttled promises if it's too fast.
You have the best approach by streaming the CSV. Maybe take a batch-size amount and process it throttled to what the server can handle; when the batch is processed, read in another batch (call the callback).
The above is more or less illustrating the concept. It needs tweaks - the first thing I would do is accumulate 5000-10000 rows in an Array and save/process that chunk in one go if possible. Touching the DB for each and every row is, of course, terribly inefficient.
Yes, if touching the DB actually means using a database driver. The original question had Tsar inserting records using an HTTP request, so maybe a 3rd-party service. If that service has no bulk insert, then throttling should probably be applied.
2

Large .csv Data Parse and Import

I used the above model to import a 1.7-million-row by 200-column matrix of CSV data into Mongo with the following code. Admittedly it's slow, and I could use a little help learning how to chunk the data better to make it more efficient - i.e. instead of inserting after each read, accumulate rows into an array of 5, 10, or 25k rows and then insertMany, or better yet become fluent in the through2-map or through2-filter methods. If anyone is willing to share an example, thank you in advance.

require('dotenv').config();
const parse = require('csv-parser');
const fs = require('fs');
const through2 = require('through2');
const db = require('../models');

const file = "myFile.csv";

//========Constructor Function for Mongo Import after each read======//
function Hotspot(variable1, variable2 /* , ... */) {
    this.variable1 = variable1;
    this.variable2 = variable2;
    // ...
}

//========Counter so I can monitor progress in console============//
let counter = 0;

//This function is imported & run in server.js from './scripts' after mongoose connection established//

exports.importCsvData = () => {
    fs.createReadStream(file)
        .pipe(parse())
        .pipe(through2({ objectMode: true }, (row, enc, cb) => {
            let hotspot = new Hotspot(
                `${row["ROW_VARIABLE_COLUMN_1"]}`,
                `${row["ROW_VARIABLE_COLUMN_2"]}` /* , ... */);

            // One insert per row; cb() is only called once the insert has finished
            db.MongoModel.create(hotspot)
                .then(result => console.log('created', counter++))
                .then(() => { cb(null, true) })
                .catch(err => {
                    cb(err, null)
                })
        }))
        .on('data', () => {
            // keeps the stream flowing; receives whatever the transform passes to cb()
        })
        .on('end', () => {
            console.log('read complete')
        })
}

I used the following posts and links:

as a basis and reference for writing this script. It appears to be working 'fine', except that I started it at 10pm last night and it is less than halfway finished as of 7:45am this morning. Still, this is better than the "event": "Allocation failed - JavaScript heap out of memory" error I received after I tried to accumulate all of my 'hotspot' objects into one array for a bulk insert into MongoDB. I am fairly new to readStream/through2/csv-parser in Node and still learning, but wanted to share something that is currently working.
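
For what it's worth, here is a rough, untested sketch of the chunked insertMany approach described at the top of this answer, reusing the requires, file, Hotspot, counter and db.MongoModel from the snippet above (the 10,000 batch size is a guess to tune):

const BATCH_SIZE = 10000;  // guess - tune to your documents/hardware
let batch = [];

const insertBatch = docs =>
    // ordered: false lets Mongo keep going past individual bad documents
    db.MongoModel.insertMany(docs, { ordered: false })
        .then(() => console.log('inserted', (counter += docs.length)));

exports.importCsvData = () =>
    new Promise((resolve, reject) => {
        fs.createReadStream(file)
            .pipe(parse())
            .pipe(through2({ objectMode: true }, (row, enc, cb) => {
                batch.push(new Hotspot(
                    `${row["ROW_VARIABLE_COLUMN_1"]}`,
                    `${row["ROW_VARIABLE_COLUMN_2"]}` /* , ... */));
                if (batch.length < BATCH_SIZE) return cb();
                const chunk = batch;
                batch = [];
                insertBatch(chunk).then(() => cb()).catch(cb);
            }, cb => {
                // flush: insert whatever is left at the end of the file
                if (batch.length === 0) return cb();
                insertBatch(batch).then(() => cb()).catch(cb);
            }))
            .on('data', () => {})   // keeps the stream flowing
            .on('end', () => { console.log('read complete'); resolve(); })
            .on('error', reject);
    });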


0

You can also use this async/await variant of the same approach:

const fs = require('fs');
const csv = require('csv-stream');
const through2 = require('through2');

// Mock async function to simulate saving to a database
const saveIntoDatabase = async (row) => {
  return new Promise((resolve) => setTimeout(resolve, 500));
};

// Function to process the CSV using async/await
const processCSV = async (filePath) => {
  return new Promise((resolve, reject) => {
    const stream = fs.createReadStream(filePath)
      .pipe(csv.createStream({
        endLine: '\n',
        columns: ['Year', 'Make', 'Model'],
        escapeChar: '"',
        enclosedChar: '"'
      }))
      .pipe(through2.obj(async function (row, enc, cb) {
        try {
          await saveIntoDatabase(row);
          console.log('Saved row:', row);
          cb(null, row);
        } catch (err) {
          cb(err);
        }
      }))
      .on('data', () => {}) // Required to keep the stream flowing
      .on('end', () => {
        console.log('CSV processing complete.');
        resolve();
      })
      .on('error', (err) => {
        console.error('Stream error:', err);
        reject(err);
      });
  });
};

// Run the CSV processor
(async () => {
  try {
    await processCSV('foo.csv');
  } catch (err) {
    console.error('Failed to process CSV:', err);
  }
})();

