用mongoose保存非常大的CSV到mongoDB

我有一个包含200,000多行的CSV文件。 我需要将它保存到MongoDB。

如果我尝试for循环,Node将耗尽内存。

fs.readFile('data.txt', function(err, data) {
  if (err) throw err;

  data.split('n');

  for (var i = 0; i < data.length, i += 1) {
    var row = data[i].split(',');

    var obj = { /* The object to save */ }

    var entry = new Entry(obj);
    entry.save(function(err) {
      if (err) throw err;
    }
  } 
}

我怎样才能避免用尽仪式?


欢迎来到流媒体。 你真正想要的是一种“流式处理”,它可以一次处理你的输入“一次一个”,当然最好用一个通用的分隔符,比如你当前使用的“新行”字符。

对于真正有效的东西,您可以添加MongoDB“批量API”插入的使用,使您的加载尽可能快,而不会耗尽所有机器内存或CPU周期。

不主张,因为有各种解决方案可用,但这里是一个利用线路输入流包使“线路终结器”部分简单的列表。

仅限“示例”的架构定义:

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false })

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        bulk.execute(function(err,result) {
                            if (err) throw err;   // or do something
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

因此,通常在那里的“流”接口“打断输入”以处理“一次一行”。 这会阻止您一次加载所有内容。

主要部分是MongoDB的“批量操作API”。 这允许您在实际发送到服务器之前一次“排队”许多操作。 因此,在这种情况下,使用“模数”,写入操作只会在每1000个条目处理完成。 你可以真正做到16MB BSON限制,但保持可管理性。

除了批量处理的操作外,异步库还有一个额外的“限制器”。 这并不是必需的,但是这确保了在任何时候基本上不超过文档的“模数限制”。 除了内存以外,通用批次“插入”不需要IO成本,但“执行”调用意味着IO正在处理。 所以我们等待而不是排队更多的事情。

您肯定可以找到更好的解决方案,可以找到“流处理”CSV类型的数据,这似乎是。 但是总的来说,这给你提供了如何以高效率的方式做到这一点的概念,而不用吃CPU循环。


接受的答案很好,并试图涵盖这个问题的所有重要方面。

  • 将CSV文件作为一系列线读取
  • 将文档批量写入MongoDB
  • 阅读和写作之间的同步
  • 虽然它在前两个方面表现良好,但使用async.series()解决同步的方法将无法按预期工作。

    stream.on("line",function(line) {
        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation
    
                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.
    
                    counter++;
    
                    if ( counter % 1000 == 0 ) {
                        bulk.execute(function(err,result) {
                            if (err) throw err;   // or do something
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );
    });
    

    这里bulk.execute()是一个mongodb写操作,它是一个异步IO调用。 这允许node.js在bulk.execute()完成其db写入和回调之前继续执行事件循环。

    所以它可能会继续从流中接收更多'行'事件,并将更多文档bulk.insert(obj)排队,并可以再次触发下一个模块以再次触发bulk.execute()。

    让我们看看这个例子。

    var async = require('async');
    
    var bulk = {
        execute: function(callback) {
            setTimeout(callback, 1000);
        }
    };
    
    async.series(
        [
           function (callback) {
               bulk.execute(function() {
                  console.log('completed bulk.execute');
                  callback(); 
               });
           },
        ], 
        function(err) {
    
        }
    );
    
    console.log("!!! proceeding to read more from stream");
    

    它是输出

    !!! proceeding to read more from stream
    completed bulk.execute
    

    要真正确保我们在任何给定时间处理一批N个文档,我们需要使用stream.pause()stream.resume()来强制执行文件流的流控制。

    var LineInputStream = require("line-input-stream"),
        fs = require("fs"),
        mongoose = require("mongoose"),
        Schema = mongoose.Schema;
    
    var entrySchema = new Schema({},{ strict: false });
    var Entry = mongoose.model( "Entry", entrySchema );
    
    var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
    
    stream.setDelimiter("n");
    
    mongoose.connection.on("open",function(err,conn) { 
    
        // lower level method, needs connection
        var bulk = Entry.collection.initializeOrderedBulkOp();
        var counter = 0;
    
        stream.on("error",function(err) {
            console.log(err); // or otherwise deal with it
        });
    
        stream.on("line",function(line) {
            var row = line.split(",");     // split the lines on delimiter
            var obj = {};             
            // other manipulation
    
            bulk.insert(obj);  // Bulk is okay if you don't need schema
                               // defaults. Or can just set them.
    
            counter++;
    
            if ( counter % 1000 === 0 ) {
                stream.pause(); //lets stop reading from file until we finish writing this batch to db
    
                bulk.execute(function(err,result) {
                    if (err) throw err;   // or do something
                    // possibly do something with result
                    bulk = Entry.collection.initializeOrderedBulkOp();
    
                    stream.resume(); //continue to read from file
                });
            }
        });
    
        stream.on("end",function() {
            if ( counter % 1000 != 0 ) {
                bulk.execute(function(err,result) {
                    if (err) throw err;   // or something
                    // maybe look at result
                });
            }
        });
    
    });
    
    链接地址: http://www.djcxy.com/p/52287.html

    上一篇: Save a very big CSV to mongoDB using mongoose

    下一篇: Why don't scripting languages output Unicode to the Windows console?