CPU绑定任务的并行化继续IO绑定
我试图找出一种很好的方法来完成处理大数据集的代码的并行化,然后将结果数据导入到RavenDb中。
数据处理是CPU绑定和数据库导入IO绑定。
我正在寻找一种解决方案来并行处理Environment.ProcessorCount线程数量。 然后将结果数据导入RavenDb的x(可以说10个)并行线程与上述过程并行。
这里主要的是我希望处理在导入完成数据时继续,以便在等待导入完成时继续处理下一个数据集。
另一个问题是每个批次的内存需要在成功导入后丢弃,因为私人工作内存可以轻松达到> 5GB。
下面的代码是我到目前为止。 请注意,它没有满足上面列出的并行化要求。
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
Task.Run(() =>
{
...
}
}
GetDataItem产生分割成批量数据集的可枚举数据项。 GetDataItem将产生约2,000,000个项目,每个项目的平均处理时间约为0.3ms。
该项目运行在x64平台上的最新.NET 4.5 RC上。
更新。
我目前的代码(见上)将获取项目并分批分区。 每个批处理在8个线程上并行处理(i7上的Environment.ProcessorCount)。 处理速度慢,受CPU限制并且内存密集。 当单个批次的处理完成时,任务将开始将结果数据异步导入RavenDb。 批量导入作业本身是同步的,如下所示:
using (var session = Store.OpenSession())
{
foreach (var data in batch)
{
session.Store(data);
}
session.SaveChanges();
}
这种方法存在一些问题:
每当批次完成时,任务就开始运行导入作业。 我想限制并行运行的任务数量(例如,最大值10)。 此外,即使许多任务开始,他们似乎永远不会并行运行。
内存分配是一个巨大的问题。 处理/导入批处理后,它似乎仍保留在内存中。
我正在寻找方法来处理上述问题。 理想情况下,我想:
解
var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
bc.GetConsumingEnumerable()
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(batch =>
{
using (var session = Store.OpenSession())
{
foreach (var i in batch) session.Store(i);
session.SaveChanges();
}
});
});
var processTask = Task.Run(() =>
{
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
bc.Add(batch.Select(i => new Data()
{
...
}).ToList());
});
});
processTask.Wait();
bc.CompleteAdding();
importTask.Wait();
整体而言,您的任务听起来像生产者 - 消费者工作流 你的批处理器是生产者,你的RavenDB数据“进口”是生产者输出的消费者。
考虑使用BlockingCollection<T>
作为批处理处理器和数据库导入器之间的连接。 一旦批量处理器将完成的批次推入阻塞集合,数据库导入器就会唤醒,并且当它们“赶上”并清空集合时,它们将回到睡眠状态。
批处理器生产者可以运行完全限制,并且将始终与处理先前完成的批处理的数据库导入器任务同时运行。 如果您担心批处理器可能比数据库导入器过于远远(b / c数据库导入需要比处理每批更长的时间),则可以设置阻塞集合的上限,以便生产者在添加时阻止超出这个限度,给消费者一个追赶的机会。
不过,您的一些评论令人担忧。 在启动一个Task实例以异步执行数据库导入批处理方面没有任何特别的错误。 任务!=线程。 创建新任务实例没有创建新线程的巨大开销。
不要因为试图精确地控制线程而挂断电话。 即使您指定您需要的内存数量与内核数量完全相同,您也不会独占使用这些内核。 来自其他进程的数百个其他线程仍将安排在您的时间片之间。 使用任务指定工作的逻辑单元,并让TPL管理线程池。 节省自己对控制感的错觉。 ;>
在你的评论中,你指出你的任务似乎没有对彼此运行异步(你如何确定这一点?),并且在每批完成后内存似乎不会被释放。 我建议放弃一切,直到你能够首先解决这两个问题。 你忘了在什么地方调用Dispose()? 你是否保留了一个引用,使整个对象树不必要地活着? 你在测量正确的东西吗? 并行任务是由阻塞数据库还是网络I / O序列化的? 在解决这两个问题之前,并行计划的内容并不重要。
对于每一批你开始一项任务。 这意味着你的循环很快完成。 它离开(批次数量)后面的任务不是你想要的。 你想要(CPU数量)。
解决方案:不要为每个批次启动一项新任务。 for循环已经平行。
回应你的评论,这是一个改进版本:
//this runs in parallel
var processedBatches = datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.Select(x => ProcessCpuBound(x));
foreach (var batch in processedBatches) {
PerformIOIntensiveWorkSingleThreadedly(batch); //this runs sequentially
}
我最近建立了类似的东西,我用Queue class vs List和Parallel.Foreach。 我发现太多线程实际上减慢了速度,这是一个甜蜜点。
链接地址: http://www.djcxy.com/p/50163.html上一篇: Parallelization of CPU bound task continuing with IO bound
下一篇: Sequentially Splitting the load on a Parallel.Foreach Loop