C# Parallel.ForEach: getting all tasks to finish at about the same time
I'm using C# Parallel.ForEach to process more than a thousand subsets of data. One set takes 5-30 minutes to process, depending on the size of the set. On my computer, with the options
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;
I get 8 parallel tasks. As I understand it, the jobs are divided equally between the parallel tasks (e.g. the first task gets jobs 1, 9, 17, etc., the second gets 2, 10, 18, etc.), so one task can finish its own jobs sooner than the others because its sets of data took less time.
The problem is that four parallel tasks finish their jobs within 24 hours, but the last one finishes in 48 hours. Is there some way to organize the parallelism so that all parallel tasks finish at about the same time? In other words, can all parallel tasks keep working until all jobs are done?
Since the jobs are not equal in size, you can't split the number of jobs evenly between processors and expect them to finish at about the same time. I think what you need here is 8 worker threads that each retrieve the next job in line whenever they become free. The function that hands out the next job needs a lock so that two threads never receive the same job.
Somebody correct me if I'm wrong, but off the top of my head... a worker thread could be given a function like this:
public void ProcessJob()
{
    for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
    {
        // process job
    }
}
And the function to get the next job would look like:
private List<Job> jobs;
private int currentJob = 0;
private Job GetNextJob()
{
    lock (jobs)
    {
        Job job = null;
        if (currentJob < jobs.Count)
        {
            job = jobs[currentJob];
            currentJob++;
        }
        return job;
    }
}
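The same "pull the next job when free" idea can be sketched without a hand-written lock by using ConcurrentQueue<T> from System.Collections.Concurrent. This is only a minimal sketch, not the asker's code: the integer durations, the Thread.Sleep placeholder, and the names jobs, processed, and Worker are assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real jobs: each int is a simulated duration in ms.
var jobs = new ConcurrentQueue<int>(new[] { 30, 5, 20, 10, 25, 15, 5, 10 });
int processed = 0;

// Each worker keeps pulling the next job until the queue is empty,
// so a worker that drew short jobs simply takes more of them.
void Worker()
{
    while (jobs.TryDequeue(out int durationMs))
    {
        Thread.Sleep(durationMs);              // placeholder for the real processing
        Interlocked.Increment(ref processed);  // thread-safe counter update
    }
}

// One worker per logical processor; wait until all of them have drained the queue.
Task[] workers = Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Task.Run(() => Worker()))
    .ToArray();
Task.WaitAll(workers);

Console.WriteLine($"Processed {processed} jobs");
```

TryDequeue is atomic, so no two workers can receive the same job, and no worker sits idle while jobs remain in the queue.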
It seems that there is no ready-to-use solution and it has to be created.
My previous code was:
var ListOfSets = (from x in Database
                  group x by x.SetID into z
                  select new { ID = z.Key }).ToList();
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;
Parallel.ForEach(ListOfSets, po, SingleSet =>
{
    AnalyzeSet(SingleSet.ID);
});
To share work equally between all CPUs, I still use Parallel to do the work, but instead of ForEach I use For and an idea from Matt. The new code is:
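Parallel.For(0, Environment.ProcessorCount, i =>
{
    while (true)
    {
        double SetID;
        lock (ListOfSets)
        {
            // Check and remove under the same lock; testing Count outside
            // the lock lets two threads race for the last remaining item.
            if (ListOfSets.Count == 0)
                break;
            SetID = ListOfSets[0].ID;
            ListOfSets.RemoveAt(0);
        }
        AnalyzeSet(SetID);
    }
});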
So, thank you for your advice.
One option, as suggested by others, is to manage your own producer-consumer queue. I'd like to note that using BlockingCollection makes this very easy to do.
BlockingCollection<JobData> queue = new BlockingCollection<JobData>();
// Add data to the queue; if it can be done quickly, just do it inline.
// If it's expensive, start a new task/thread just to add items to the queue.
foreach (JobData job in data)
    queue.Add(job);
queue.CompleteAdding();

Task[] workers = new Task[Environment.ProcessorCount];
for (int i = 0; i < workers.Length; i++)
{
    workers[i] = Task.Factory.StartNew(() =>
    {
        // Each worker pulls the next job as soon as it is free.
        foreach (var job in queue.GetConsumingEnumerable())
        {
            ProcessJob(job);
        }
    }, TaskCreationOptions.LongRunning);
}
Task.WaitAll(workers); // block until every job has been processed
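If staying with Parallel.ForEach is preferred, it may also be worth trying a partitioner that does not buffer items into chunks, so that each worker asks for one item at a time. The following is a minimal sketch assuming .NET 4.5 or later, where Partitioner.Create accepts EnumerablePartitionerOptions.NoBuffering; the durations array and the Thread.Sleep placeholder are made up for illustration and stand in for the real data sets.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in data: each int is a simulated job duration in ms.
int[] durations = { 30, 5, 20, 10, 25, 15, 5, 10 };
int completed = 0;

// NoBuffering makes each worker take a single item at a time instead of a chunk,
// which helps when individual items are long-running and uneven in cost.
var partitioner = Partitioner.Create(durations, EnumerablePartitionerOptions.NoBuffering);
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

Parallel.ForEach(partitioner, options, ms =>
{
    Thread.Sleep(ms);                      // placeholder for the real per-set analysis
    Interlocked.Increment(ref completed);  // thread-safe counter update
});

Console.WriteLine($"Completed {completed} of {durations.Length} jobs");
```

With jobs that run for minutes, the extra synchronization of fetching items one by one should be negligible compared with the time spent on each item.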