将项目添加到正在迭代的集合中,还是等效?

现在,我有一个C#程序,可以定期执行以下步骤:

  • 从数据库中获取当前任务列表
  • 使用Parallel.ForEach(),可以为每个任务工作
  • 但是,其中一些任务的运行时间非常长。 这延迟了其他未决任务的处理,因为我们只在程序开始时寻找新的任务。

    现在,我知道修改被迭代的集合是不可能的(对吧?),但是C#并行框架中是否有一些等价的功能可以让我在列表中添加工作,同时处理列表中的项目?


    一般来说,你是正确的,修改一个集合,而迭代它是不允许的。 但还有其他方法可以使用:

  • 使用TPL数据流中的ActionBlock<T> 。 代码可能如下所示:

    var actionBlock = new ActionBlock<MyTask>(
        task => DoWorkForTask(task),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    
    while (true)
    {
        var tasks = GrabCurrentListOfTasks();
        foreach (var task in tasks)
        {
            actionBlock.Post(task);
    
            await Task.Delay(someShortDelay);
            // or use Thread.Sleep() if you don't want to use async
        }
    }
    
  • 使用BlockingCollection<T> ,它可以在消耗项目的同时进行修改,以及ParallelExtensionsExtras中的GetConsumingParititioner() ,以使其与Parallel.ForEach()

    var collection = new BlockingCollection<MyTask>();
    
    Task.Run(async () =>
    {
        while (true)
        {
            var tasks = GrabCurrentListOfTasks();
            foreach (var task in tasks)
            {
                collection.Add(task);
    
                await Task.Delay(someShortDelay);
            }
        }
    });
    
    Parallel.ForEach(collection.GetConsumingPartitioner(), task => DoWorkForTask(task));
    

  • 这里有一个你可以尝试的方法的例子。 我认为你希望摆脱Parallel.ForEach ing并用异步编程来做些事情,而不是因为你需要在结束时检索结果,而不是以可能包含长时间运行的任务和非常快速完成的任务的离散块。

    该方法使用简单的顺序循环从异步任务列表中检索结果。 在这种情况下,使用简单的非线程安全的可变列表应该很安全,因为列表中的所有变化都是在同一个线程中按顺序发生的。

    请注意,此方法在循环中使用Task.WhenAny ,对于大型任务列表而言效率不高,因此在此情况下应考虑替代方法。 (请参阅此博客:http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx)

    此示例基于:https://msdn.microsoft.com/en-GB/library/jj155756.aspx

    private async Task<ProcessResult> processTask(ProcessTask task) 
    {
        // do something intensive with data
    }
    
    private IEnumerable<ProcessTask> GetOutstandingTasks() 
    {
        // retreive some tasks from db
    }
    
    private void ProcessAllData()
    {
        List<Task<ProcessResult>> taskQueue = 
            GetOutstandingTasks()
            .Select(tsk => processTask(tsk))
            .ToList(); // grab initial task queue
    
        while(taskQueue.Any()) // iterate while tasks need completing
        {
            Task<ProcessResult> firstFinishedTask = await Task.WhenAny(taskQueue); // get first to finish
            taskQueue.Remove(firstFinishedTask); // remove the one that finished
            ProcessResult result = await firstFinishedTask; // get the result
            // do something with task result
            taskQueue.AddRange(GetOutstandingTasks().Select(tsk => processData(tsk))) // add more tasks that need performing
        }
    }
    
    链接地址: http://www.djcxy.com/p/8839.html

    上一篇: Adding items to the collection being iterated over, or equivalent?

    下一篇: Pipeline pattern using in parallel ForEach C#