Adding items to the collection being iterated over, or equivalent?

Right now, I've got a C# program that performs the following steps on a recurring basis:

  • Grab current list of tasks from the database
  • Using Parallel.ForEach(), do work for each task

However, some of these tasks are very long-running. This delays the processing of other pending tasks, because we only look for new ones at the start of the program.

    Now, I know that modifying the collection being iterated over isn't possible (right?), but is there some equivalent functionality in the C# Parallel framework that would allow me to add work to the list while also processing items in the list?


    Generally speaking, you're right that modifying a collection while iterating over it is not allowed. But there are other approaches you could use:

  • Use ActionBlock<T> from TPL Dataflow. The code could look something like:

    var actionBlock = new ActionBlock<MyTask>(
        task => DoWorkForTask(task),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    
    while (true)
    {
        var tasks = GrabCurrentListOfTasks();
        foreach (var task in tasks)
        {
            actionBlock.Post(task);
    
            await Task.Delay(someShortDelay);
            // or use Thread.Sleep() if you don't want to use async
        }
    }
    
  • Use BlockingCollection<T>, which can be modified while items are being consumed from it, along with GetConsumingPartitioner() from ParallelExtensionsExtras to make it work with Parallel.ForEach():

    var collection = new BlockingCollection<MyTask>();
    
    Task.Run(async () =>
    {
        while (true)
        {
            var tasks = GrabCurrentListOfTasks();
            foreach (var task in tasks)
            {
                collection.Add(task);
    
                await Task.Delay(someShortDelay);
            }
        }
    });
    
    Parallel.ForEach(collection.GetConsumingPartitioner(), task => DoWorkForTask(task));
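
    If you would rather not pull in ParallelExtensionsExtras, a rough sketch of the same producer/consumer idea using only BlockingCollection<T>.GetConsumingEnumerable() might look like the following. The class name, the ProcessAll helper, the fixed count of four consumers, and the Thread.Sleep() calls standing in for DoWorkForTask() and the polling delay are all assumptions made for illustration, not part of the original code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCollectionSketch
{
    // Hypothetical helper: adds `batches` batches of `perBatch` items while
    // consumers are already running, and returns how many items were processed.
    public static int ProcessAll(int batches, int perBatch)
    {
        var collection = new BlockingCollection<int>();
        int processed = 0;

        // Consumers: GetConsumingEnumerable() blocks while the collection is
        // empty and ends once CompleteAdding() was called and it is drained.
        Task[] consumers = Enumerable.Range(0, 4)
            .Select(_ => Task.Run(() =>
            {
                foreach (int item in collection.GetConsumingEnumerable())
                {
                    Thread.Sleep(10); // stand-in for DoWorkForTask(item)
                    Interlocked.Increment(ref processed);
                }
            }))
            .ToArray();

        // Producer: new work is added while earlier items are still in flight.
        for (int batch = 0; batch < batches; batch++)
        {
            for (int i = 0; i < perBatch; i++)
            {
                collection.Add(batch * perBatch + i);
            }
            Thread.Sleep(50); // stand-in for the polling delay
        }

        collection.CompleteAdding(); // lets the consumer loops finish
        Task.WaitAll(consumers);
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine($"Processed {ProcessAll(2, 5)} items");
    }
}
```

    In a long-running service you would typically never call CompleteAdding() until shutdown; it is used here only so the sketch terminates.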
    

  • Here is an example of an approach you could try. I think you want to move away from Parallel.ForEach() and use asynchronous programming instead, because you need to retrieve results as they finish, rather than in discrete chunks that could contain both long-running tasks and tasks that finish very quickly.

    This approach uses a simple sequential loop to retrieve results from a list of asynchronous tasks. In this case, it should be safe to use a plain, non-thread-safe mutable list, because all mutation of the list happens sequentially in the same loop.

    Note that this approach calls Task.WhenAny in a loop, which isn't very efficient for large task lists; you should consider an alternative approach in that case. (See this blog: http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx)

    This example is based on: https://msdn.microsoft.com/en-GB/library/jj155756.aspx

    private async Task<ProcessResult> processTask(ProcessTask task) 
    {
        // do something intensive with data
    }
    
    private IEnumerable<ProcessTask> GetOutstandingTasks() 
    {
        // retrieve some tasks from db
    }
    
    private async Task ProcessAllData() // must be async because the body uses await
    {
        List<Task<ProcessResult>> taskQueue = 
            GetOutstandingTasks()
            .Select(tsk => processTask(tsk))
            .ToList(); // grab initial task queue
    
        while(taskQueue.Any()) // iterate while tasks need completing
        {
            Task<ProcessResult> firstFinishedTask = await Task.WhenAny(taskQueue); // get first to finish
            taskQueue.Remove(firstFinishedTask); // remove the one that finished
            ProcessResult result = await firstFinishedTask; // get the result
            // do something with task result
            taskQueue.AddRange(GetOutstandingTasks().Select(tsk => processTask(tsk))); // add more tasks that need performing
        }
    }
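
    As one possible alternative to the Task.WhenAny loop for larger task lists, each task can be wrapped in a small async method that handles its own result as soon as it completes, with Task.WhenAll used only to wait for everything to finish. This is a minimal sketch, not the approach from the linked blog post; ProcessAsync, ProcessAndHandleAsync, and the integer "tasks" are hypothetical stand-ins for processTask and ProcessTask, and it does not cover fetching new tasks from the database:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionSketch
{
    // Hypothetical stand-in for processTask: smaller inputs take longer.
    private static async Task<int> ProcessAsync(int input)
    {
        await Task.Delay(10 * (5 - input));
        return input * input;
    }

    // Awaits one task and handles its result immediately, so no result has
    // to wait for unrelated slower tasks.
    private static async Task ProcessAndHandleAsync(int input, ConcurrentQueue<int> results)
    {
        int result = await ProcessAsync(input);
        results.Enqueue(result); // "do something with task result"
    }

    // Returns the results in the order in which they were handled.
    public static async Task<List<int>> RunAsync()
    {
        // Continuations may run concurrently on thread-pool threads,
        // so a thread-safe collection is used here.
        var results = new ConcurrentQueue<int>();

        List<Task> tasks = Enumerable.Range(1, 4)
            .Select(i => ProcessAndHandleAsync(i, results))
            .ToList();

        await Task.WhenAll(tasks); // single wait instead of repeated WhenAny calls
        return results.ToList();
    }

    public static async Task Main()
    {
        List<int> results = await RunAsync();
        Console.WriteLine(string.Join(", ", results));
    }
}
```

    Unlike the sequential loop above, the result handling here can run on several threads at once, so anything it touches needs to be thread-safe.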
    