Parallelization of a CPU-bound task continuing with an IO-bound task

I'm trying to figure out a good way to parallelize code that processes big datasets and then imports the resulting data into RavenDb.

The data processing is CPU-bound and the database import is IO-bound.

I'm looking for a solution that does the processing in parallel on Environment.ProcessorCount threads. The resulting data should then be imported into RavenDb on x (let's say 10) pooled threads, in parallel with the processing.

The main point is that processing should continue while completed data is being imported, so the next dataset is processed while the previous import is still running.

Another issue is that the memory for each batch needs to be released after a successful import, as the private working memory can easily exceed 5 GB.

The code below is what I've got so far. Note that it does not fulfill the parallelization requirements outlined above.

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        // a new task is started for every batch
        Task.Run(() =>
        {
            ...
        });
    });

GetDataItems() yields data items that are partitioned into batches. It yields about 2,000,000 items, each taking around 0.3 ms on average to process.
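Partition(batchSize) is not a standard LINQ operator, so the question presumably relies on a custom extension method. The following is a hypothetical version, assuming Partition simply groups a sequence into consecutive lists of at most batchSize items; the class name BatchExtensions is made up. Because it is an iterator, items are pulled from the source lazily instead of all at once.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the Partition(batchSize) helper used in the question.
// Assumption: it groups a sequence into consecutive lists of at most batchSize items.
public static class BatchExtensions
{
    public static IEnumerable<List<T>> Partition<T>(this IEnumerable<T> source, int batchSize)
    {
        // Validate eagerly, then hand off to the lazy iterator.
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return PartitionIterator(source, batchSize);
    }

    private static IEnumerable<List<T>> PartitionIterator<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                // Start a fresh list so the batch already handed out is never mutated.
                batch = new List<T>(batchSize);
            }
        }
        if (batch.Count > 0)
            yield return batch; // trailing partial batch
    }
}
```

With batchSize = 10000 (the value used in the solution further down), the ~2,000,000 items would come out as about 200 batches.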

The project is running on the latest .NET 4.5 RC on an x64 platform.

Update.

My current code (seen above) fetches items and partitions them into batches. Batches are processed in parallel on eight threads (Environment.ProcessorCount on an i7). The processing is slow, CPU-bound and memory-intensive. When a single batch finishes processing, a task is started to import the resulting data into RavenDb asynchronously. The batch import job itself is synchronous and looks like this:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);   // registers the entity with the session
    }
    session.SaveChanges();     // sends the stored entities to the server
}

There are a few problems with this approach:

  • Every time a batch is completed, a task is started to run the import job. I want to limit the number of tasks that run in parallel (e.g. at most 10). Additionally, even though many tasks are started, they never seem to run in parallel.

  • Memory allocations are a huge problem. Once a batch has been processed and imported, it still seems to remain in memory.

I'm looking for ways to take care of the issues outlined above. Ideally I want:

  • One thread per logical processor doing the heavy lifting of processing batches of data.
  • Ten or so parallel threads waiting for completed batches to import into RavenDb.
  • To keep memory allocations to a minimum, which means releasing each batch once its import task is complete.
  • To not run import jobs on the batch-processing threads. Importing completed batches should run in parallel with processing the next batch.

Solution.

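    var batchSize = 10000;
    // Hands completed batches from the processing stage to the import stage.
    var bc = new BlockingCollection<List<Data>>();

    // Consumer: imports each batch as soon as it is added to bc.
    var importTask = Task.Run(() =>
    {
        bc.GetConsumingEnumerable()
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(batch =>
            {
                // One RavenDB session per batch.
                using (var session = Store.OpenSession())
                {
                    foreach (var i in batch) session.Store(i);
                    session.SaveChanges();
                }
            });
    });

    // Producer: CPU-bound processing on Environment.ProcessorCount threads.
    var processTask = Task.Run(() =>
    {
        datasupplier.GetDataItems()
            .Partition(batchSize)
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .ForAll(batch =>
            {
                bc.Add(batch.Select(i => new Data()
                {
                    ...
                }).ToList());
            });
    });
    
    processTask.Wait();   // every batch has been added
    bc.CompleteAdding();  // lets the consumers finish once the collection is drained
    importTask.Wait();    // every batch has been imported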
    

    Your task overall sounds like a producer-consumer workflow. Your batch processors are the producers, and your RavenDB data "import" steps are the consumers of their output.

    Consider using a BlockingCollection<T> as the connection between your batch processors and your db importers. The db importers will wake up as soon as the batch processors push completed batches into the blocking collection, and will go back to sleep when they have "caught up" and emptied the collection.
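A minimal sketch of that wiring, assuming the import step can be represented by a delegate: importBatch is a hypothetical stand-in for the RavenDB session code, and batches are plain List<int> instead of the question's Data type. Several consumer tasks drain one BlockingCollection, and CompleteAdding() is what lets their GetConsumingEnumerable() loops end. The producer is a plain loop for brevity; in the question's code the PLINQ ForAll would call Add, which is thread-safe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // importBatch is a hypothetical stand-in for the RavenDB session code.
    public static void Run(IEnumerable<List<int>> completedBatches, int importerCount,
                           Action<List<int>> importBatch)
    {
        using (var queue = new BlockingCollection<List<int>>())
        {
            // Consumers: each blocks inside GetConsumingEnumerable() until a batch is
            // available; the loop ends once CompleteAdding() has been called and the
            // collection is empty. LongRunning hints that a dedicated thread is wanted,
            // since these tasks spend most of their time waiting.
            Task[] importers = Enumerable.Range(0, importerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (var batch in queue.GetConsumingEnumerable())
                        importBatch(batch);
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            // Producer: hand over each completed batch as soon as it exists.
            foreach (var batch in completedBatches)
                queue.Add(batch);

            queue.CompleteAdding();  // no more batches: lets the consumer loops finish
            Task.WaitAll(importers); // rethrows consumer exceptions as AggregateException
        }
    }
}
```

Setting importerCount to 10 corresponds to the "ten or so" import threads asked for in the question.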

    The batch processor producers can run at full throttle and will always run concurrently with the db importer tasks that are processing previously completed batches. If you are concerned that the batch processors may get too far ahead of the db importers (because db import takes significantly longer than processing each batch), you can set an upper bound on the blocking collection so that the producers block when they try to add beyond that limit, giving the consumers a chance to catch up.
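The upper bound is the boundedCapacity constructor argument of BlockingCollection<T>. The sketch below shows its effect on a small scale. It uses TryAdd, which returns false immediately when the collection is full, so the demo cannot hang; a producer calling Add would block at that point until a consumer takes an item. The class and method names are illustrative.

```csharp
using System.Collections.Concurrent;

public static class BoundedQueueSketch
{
    // Tries to add `attempts` items to a collection bounded at `capacity`, with no
    // consumer running. TryAdd (no timeout) returns false instead of blocking once
    // the collection is full; a producer calling Add() would block there instead.
    public static bool[] FillBeyondCapacity(int capacity, int attempts)
    {
        using (var queue = new BlockingCollection<int>(boundedCapacity: capacity))
        {
            var accepted = new bool[attempts];
            for (int i = 0; i < attempts; i++)
                accepted[i] = queue.TryAdd(i);
            return accepted;
        }
    }

    // Once a consumer takes an item, there is room for the producer again.
    public static bool AddAfterTake(int capacity)
    {
        using (var queue = new BlockingCollection<int>(boundedCapacity: capacity))
        {
            for (int i = 0; i < capacity; i++)
                queue.Add(i);          // fills the collection exactly
            queue.Take();              // a consumer removes one item
            return queue.TryAdd(-1);   // succeeds: one slot is free
        }
    }
}
```

In the question's pipeline, a capacity on the order of the number of importers would keep only that many finished batches waiting in memory; the right value is a tuning decision that this sketch does not settle.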

    Some of your comments are worrisome, though. There's nothing particularly wrong with spinning up a Task instance to perform the db import asynchronously with respect to the batch processing. A Task is not a Thread: creating new Task instances does not carry the heavy overhead of creating new threads, because tasks started with Task.Run are queued to and executed on the thread pool.
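If you keep one task per completed batch, the "max 10 imports" requirement from the question can be met by throttling rather than by managing threads. A hedged sketch using SemaphoreSlim: the producer waits for a free slot before starting each import task, so at most maxConcurrentImports imports run at once. importBatch is a hypothetical stand-in for the RavenDB code, and the peak counter exists only to make the limit observable.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledImportSketch
{
    // Starts one task per batch but lets at most maxConcurrentImports of them run
    // importBatch at the same time. Returns the peak concurrency that was observed.
    public static int ImportAll(IEnumerable<List<int>> batches, int maxConcurrentImports,
                                Action<List<int>> importBatch)
    {
        int running = 0, peak = 0;
        var errors = new ConcurrentQueue<Exception>();
        // Not disposed here, so it cannot be disposed while a task is still inside Release().
        var gate = new SemaphoreSlim(maxConcurrentImports, maxConcurrentImports);

        foreach (var batch in batches)
        {
            gate.Wait(); // the producer pauses while all import slots are busy
            Task.Run(() =>
            {
                try
                {
                    RecordPeak(ref peak, Interlocked.Increment(ref running));
                    importBatch(batch);
                }
                catch (Exception ex)
                {
                    errors.Enqueue(ex); // collected and rethrown below
                }
                finally
                {
                    Interlocked.Decrement(ref running);
                    gate.Release(); // frees a slot for the next batch
                }
            });
        }

        // Re-acquiring every slot succeeds only after all started imports have finished.
        for (int i = 0; i < maxConcurrentImports; i++)
            gate.Wait();

        if (!errors.IsEmpty)
            throw new AggregateException(errors);
        return peak;
    }

    // Lock-free "max" update: retries until peak >= value.
    private static void RecordPeak(ref int peak, int value)
    {
        int seen;
        while (value > (seen = Volatile.Read(ref peak)) &&
               Interlocked.CompareExchange(ref peak, value, seen) != seen)
        {
        }
    }
}
```

Waiting on the semaphore in the producer doubles as back-pressure, and the method waits for completion by re-acquiring the slots instead of keeping a list of every task it started.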

    Don't get hung up on trying to control threads too precisely. Even if you specify that you want exactly as many buckets as you have cores, you don't get exclusive use of those cores. Hundreds of other threads from other processes will still be scheduled in between your time slices. Specify the logical units of work using Tasks and let the TPL manage the thread pool. Save yourself the frustration of a false sense of control. ;>

    In your comments, you indicate that your tasks do not appear to be running asynchronously with respect to each other (how are you determining this?) and that memory does not appear to be released after each batch is finished. I'd suggest setting everything else aside until you figure out what is going on with those two problems. Are you forgetting to call Dispose() somewhere? Are you holding onto a reference that keeps a whole tree of objects alive unnecessarily? Are you measuring the right thing? Are the parallel tasks being serialized by blocking database or network I/O? Until these two issues are resolved, it doesn't matter what your parallelism plan is.


    For each batch you are starting a task. This means that your loop completes very quickly and leaves (number of batches) tasks behind, which is not what you wanted; you wanted (number of CPUs).

    Solution: Don't start a new task for each batch. The ForAll loop is already parallel.

    In response to your comment, here is an improved version:

    // this runs in parallel
    var processedBatches = datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .Select(x => ProcessCpuBound(x));
    
    foreach (var batch in processedBatches)
    {
        PerformIOIntensiveWorkSingleThreadedly(batch); // this runs sequentially
    }
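To try the shape of this answer without the question's data, here is a self-contained version in which ProcessCpuBound and the I/O callback are hypothetical stand-ins (a sum of squares and a delegate). The parallel Select produces results on worker threads while the foreach consumes them on the calling thread, so the sequential I/O can overlap with ongoing processing. Without AsOrdered(), results arrive in no particular order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqPipelineSketch
{
    // Hypothetical stand-in for the CPU-bound step: sum of squares of a batch.
    private static long ProcessCpuBound(int[] batch)
    {
        long sum = 0;
        foreach (var x in batch)
            sum += (long)x * x;
        return sum;
    }

    // Processes batches in parallel and hands each result to performIo on the
    // calling thread, one at a time. Returns the results in consumption order.
    public static List<long> Run(IEnumerable<int[]> batches, Action<long> performIo)
    {
        var processedBatches = batches
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .Select(batch => ProcessCpuBound(batch)); // runs on worker threads

        var consumed = new List<long>();
        foreach (var result in processedBatches) // runs sequentially on this thread
        {
            performIo(result);
            consumed.Add(result);
        }
        return consumed;
    }
}
```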
    

    I recently built something similar; I used the Queue class instead of List with Parallel.ForEach. I found that too many threads actually slowed things down; there is a sweet spot.
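This answer doesn't include code, so here is a sketch of the idea under stated assumptions: the work is held in a ConcurrentQueue<T> (the thread-safe queue; plain Queue<T> is not safe to use from several threads without a lock), it is drained with Parallel.ForEach, and the "sweet spot" is tuned through ParallelOptions.MaxDegreeOfParallelism. The class name and the peak counter are illustrative.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class DegreeOfParallelismSketch
{
    // Drains the queue with Parallel.ForEach, capping concurrent workers via
    // MaxDegreeOfParallelism. Returns the peak number of bodies running at once.
    public static int Drain(ConcurrentQueue<int> work, int maxDegree, Action<int> body)
    {
        int running = 0, peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        Parallel.ForEach(Consume(work), options, item =>
        {
            int now = Interlocked.Increment(ref running);
            int seen;
            // Lock-free update of the observed peak.
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen)
            {
            }
            try { body(item); }
            finally { Interlocked.Decrement(ref running); }
        });
        return peak;
    }

    // Yields items until the queue is empty.
    private static IEnumerable<int> Consume(ConcurrentQueue<int> queue)
    {
        int item;
        while (queue.TryDequeue(out item))
            yield return item;
    }
}
```

Timing the real workload for a few maxDegree values (for example 2, 4, 8 and 16) is one way to locate that sweet spot.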

    Link: http://www.djcxy.com/p/50164.html
