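Dataflow: splitting work into small jobs and then grouping them again
I need to do work like this:
Since I need to control the number of pages processed in parallel, I decided to use TPL Dataflow: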
 ____________________________
|         Data pipe          |
|     BufferBlock<Page>      |
|    BoundedCapacity = 1     |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
|    BoundedCapacity = 1     |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|         Save page          |
|     ActionBlock<Page>      |
|    BoundedCapacity = 1     |
| MaxDegreeOfParallelism = 5 |
|____________________________|
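Now I need "Process images" to process the images of a page in parallel, but I want to limit the number of images being processed at any one time across all of the pages that are in flight.
I could use a TransformManyBlock for "Process images", but how do I gather the images back together for the "Save page" block?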
 ____________________________
|         Data pipe          |
|     BufferBlock<Page>      |
|    BoundedCapacity = 1     |
|____________________________|
              |
 ___________________________________
|            Load images            |
| TransformManyBlock<Page, Image[]> |
|        BoundedCapacity = 1        |
|    MaxDegreeOfParallelism = 8     |
|___________________________________|
         /              |
   ______________________________________________
 _|____________________________________________  |
|                Process image                 | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
|             BoundedCapacity = 1              | |
|          MaxDegreeOfParallelism = 8          |_|
|______________________________________________|
         |              /
      How to group images by page ?
              |
 ____________________________
|         Save page          |
|     ActionBlock<Page>      |
|    BoundedCapacity = 1     |
| MaxDegreeOfParallelism = 5 |
|____________________________|
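On top of that, any one of the images may fail to process, and I don't want to save a page that contains failed images.
You can group the images together by recording each image of a given page as it arrives, and then sending the page on once all of its images have arrived. For this to work, the page needs to know how many images it contains, but I assume you know that.
In code, it could look something like this: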
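using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
        Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    // Number of parts seen so far for each merged item (e.g. images per page).
    // No locking is needed: the block below runs with the default
    // MaxDegreeOfParallelism of 1, so the delegate never runs concurrently.
    var dictionary = new Dictionary<TMerged, int>();

    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            dictionary.TryGetValue(merged, out count); // count stays 0 for a new key
            count++;

            if (getSplitCount(merged) == count)
            {
                // Last part arrived: forget the counter and emit the merged item.
                dictionary.Remove(merged);
                return new[] { merged };
            }

            // Still waiting for more parts: remember the count, emit nothing.
            dictionary[merged] = count;
            return new TMerged[0];
        });
}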
Usage:
var dataPipe = new BufferBlock<Page>();

var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

// Counts processed images per page and emits the page once all have arrived.
var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);

var savePage = new ActionBlock<Page>(
    page => { /* save the page here */ },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);
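The usage above wires the blocks together but does not show how pages are fed in or how the pipeline is shut down. A minimal sketch of one way to handle that, assuming every link is created with PropagateCompletion = true; PipelineRunner, Propagate and RunAsync are hypothetical names introduced here for illustration, not part of the answer or of TPL Dataflow:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of TPL Dataflow.
public static class PipelineRunner
{
    // Link options that forward completion (and faults) to the next block.
    public static readonly DataflowLinkOptions Propagate =
        new DataflowLinkOptions { PropagateCompletion = true };

    // Feeds every item into the first block, then waits for the last block to finish.
    public static async Task RunAsync<T>(
        ITargetBlock<T> head, IDataflowBlock tail, IEnumerable<T> items)
    {
        foreach (var item in items)
        {
            // SendAsync waits while a bounded block is full;
            // Post would return false instead of waiting.
            await head.SendAsync(item);
        }

        head.Complete();          // signal that no more input is coming
        await tail.Completion;    // finishes once the tail has processed everything
    }
}
```

With the blocks from the usage above, each link would become something like dataPipe.LinkTo(splitter, PipelineRunner.Propagate), followed by await PipelineRunner.RunAsync(dataPipe, savePage, pages), where pages is whatever sequence of Page objects you have.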
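Consider merging "Load images" and "Process image" into a single TransformBlock. That way you have no trouble keeping the images of a single page together.
To achieve your concurrency limit, use a SemaphoreSlim: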
SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);
//...
var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
var images = GetImages(page);
ImageWithPage[] processedImages =
    images
        .AsParallel()
        .Select(i =>
        {
            // Blocks until one of the 8 slots shared by all pages is free.
            processImageDopLimiter.Wait();
            try
            {
                return ProcessImage(i);
            }
            finally
            {
                // Release the slot even if ProcessImage throws.
                processImageDopLimiter.Release();
            }
        })
        .ToArray();
return new { page, processedImages };
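This will cause quite a few threads to be blocked waiting. If you like, you can use an async version of this processing instead. That is immaterial to the question.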
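As a sketch of the async variant: SemaphoreSlim.WaitAsync lets a waiting image yield its thread instead of blocking it. The ImageThrottle class, ProcessAllAsync and the processImageAsync delegate are hypothetical names used for illustration, and the sketch assumes the image processing itself can be expressed as a Task-returning function.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper illustrating an async throttle.
public static class ImageThrottle
{
    // One limiter shared by every page, so the cap of 8 is global rather than per page.
    private static readonly SemaphoreSlim Limiter = new SemaphoreSlim(8);

    // processImageAsync stands in for the real (assumed asynchronous) image processing.
    public static async Task<TImage[]> ProcessAllAsync<TImage>(
        TImage[] images, Func<TImage, Task<TImage>> processImageAsync)
    {
        var tasks = images.Select(async image =>
        {
            await Limiter.WaitAsync();   // waits for a free slot without blocking a thread
            try
            {
                return await processImageAsync(image);
            }
            finally
            {
                Limiter.Release();       // give the slot back even on failure
            }
        });

        // Results are returned in the same order as the input images.
        // If any image fails, the returned task faults and the exception surfaces here.
        return await Task.WhenAll(tasks);
    }
}
```

Inside the merged TransformBlock the delegate could then be an async lambda that awaits ImageThrottle.ProcessAllAsync for the page's images and returns the page together with the processed images.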