CPU绑定任务的并行化继续IO绑定

我试图找出一种很好的方法来完成处理大数据集的代码的并行化,然后将结果数据导入到RavenDb中。

数据处理是CPU绑定和数据库导入IO绑定。

我正在寻找一种解决方案来并行处理Environment.ProcessorCount线程数量。 然后将结果数据导入RavenDb的x(可以说10个)并行线程与上述过程并行。

这里主要的是我希望处理在导入完成数据时继续,以便在等待导入完成时继续处理下一个数据集。

另一个问题是每个批次的内存需要在成功导入后丢弃,因为私人工作内存可以轻松达到> 5GB。

下面的代码是我到目前为止。 请注意,它没有满足上面列出的并行化要求。

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem产生分割成批量数据集的可枚举数据项。 GetDataItem将产生约2,000,000个项目,每个项目的平均处理时间约为0.3ms。

该项目运行在x64平台上的最新.NET 4.5 RC上。

更新。

我目前的代码(见上)将获取项目并分批分区。 每个批处理在8个线程上并行处理(i7上的Environment.ProcessorCount)。 处理速度慢,受CPU限制并且内存密集。 当单个批次的处理完成时,任务将开始将结果数据异步导入RavenDb。 批量导入作业本身是同步的,如下所示:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

这种方法存在一些问题:

  • 每当批次完成时,任务就开始运行导入作业。 我想限制并行运行的任务数量(例如,最大值10)。 此外,即使许多任务开始,他们似乎永远不会并行运行。

  • 内存分配是一个巨大的问题。 处理/导入批处理后,它似乎仍保留在内存中。

  • 我正在寻找方法来处理上述问题。 理想情况下,我想:

  • 每个逻辑处理器有一个线程负责繁重处理批量数据。
  • 十个左右的并行线程等待已完成的批次导入到RavenDb中。
  • 将内存分配保持在最低限度,这意味着在导入任务完成后取消分配批处理。
  • 不要在其中一个线程上运行导入作业进行批处理。 已完成批次的导入应与正在处理的下一批量并行运行。
  • var batchSize = 10000;
    var bc = new BlockingCollection<List<Data>>();
    var importTask = Task.Run(() =>
    {
        bc.GetConsumingEnumerable()
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(batch =>
            {
                using (var session = Store.OpenSession())
                {
                    foreach (var i in batch) session.Store(i);
                    session.SaveChanges();
                }
            });
    });
    var processTask = Task.Run(() =>
    {
        datasupplier.GetDataItems()
            .Partition(batchSize)
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .ForAll(batch =>
            {
                bc.Add(batch.Select(i => new Data()
                {
                    ...
                }).ToList());
            });
    });
    
    processTask.Wait();
    bc.CompleteAdding();
    importTask.Wait();
    

    整体而言,您的任务听起来像生产者 - 消费者工作流 你的批处理器是生产者,你的RavenDB数据“进口”是生产者输出的消费者。

    考虑使用BlockingCollection<T>作为批处理处理器和数据库导入器之间的连接。 一旦批量处理器将完成的批次推入阻塞集合,数据库导入器就会唤醒,并且当它们“赶上”并清空集合时,它们将回到睡眠状态。

    批处理器生产者可以运行完全限制,并且将始终与处理先前完成的批处理的数据库导入器任务同时运行。 如果您担心批处理器可能比数据库导入器过于远远(b / c数据库导入需要比处理每批更长的时间),则可以设置阻塞集合的上限,以便生产者在添加时阻止超出这个限度,给消费者一个追赶的机会。

    不过,您的一些评论令人担忧。 在启动一个Task实例以异步执行数据库导入批处理方面没有任何特别的错误。 任务!=线程。 创建新任务实例没有创建新线程的巨大开销。

    不要因为试图精确地控制线程而挂断电话。 即使您指定您需要的内存数量与内核数量完全相同,您也不会独占使用这些内核。 来自其他进程的数百个其他线程仍将安排在您的时间片之间。 使用任务指定工作的逻辑单元,并让TPL管理线程池。 节省自己对控制感的错觉。 ;>

    在你的评论中,你指出你的任务似乎没有对彼此运行异步(你如何确定这一点?),并且在每批完成后内存似乎不会被释放。 我建议放弃一切,直到你能够首先解决这两个问题。 你忘了在什么地方调用Dispose()? 你是否保留了一个引用,使整个对象树不必要地活着? 你在测量正确的东西吗? 并行任务是由阻塞数据库还是网络I / O序列化的? 在解决这两个问题之前,并行计划的内容并不重要。


    对于每一批你开始一项任务。 这意味着你的循环很快完成。 它离开(批次数量)后面的任务不是你想要的。 你想要(CPU数量)。

    解决方案:不要为每个批次启动一项新任务。 for循环已经平行。

    回应你的评论,这是一个改进版本:

    //this runs in parallel
    var processedBatches = datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .Select(x => ProcessCpuBound(x));
    
    foreach (var batch in processedBatches) {
     PerformIOIntensiveWorkSingleThreadedly(batch); //this runs sequentially
    }
    

    我最近建立了类似的东西,我用Queue class vs List和Parallel.Foreach。 我发现太多线程实际上减慢了速度,这是一个甜蜜点。

    链接地址: http://www.djcxy.com/p/50163.html

    上一篇: Parallelization of CPU bound task continuing with IO bound

    下一篇: Sequentially Splitting the load on a Parallel.Foreach Loop