How can I use TTask.WaitForAny from the new threading library?
In an attempt to use the threading library in Delphi to calculate tasks in parallel and using TTask.WaitForAny()
to get the first calculated result, an exception occationally stopped the execution.
Call stack at the exception:
First chance exception at $752D2F71. Exception class EMonitorLockException with message 'Object lock not owned'. Process Project1.exe (11248)
:752d2f71 KERNELBASE.RaiseException + 0x48
System.TMonitor.CheckOwningThread
System.ErrorAt(25,$408C70)
System.Error(reMonitorNotLocked)
System.TMonitor.CheckOwningThread
System.TMonitor.Exit
System.TMonitor.Exit($2180E40)
System.Threading.TTask.RemoveCompleteEvent(???)
System.Threading.TTask.DoWaitForAny((...),4294967295)
System.Threading.TTask.WaitForAny((...))
Project9.Parallel2
Project9.Project1
:74ff919f KERNEL32.BaseThreadInitThunk + 0xe
:7723b54f ntdll.RtlInitializeExceptionChain + 0x8f
:7723b51a ntdll.RtlInitializeExceptionChain + 0x5a
The call stack leads to the conclusion that the exception is caused by a bug in the threading library, TMonitor
and/ or TTask.WaitForAny()
. To verify that, the code was cut down to a minimum:
program Project1;
{$APPTYPE CONSOLE}
uses
System.SysUtils, System.Threading, System.Classes, System.SyncObjs,
System.StrUtils;
var
WorkerCount : integer = 1000;
function MyTaskProc: TProc;
begin
result := procedure
begin
// Do something
end;
end;
procedure Parallel2;
var
i : Integer;
Ticks: Cardinal;
tasks: array of ITask;
LTask: ITask;
workProc: TProc;
begin
workProc := MyTaskProc();
Ticks := TThread.GetTickCount;
SetLength(tasks, WorkerCount); // number of parallel tasks to undertake
for i := 0 to WorkerCount - 1 do // parallel tasks
tasks[i] := TTask.Run(workProc);
TTask.WaitForAny(tasks); // wait for the first one to finish
for LTask in tasks do
LTask.Cancel; // kill the remaining tasks
Ticks := TThread.GetTickCount - Ticks;
WriteLn('Parallel time ' + Ticks.ToString + ' ms');
end;
begin
try
repeat
Parallel2;
WriteLn('finished');
until FALSE;
except
on E: Exception do
writeln(E.ClassName, ': ', E.Message);
end;
Readln;
end.
Now the error reproduces after a while and the RTL bug is verified.
This was submitted as RSP-10197 TTask.WaitForAny gives exception EMonitorLockException "Object lock not owned" to Embarcadero.
Given the fact that this is currently not possible to solve with the Delphi threading library, the question is:
Is there a workaround to execute a procedure in parallel to get the first acquired solution?
Here is an example using TParallel.For to stop the execution when an answer is produced. It uses the TParallel.LoopState to signal other members of the parallel for loop. By using the .Stop
signal, all current and pending iterations should stop. Current iterations should check loopState.Stopped
.
procedure Parallel3(CS: TCriticalSection);
var
Ticks: Cardinal;
i,ix: Integer; // variables that are only touched once in the Parallel.For loop
begin
i := 0;
Ticks := TThread.GetTickCount;
TParallel.For(1,WorkerCount,
procedure(index:Integer; loopState: TParallel.TLoopState)
var
k,l,m: Integer;
begin
// Do something complex
k := (1000 - index)*1000;
for l := 0 to Pred(k) do
m := k div 1000;
// If criteria to stop fulfilled:
CS.Enter;
Try
if loopState.Stopped then // A solution was already found
Exit;
loopState.Stop; // Signal
Inc(i);
ix := index;
Finally
CS.Leave;
End;
end
);
Ticks := TThread.GetTickCount - Ticks;
WriteLn('Parallel time ' + Ticks.ToString + ' ticks', ' i :',i,' index:',ix);
end;
The critical section protects the calculated results, here for simplicity i,ix.
Disclaimer, given the state of bugs galore within the System.Threading
library, I would recommend another solution using the OTL framework. At least until the library has reached a stable foundation.