C#平行foreach同样完成任务

我使用C#Parallel.ForEach来处理超过数千个数据子集。 一套需要5-30分钟才能处理,具体取决于套装的尺寸。 在我的电脑上有选项

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount

我会得到8个并行进程。 据我了解,流程在平行任务之间平均分配(例如,第一项任务获得1,9,17等职位,第二项获得2,10,18等); 因此,一项任务可以比其他任务更早完成自己的工作。 因为这些数据集花费的时间少于其他数据。

问题在于四个并行任务在24小时内完成他们的工作,但最后一个任务在48小时内完成。 有没有机会组织并行处理,以便所有并行任务都完成平等? 这意味着所有并行任务都可以继续工作直到所有工作完成?


由于作业不相同,因此无法在处理器之间拆分作业数量,并在大约同一时间完成作业。 我认为你需要的是8个工作线程检索下一个工作。 你将不得不在函数上使用锁来获得下一份工作。

如果我错了,有人纠正我,但我的头顶上......工人线程可以被赋予这样的功能:

public void ProcessJob()
{
    for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
    {
        // process job
    }
}

下一份工作的功能如下所示:

private List<Job> jobs;
private int currentJob = 0;

private Job GetNextJob()
{
    lock (jobs)
    {
        Job job = null;
        if (currentJob < jobs.Count)
        {
            job = jobs[currentJob];
            currentJob++;
        }
        return job;
    }
}

看来没有现成的解决方案,它必须被创建。

我之前的代码是:

var ListOfSets = (from x in Database
           group x by x.SetID into z
           select new { ID = z.Key}).ToList();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;

Parallel.ForEach(ListOfSets, po, SingleSet=>
{
     AnalyzeSet(SingleSet.ID);
});

为了在所有CPU-s之间平均分担工作,我仍然使用Parallel来完成这项工作,但不是ForEach ,而是使用For和Matt的想法。 新代码是:

Parallel.For(0, Environment.ProcessorCount, i=>
{
    while(ListOfSets.Count() > 0)
    {
        double SetID = 0;
        lock (ListOfSets)
        {
            SetID = ListOfSets[0].ID;
            ListOfSets.RemoveAt(0);
        }
     AnalyzeSet(SetID);
    }
});

所以,谢谢你的建议。


正如其他人所建议的一样,管理你自己的生产者消费者队列。 我想说明的是,使用BlockingCollection使得这非常容易。

BlockingCollection<JobData> queue = new BlockingCollection<JobData>();

//add data to queue; if it can be done quickly, just do it inline.  
//If it's expensive, start a new task/thread just to add items to the queue.
foreach (JobData job in data)
    queue.Add(job);

queue.CompleteAdding();

for (int i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var job in queue.GetConsumingEnumerable())
        {
            ProcessJob(job);
        }
    }, TaskCreationOptions.LongRunning);
}
链接地址: http://www.djcxy.com/p/50153.html

上一篇: C# parallel foreach equally finishing tasks

下一篇: Can the .NET 4 Task Parallel Library use COM objects?