TPL Dataflow: splitting work into small jobs and then grouping it again

I need to do the following work:

  • Get Page objects from the database
  • For each page, get all of its images and process them (IO-bound work, for example uploading them to a CDN)
  • If all images were processed successfully, mark the Page as processed in the database

Since I need to control how many Pages I process in parallel, I've decided to go with TPL Dataflow:

     ____________________________
    |         Data pipe          |
    |   BufferBlock<Page>        |
    |   BoundedCapacity = 1      |
    |____________________________|
                  |
     ____________________________
    |       Process images       |
    | TransformBlock<Page, Page> |
    | BoundedCapacity = 1        |
    | MaxDegreeOfParallelism = 8 |
    |____________________________|
                  |
     ____________________________
    |        Save page           |
    | ActionBlock<Page>          |
    | BoundedCapacity = 1        |
    | MaxDegreeOfParallelism = 5 |
    |____________________________|
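A minimal C# sketch of the linear pipeline in the diagram above, under stated assumptions: `Page` is a hypothetical record with an `Id` and a list of image URLs, and both the image processing and the "save" step are placeholders (`Task.Delay` and a `ConcurrentBag`). One deliberate difference from the diagram: for an execution block, `BoundedCapacity` also counts the messages the block is currently processing, so `BoundedCapacity = 1` combined with `MaxDegreeOfParallelism = 8` would still handle only one page at a time. The sketch therefore sets each block's capacity equal to its degree of parallelism.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the asker's Page entity.
public sealed record Page(int Id, IReadOnlyList<string> ImageUrls);

public static class LinearPipelineSketch
{
    // Pushes pages through BufferBlock -> TransformBlock -> ActionBlock and
    // returns the IDs of the pages that reached the "save" step.
    public static async Task<IReadOnlyCollection<int>> RunAsync(IEnumerable<Page> pages)
    {
        var saved = new ConcurrentBag<int>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var dataPipe = new BufferBlock<Page>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // BoundedCapacity counts pages being processed too, so it must be at
        // least MaxDegreeOfParallelism for 8 pages to run concurrently.
        var processImages = new TransformBlock<Page, Page>(async page =>
        {
            foreach (var url in page.ImageUrls)
                await Task.Delay(1); // placeholder for the IO-bound upload of `url`
            return page;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 8, MaxDegreeOfParallelism = 8 });

        var savePage = new ActionBlock<Page>(page =>
        {
            saved.Add(page.Id); // placeholder for "mark page as processed" in the database
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });

        dataPipe.LinkTo(processImages, link);
        processImages.LinkTo(savePage, link);

        foreach (var page in pages)
            await dataPipe.SendAsync(page); // waits while the bounded buffer is full

        dataPipe.Complete();
        await savePage.Completion;
        return saved;
    }
}
```

`SendAsync` is used instead of `Post` because `Post` returns `false` immediately when a bounded block is full, whereas `SendAsync` waits until there is room.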
    

    Now I need "Process images" to process images in parallel, but I want to limit how many images are being processed at once across all of the pages currently in flight.

    I can use a TransformManyBlock for "Process images", but how do I gather the images back together in the "Save page" block?

             ____________________________
            |         Data pipe          |
            |   BufferBlock<Page>        |
            |   BoundedCapacity = 1      |
            |____________________________|
                          |
         ___________________________________
        |           Load images             |
        | TransformManyBlock<Page, Image[]> |
        | BoundedCapacity = 1               |
        | MaxDegreeOfParallelism = 8        |
        |___________________________________|
          /              |              
       ______________________________________________
     _|____________________________________________  |
    |              Process image                   | |
    | TransformBlock<ImageWithPage, ImageWithPage> | |
    | BoundedCapacity = 1                          | |
    | MaxDegreeOfParallelism = 8                   |_|
    |______________________________________________|
                        |               /
             How to group images by page ?
                         |
            ____________________________
           |        Save page           |
           | ActionBlock<Page>          |
           | BoundedCapacity = 1        |
           | MaxDegreeOfParallelism = 5 |
           |____________________________|
    

    On top of that, any one of the images could fail to be processed, and I don't want to save a page that has failed images.


    You can group the images back together by counting how many images of a given page have arrived and sending the page on once all of them have. For that, the page needs to know how many images it contains, but I assume you know that.

    In code, it could look something like this:

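    // Requires: using System; using System.Collections.Generic; using System.Threading.Tasks.Dataflow;
    public static IPropagatorBlock<TSplit, TMerged>
        CreateMergerBlock<TSplit, TMerged>(
        Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
    {
        // How many parts of each merged item have arrived so far. No locking is
        // needed because TransformManyBlock runs with MaxDegreeOfParallelism = 1
        // by default, so only one split item is handled at a time.
        var dictionary = new Dictionary<TMerged, int>();
    
        return new TransformManyBlock<TSplit, TMerged>(
            split =>
            {
                var merged = getMergedFunc(split);
                dictionary.TryGetValue(merged, out int count);
                count++;
                if (getSplitCount(merged) == count)
                {
                    // All parts arrived: forget the item and send it on.
                    dictionary.Remove(merged);
                    return new[] { merged };
                }
    
                // Still waiting for more parts: remember the count, emit nothing.
                dictionary[merged] = count;
                return Array.Empty<TMerged>();
            });
    }
    

    Usage:

    var dataPipe = new BufferBlock<Page>();
    
    var splitter = new TransformManyBlock<Page, ImageWithPage>(
        page => page.LoadImages(),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    
    var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
        image =>
        {
            // process the image here
            return image;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    
    var merger = CreateMergerBlock(
        (ImageWithPage image) => image.Page, page => page.ImageCount);
    
    var savePage = new ActionBlock<Page>(
        page => { /* save the page here */ },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
    
    // Propagate completion so that dataPipe.Complete() eventually completes savePage too.
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    dataPipe.LinkTo(splitter, linkOptions);
    splitter.LinkTo(processImage, linkOptions);
    processImage.LinkTo(merger, linkOptions);
    merger.LinkTo(savePage, linkOptions);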
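The counting merger forwards a page as soon as all of its parts arrive, so it does not yet cover the question's requirement that a page with a failed image must not be saved. One way to add that is sketched below, under assumptions: `PageWork` is a hypothetical page type with a known `ImageCount`, and each processed image becomes a hypothetical `ImageResult` carrying a `Succeeded` flag. Catching the exception per image matters because an unhandled exception inside a dataflow block's delegate faults the whole block, which would stop every other page as well.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical page type; reference equality makes it usable as a dictionary key.
public sealed class PageWork
{
    public PageWork(int id, int imageCount) { Id = id; ImageCount = imageCount; }
    public int Id { get; }
    public int ImageCount { get; }
}

// Hypothetical per-image outcome flowing from "Process image" into the merger.
public sealed record ImageResult(PageWork Page, bool Succeeded);

public static class FailureAwareMerger
{
    // Runs the per-image work and reports failure as a flag instead of letting
    // the exception fault the surrounding TransformBlock.
    public static ImageResult Try(PageWork page, Action processImage)
    {
        try
        {
            processImage();
            return new ImageResult(page, true);
        }
        catch (Exception)
        {
            return new ImageResult(page, false);
        }
    }

    // Emits a page once all of its images have arrived, but only if none failed;
    // pages with at least one failed image are dropped.
    public static IPropagatorBlock<ImageResult, PageWork> Create()
    {
        // No lock needed: TransformManyBlock defaults to MaxDegreeOfParallelism = 1.
        var state = new Dictionary<PageWork, (int Arrived, bool AnyFailed)>();

        return new TransformManyBlock<ImageResult, PageWork>(result =>
        {
            state.TryGetValue(result.Page, out var s);
            s = (s.Arrived + 1, s.AnyFailed || !result.Succeeded);

            if (s.Arrived < result.Page.ImageCount)
            {
                state[result.Page] = s; // still waiting for more images
                return Array.Empty<PageWork>();
            }

            state.Remove(result.Page);
            return s.AnyFailed ? Array.Empty<PageWork>() : new[] { result.Page };
        });
    }
}
```

As with the original merger, a page whose `ImageCount` is 0 never receives an image and is therefore never emitted; such pages would have to bypass the merger.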
    

    Consider merging "Load images" and "Process images" into a single TransformBlock. That way you have no trouble keeping the images of a single page together.

    To enforce your global limit on concurrently processed images, use a single SemaphoreSlim shared by all pages:

    SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);
    
    //...
    
    var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
    var images = GetImages(page);
    ImageWithPage[] processedImages =
     images
     .AsParallel()
     .Select(i => {
        // Block until one of the 8 shared slots is free.
        processImageDopLimiter.Wait();
        try
        {
            return ProcessImage(i);
        }
        finally
        {
            // Release the slot even if ProcessImage throws.
            processImageDopLimiter.Release();
        }
     })
     .ToArray();
    return new { page, processedImages };
    

    This will leave quite a few threads blocked waiting on the semaphore. You can use an asynchronous version of this processing (for example with SemaphoreSlim.WaitAsync) if you like; that detail is immaterial to the question.
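A sketch of such an asynchronous version, under assumptions: pages are represented as hypothetical `(int Id, string[] Images)` tuples, `ProcessImageAsync` is a placeholder that only delays, and the `Peak` counter exists only so the limit can be observed. The shared `SemaphoreSlim` is awaited with `WaitAsync`, so an image waiting for a free slot does not hold a thread.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncImageLimiterSketch
{
    // Shared by all pages in flight: at most 8 images are processed at any moment.
    private static readonly SemaphoreSlim ImageLimiter = new SemaphoreSlim(8);

    private static int _running;
    private static int _peak;
    public static int Peak => Volatile.Read(ref _peak); // highest concurrency observed

    // Hypothetical placeholder for the IO-bound work (e.g. a CDN upload).
    private static async Task<string> ProcessImageAsync(string image)
    {
        int now = Interlocked.Increment(ref _running);
        int seen;
        // Lock-free "max": retry until _peak >= now.
        while (now > (seen = Volatile.Read(ref _peak)) &&
               Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }

        await Task.Delay(10);
        Interlocked.Decrement(ref _running);
        return image + ":done";
    }

    private static async Task<string> ProcessWithLimitAsync(string image)
    {
        await ImageLimiter.WaitAsync(); // asynchronous wait, no blocked thread
        try
        {
            return await ProcessImageAsync(image);
        }
        finally
        {
            ImageLimiter.Release();
        }
    }

    // Up to 5 pages run concurrently; their images all compete for the 8 slots.
    public static TransformBlock<(int Id, string[] Images), (int Id, string[] Processed)> CreateBlock() =>
        new TransformBlock<(int Id, string[] Images), (int Id, string[] Processed)>(
            async page =>
            {
                var processed = await Task.WhenAll(page.Images.Select(image => ProcessWithLimitAsync(image)));
                return (page.Id, processed);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 });
}
```

Every page starts all of its images at once, and the semaphore alone decides how many actually run; the per-page degree of parallelism (5 here, an arbitrary choice) only limits how many pages are in flight.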
