Parallel.ForEach with a custom TaskScheduler to prevent OutOfMemoryException

I am processing PDFs of vastly varying sizes (from simple 2 MB files to high-DPI scans of a few hundred MB) via a Parallel.ForEach and am occasionally getting an OutOfMemoryException. This is understandable: the process is 32-bit, and the threads spawned by the Parallel.ForEach each take on work that consumes an unknown amount of memory.

Restricting MaxDegreeOfParallelism does work, but throughput is then insufficient when there is a large (10k+) batch of small PDFs to work through, since more threads could be running given the small memory footprint of each. This is a CPU-heavy process, with Parallel.ForEach easily reaching 100% CPU before hitting the occasional group of large PDFs and getting an OutOfMemoryException. Running the Performance Profiler backs this up.
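For reference, the static cap described above looks roughly like the following. This is a minimal sketch, not the code from the project: `FixedCapExample`, `Run`, and the `process` delegate are hypothetical names, and the peak counter exists only to make the cap observable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class FixedCapExample
{
    // Runs `process` over `items` with at most `maxWorkers` concurrent loop
    // bodies and returns the highest concurrency actually observed.
    public static int Run(IEnumerable<string> items, int maxWorkers, Action<string> process)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxWorkers };
        int current = 0, peak = 0;

        Parallel.ForEach(items, options, item =>
        {
            int now = Interlocked.Increment(ref current);

            // Raise the recorded peak with a compare-and-swap loop.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
            {
                Interlocked.CompareExchange(ref peak, now, seen);
            }

            try { process(item); }
            finally { Interlocked.Decrement(ref current); }
        });

        return peak;
    }
}
```

The cap is fixed for the whole loop, which is exactly the limitation: a value low enough to be safe for the largest scans also applies to the batches of small files.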

From my understanding, having a partitioner for my Parallel.ForEach won't improve my performance.

This leads me to using a custom TaskScheduler, passed to my Parallel.ForEach, with a MemoryFailPoint check. Searching around, there seems to be little information on creating custom TaskScheduler objects.
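For readers unfamiliar with the mechanism, a scheduler is handed to Parallel.ForEach through ParallelOptions.TaskScheduler. The sketch below assumes a deliberately trivial scheduler (`PassThroughScheduler`, a hypothetical name) that forwards every task to the thread pool; it has no memory check and only illustrates the three members a TaskScheduler subclass must override.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal scheduler: every queued task goes straight to the thread pool.
public sealed class PassThroughScheduler : TaskScheduler
{
    protected override void QueueTask(Task task)
    {
        ThreadPool.QueueUserWorkItem(_ => TryExecuteTask(task));
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Inlining is allowed; TryExecuteTask refuses to run a task twice,
        // so a later attempt by the thread pool work item is harmless.
        return TryExecuteTask(task);
    }

    // Only used by debuggers; this sketch does not track queued tasks.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }
}

public static class SchedulerUsage
{
    // Sums the squares of `values`, scheduling the loop's tasks on `scheduler`.
    public static int SumOfSquares(int[] values, TaskScheduler scheduler)
    {
        var options = new ParallelOptions { TaskScheduler = scheduler };
        int total = 0;
        Parallel.ForEach(values, options, v => { Interlocked.Add(ref total, v * v); });
        return total;
    }
}
```

A memory-aware scheduler would replace the body of QueueTask (or the worker loop it starts) with logic that decides when a task may run.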

Drawing on Specialized Task Schedulers in .NET 4 Parallel Extensions Extras, A custom TaskScheduler in C#, and various answers here on Stack Overflow, I've created my own TaskScheduler and have my QueueTask method as follows:

protected override void QueueTask(Task task)
{
    lock (tasks) tasks.AddLast(task);
    try
    {
        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600))
        {
            if (runningOrQueuedCount < maxDegreeOfParallelism)
            {
                runningOrQueuedCount++;
                RunTasks();
            }
        }
    }
    catch (InsufficientMemoryException)
    {
        // somehow return thread to pool?
        Console.WriteLine("InsufficientMemoryException");
    }
}

While the try/catch is a little expensive, my goal here is to detect when a PDF of the probable maximum size (plus a little extra memory overhead), 600 MB in total, would throw an OutOfMemoryException. This solution, though, seems to kill off the thread attempting to do the work when I catch the InsufficientMemoryException. With enough large PDFs, my code ends up being a single-threaded Parallel.ForEach.
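In isolation, the MemoryFailPoint check can be sketched as below. `MemoryGate` and `TryRun` are hypothetical names. The constructor throws InsufficientMemoryException when the runtime predicts that the requested number of megabytes cannot be allocated; the check is a prediction rather than a guarantee, so an OutOfMemoryException inside the work is still possible.

```csharp
using System;
using System.Runtime;

public static class MemoryGate
{
    // Returns true if `work` ran, or false if the runtime predicted that
    // `requiredMegabytes` could not be allocated right now.
    public static bool TryRun(int requiredMegabytes, Action work)
    {
        MemoryFailPoint gate;
        try
        {
            gate = new MemoryFailPoint(requiredMegabytes);
        }
        catch (InsufficientMemoryException)
        {
            // Refused before any work started, so nothing needs to be undone.
            return false;
        }

        // Keep the fail point alive for the duration of the work so that
        // concurrent checks account for this reservation.
        using (gate)
        {
            work();
        }
        return true;
    }
}
```

Creating the fail point outside the `using` block keeps exceptions thrown by the work itself separate from a refused admission.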

Other questions on Stack Overflow about Parallel.ForEach and OutOfMemoryExceptions don't appear to suit my use case of maximum throughput with dynamic memory usage per thread, and often just use MaxDegreeOfParallelism as a static solution, e.g.:

  • Parallel.For System.OutOfMemoryException
  • Parallel.ForEach can cause a “Out Of Memory” exception if working with a enumerable with a large object

So to have maximum throughput for variable working memory sizes, either:

  • How do I return a thread back into the threadpool when it has been denied work via the MemoryFailPoint check?
  • How/where do I safely spawn new threads to pick up work again when there is free memory?

Edit: The PDF size on disk may not linearly represent size in memory due to the rasterization and rasterized image manipulation component which is dependent on the PDF content.


    Using LimitedConcurrencyLevelTaskScheduler from Samples for Parallel Programming with the .NET Framework, I was able to make a minor adjustment to get something that looked roughly like what I wanted. The following is the NotifyThreadPoolOfPendingWork method of the LimitedConcurrencyLevelTaskScheduler class after modification:

    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items.
            // This is necessary to enable inlining of tasks into this thread.
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue.
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // When there are no more items to be processed,
                        // note that we're done processing, and get out.
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;
                            break;
                        }
    
                        // Get the next item from the queue
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }
    
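                    // Execute the task we pulled out of the queue, but only
                    // if the MemoryFailPoint check predicts that 650 MB can
                    // be allocated.
                    try
                    {
                        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650))
                        {
                            base.TryExecuteTask(item);
                        }
                    }
                    catch (InsufficientMemoryException)
                    {
                        // Not enough memory right now: pause this worker,
                        // then put the task back at the end of the queue.
                        Thread.Sleep(500);
    
                        lock (_tasks)
                        {
                            _tasks.AddLast(item);
                        }
                    }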
    
                }
            }
            // We're done processing items on the current thread
            finally { _currentThreadIsProcessingItems = false; }
        }, null);
    }
    

    Look at the catch block, but in reverse order. We add the task we were going to work on back to the end of the task list ( _tasks ), where the worker loop will pick it up again later. But first we sleep the current thread, so that it does not pick the work up straight away and run into another failed MemoryFailPoint check.
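Stripped of the scheduler plumbing, the sleep-then-requeue pattern can be sketched on its own. The names below (`RequeueLoop`, `Drain`, `tryRun`, `backoffMs`) are hypothetical; `tryRun` stands in for the MemoryFailPoint-guarded call and is expected to run the work and return true, or return false without running it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class RequeueLoop
{
    // Drains `queue`. An item that `tryRun` refuses is put back at the end of
    // the queue after a pause. Returns how many refusals occurred.
    public static int Drain(LinkedList<Action> queue, Func<Action, bool> tryRun, int backoffMs)
    {
        int refusals = 0;
        while (true)
        {
            Action item;
            lock (queue)
            {
                if (queue.Count == 0) return refusals;
                item = queue.First.Value;
                queue.RemoveFirst();
            }

            if (!tryRun(item))
            {
                refusals++;
                // Pause before requeueing so the item is not retried at once.
                Thread.Sleep(backoffMs);
                lock (queue) queue.AddLast(item);
            }
        }
    }
}
```

One consequence of this design is that the loop never gives up: if memory is never freed, a refused item is retried indefinitely at the backoff interval, so a retry limit may be worth adding in practice.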
