数据流将工作分解为小作业,然后再次分组

我需要做这样的工作:

  • 从数据库中获取页面对象
  • 对于每个页面,获取所有图像并处理它们(IO限制,例如上传到CDN)
  • 如果所有图像都成功处理,则将页面标记为在数据库中处理
  • 由于我需要控制并行处理的页数,因此我决定使用TPL数据流:

     ____________________________
    |         Data pipe          |
    |   BufferBlock<Page>        |
    |   BoundedCapacity = 1      |
    |____________________________|
                  |
     ____________________________
    |       Process images       |
    | TransformBlock<Page, Page> |
    | BoundedCapacity = 1        |
    | MaxDegreeOfParallelism = 8 |
    |____________________________|
                  |
     ____________________________
    |        Save page           |
    | ActionBlock<Page>          |
    | BoundedCapacity = 1        |
    | MaxDegreeOfParallelism = 5 |
    |____________________________|
    

    现在我需要“过程映像”来并行处理图像,但我想限制当前在所有并行页面上处理的图像的数量。

    我可以使用TrasnformManyBlock作为“过程映像”,但是如何将它们收集回“保存页面”块?

             ____________________________
            |         Data pipe          |
            |   BufferBlock<Page>        |
            |   BoundedCapacity = 1      |
            |____________________________|
                          |
         ___________________________________
        |           Load images             |
        | TransformManyBlock<Page, Image[]> |
        | BoundedCapacity = 1               |
        | MaxDegreeOfParallelism = 8        |
        |___________________________________|
          /              |              
       ______________________________________________
     _|____________________________________________  |
    |              Process image                   | |
    | TransformBlock<ImageWithPage, ImageWithPage> | |
    | BoundedCapacity = 1                          | |
    | MaxDegreeOfParallelism = 8                   |_|
    |______________________________________________|
                        |               /
             How to group images by page ?
                         |
            ____________________________
           |        Save page           |
           | ActionBlock<Page>          |
           | BoundedCapacity = 1        |
           | MaxDegreeOfParallelism = 5 |
           |____________________________|
    

    最重要的是,其中一个图像可能无法继续进行,我不想用失败的图像保存页面。


    只要给定页面的图像到达,您就可以通过录制将图像分组在一起,然后在所有图像到达时发送页面。 为了解决这个问题,页面需要知道它包含了多少图片,但我认为你知道这一点。

    在代码中,它可能看起来像这样:

    public static IPropagatorBlock<TSplit, TMerged>
        CreaterMergerBlock<TSplit, TMerged>(
        Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
    {
        var dictionary = new Dictionary<TMerged, int>();
    
        return new TransformManyBlock<TSplit, TMerged>(
            split =>
            {
                var merged = getMergedFunc(split);
                int count;
                dictionary.TryGetValue(merged, out count);
                count++;
                if (getSplitCount(merged) == count)
                {
                    dictionary.Remove(merged);
                    return new[] { merged };
                }
    
                dictionary[merged] = count;
                return new TMerged[0];
            });
    }
    

    用法:

    var dataPipe = new BufferBlock<Page>();
    
    var splitter = new TransformManyBlock<Page, ImageWithPage>(
        page => page.LoadImages(),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    
    var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
        image =>
        {
            // process the image here
            return image;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    
    var merger = CreaterMergerBlock(
        (ImageWithPage image) => image.Page, page => page.ImageCount);
    
    var savePage = new ActionBlock<Page>(
        page => /* save the page here */,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
    
    dataPipe.LinkTo(splitter);
    splitter.LinkTo(processImage);
    processImage.LinkTo(merger);
    merger.LinkTo(savePage);
    

    考虑将“加载图像”和“处理图像”合并到一个TransformBlock块中。 这样你就可以将单个页面的图像放在一起。

    为了实现并发限制目标,请使用SemaphoreSlim

    SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);
    
    //...
    
    var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
    var images = GetImages(page);
    ImageWithPage[] processedImages =
     images
     .AsParallel()
     .Select(i => {
        processImageDopLimiter.WaitOne();
        var result = ProcessImage(i);
        processImageDopLimiter.ReleaseOne();
        return result;
     })
     .ToList();
    return new { page, processedImages };
    

    这将导致相当多的线程被阻塞等待。 如果您愿意,您可以使用此处理的异步版本。 这对这个问题并不重要。

    链接地址: http://www.djcxy.com/p/23305.html

    上一篇: Dataflow with splitting work to small jobs and then group again

    下一篇: How do I throttle $http requests in angularjs?