C#,BlockingCollection:如何等待收集少于N个项目
大家。 我在传统的生产者 - 消费者场景中使用BlockingCollection。 要逐个处理集合中的项目,我必须编写以下代码:
while (...)
{
var item = collection.Take(cancellationTokenSource.Token);
ProcessItem(item);
}
但是如何处理一批N个物品(等到收集少于N个物品)? 我的解决方案使用一些临时缓冲区:
var buffer = new List<MyType>(N);
while (...)
{
var item = collection.Take(cancellationTokenSource.Token);
buffer.Add(item);
if (buffer.Count == N)
{
foreach (var item in items)
{
ProcessItem(item);
}
buffer.Clear();
}
}
但在我看来,非常丑陋......有没有更好的方法?
[更新]:这是扩展方法的原型,它使解决方案更具可读性。 也许,有人会觉得它很有用:
public static class BlockingCollectionExtensions
{
public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection,
CancellationToken cancellationToken, Int32 bufferSize)
{
var buffer = new List<T>(bufferSize);
while (buffer.Count < bufferSize)
{
try
{
buffer.Add(collection.Take(cancellationToken));
}
catch (OperationCanceledException)
{
// we need to handle the rest of buffer,
// even if the task has been cancelled.
break;
}
}
return buffer;
}
}
和用法:
foreach (var item in collection.TakeBuffer(cancellationTokenSource.Token, 5))
{
// TODO: process items here...
}
当然,这不是一个完整的解决方案:例如,我会添加任何超时支持 - 如果没有足够的项目,但时间已过,我们需要停止等待并处理已添加到缓冲区的项目。
我没有发现那种丑陋的解决方案。 批处理是对阻塞收集所做的正交要求,应该这样处理。 我会将批处理行为封装在一个带有干净界面的BatchProcessor
类中,但BatchProcessor
我并没有真正看到这种方法的问题。
您可能会发现队列的无锁实现以及阻塞集合是过早的优化。 如果退后一步并使用基于监控器的锁队列,则可能可以编写更清晰的代码。
首先我不确定你的逻辑是否正确。 你说你想等到收集少于N件物品 - 是不是相反? 您希望集合具有N个或更多项目,以处理N个项目。 或者我是误解。
然后,我还建议您在物品少于N个的情况下逐个处理物品,或者您可能会发现您的应用似乎挂在N-1个物品上。 当然,如果这是一个稳定的数据流,只有在buffer.Count> = N的情况下才可以处理。
我建议像GregC那样去排队等待。
像这样的东西:
public object Dequeue() {
while (_queue.Count < N) {
Monitor.Wait(_queue);
}
return _queue.Dequeue();
}
public void Enqueue( object q )
{
lock (_queue)
{
_queue.Enqueue(q);
if (_queue.Count == N)
{
// wake up any blocked dequeue call(s)
Monitor.PulseAll(_queue);
}
}
}
链接地址: http://www.djcxy.com/p/67307.html
上一篇: C#, BlockingCollection: How to wait until collection has less than N items