带有自定义TaskScheduler的Parallel.ForEach以防止OutOfMemoryException
我正在通过一个Parallel.ForEach处理大小各异的PDF(简单的2MB到几百MB的高DPI扫描),并且偶尔会发生OutOfMemoryException - 可以理解的是,由于进程是32位,并且线程产生了并行。对于每个占用未知量的内存消耗工作。
尽管限制MaxDegreeOfParallelism
的确行得通,但当有大量(10k +)批次的小PDF使用时,吞吐量是不够的,因为可能会有更多的线程工作,因为所述线程的内存占用量小。 这是一个CPU繁重的过程,使用Parallel.ForEach轻松达到100%的CPU,然后碰到偶尔出现的一组大型PDF并且发生OutOfMemoryException。 运行性能分析器支持这一点。
根据我的理解,为我的Parallel.ForEach设置分区并不会提高我的性能。
这导致我使用通过MemoryFailPoint
检查传递给我的Parallel.ForEach的自定义TaskScheduler
。 在它周围搜索似乎有关于创建自定义TaskScheduler
对象的稀缺信息。
在.NET 4 Parallel Extensions Extras,C#中的自定义TaskScheduler以及Stackoverflow上的各种答案之间寻找专门的任务计划程序,我创建了自己的TaskScheduler
并拥有我的QueueTask
方法:
protected override void QueueTask(Task task)
{
lock (tasks) tasks.AddLast(task);
try
{
using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600))
{
if (runningOrQueuedCount < maxDegreeOfParallelism)
{
runningOrQueuedCount++;
RunTasks();
}
}
}
catch (InsufficientMemoryException e)
{
// somehow return thread to pool?
Console.WriteLine("InsufficientMemoryException");
}
}
虽然try / catch有点贵,但我的目标是捕捉600MB的可能的最大大小PDF(+额外的内存开销)将引发OutOfMemoryException。 当我捕获InsufficientMemoryException时,这个解决方案似乎终止了尝试执行该工作的线程。 有了足够大的PDF文件,我的代码最终会成为一个单独的线程Parallel.ForEach。
在Parallel.ForEach和OutOfMemoryExceptions上的Stackoverflow上发现的其他问题似乎不适合我的线程动态内存使用情况下的最大吞吐量用例,并且通常只是将MaxDegreeOfParallelism
用作静态解决方案,例如:
因此,要获得可变工作内存大小的最大吞吐量,可以:
MemoryFailPoint
检查工作时,如何返回线程MemoryFailPoint
? 编辑:由于光栅化和光栅化图像处理组件依赖于PDF内容,因此磁盘上的PDF大小可能无法线性表示内存大小。
使用示例中的LimitedConcurrencyLevelTaskScheduler
与.NET Framework进行并行编程我可以做一些小调整,以获得看起来我想要的东西。 以下是NotifyThreadPoolOfPendingWork
所述的方法LimitedConcurrencyLevelTaskScheduler
改性之后类:
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
//base.TryExecuteTask(item);
try
{
using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650))
{
base.TryExecuteTask(item);
}
}
catch (InsufficientMemoryException e)
{
Thread.Sleep(500);
lock (_tasks)
{
_tasks.AddLast(item);
}
}
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
我们会看看这个问题,但相反。 我们将我们将要工作的任务添加回到触发事件的任务列表( _tasks
)中,以获得可用的线程来完成该工作。 但是我们首先睡眠当前线程,以便它不直接完成工作并返回失败的MemoryFailPoint
检查。
上一篇: Parallel.ForEach with a custom TaskScheduler to prevent OutOfMemoryException
下一篇: How to do Parallel operations with Thread Scope in Web app using Ninject