Parallelism in a divide & conquer algorithm
I'm having trouble making my code run in parallel. It is a 3D Delaunay generator using a divide & conquer algorithm named DeWall.
The main function is:
deWall :: [SimplexPointer] -> SetSimplexFace -> Box -> StateT DeWallSets IO ([Simplex], [Edge])
deWall p afl box = do
    ...
    ...
    get >>= recursion box1 box2 p1 p2 sigma edges
    ...
    ...
It calls the "recursion" function, which may in turn call deWall again. This is where the opportunity for parallelization appears. The following code shows the sequential solution.
recursion :: Box -> Box -> [SimplexPointer] -> [SimplexPointer] -> [Simplex] -> [Edge] -> DeWallSets -> StateT DeWallSets IO ([Simplex], [Edge])
recursion box1 box2 p1 p2 sigma edges deWallSet
    | null afl1 && null afl2 = return (sigma, edges)
    | null afl1 = do
        (s, e) <- deWall p2 afl2 box2
        return (s ++ sigma, e ++ edges)
    | null afl2 = do
        (s, e) <- deWall p1 afl1 box1
        return (s ++ sigma, e ++ edges)
    | otherwise = do
        x <- get
        liftIO $ do
            (s1, e1) <- evalStateT (deWall p1 afl1 box1) x
            (s2, e2) <- evalStateT (deWall p2 afl2 box2) x
            return (s1 ++ s2 ++ sigma, e1 ++ e2 ++ edges)
  where afl1 = aflBox1 deWallSet
        afl2 = aflBox2 deWallSet
The State and IO monads are used to thread the state through and to generate a UID for each tetrahedron found, using MVars. My first attempt was to add a forkIO, but it doesn't work: it gives wrong output because the merge step doesn't wait for both threads to finish. I don't know how to make it wait for them.
liftIO $ do
    let
        s1 = evalStateT (deWall p1 afl1 box1) x
        s2 = evalStateT (deWall p2 afl2 box2) x
        concatThread var (a1, b1) = takeMVar var >>= \(a2, b2) -> putMVar var (a1 ++ a2, b1 ++ b2)
    mv <- newMVar ([],[])
    forkIO (s1 >>= concatThread mv)
    forkIO (s2 >>= concatThread mv)
    takeMVar mv >>= \(s, e) -> return (s ++ sigma, e ++ edges)
So my next attempt was to use 'par' and 'pseq', which give the right result but, according to ThreadScope, no parallel execution.
liftIO $ do
    let
        s1 = evalStateT (deWall p1 afl1 box1) x
        s2 = evalStateT (deWall p2 afl2 box2) x
        conc = liftM2 (\(a1, b1) (a2, b2) -> (a1 ++ a2, b1 ++ b2))
    (stotal, etotal) <- s1 `par` (s2 `pseq` (s1 `conc` s2))
    return (stotal ++ sigma, etotal ++ edges)
What am I doing wrong?
UPDATE: Somehow this problem seems to be related to the presence of the IO monad. In another (older) version with no IO monad, only the State monad, the code does run in parallel with 'par' and 'pseq'. The -sstderr RTS statistics report SPARKS: 1160 (69 converted, 1069 pruned).
recursion :: Box -> Box -> [SimplexPointer] -> [SimplexPointer] -> [Simplex] -> DeWallSets -> State DeWallSets [Simplex]
recursion box1 box2 p1 p2 sigma deWallSet
    | null afl1 && null afl2 = return sigma
    | null afl1 = do
        s <- deWall p2 afl2 box2
        return (s ++ sigma)
    | null afl2 = do
        s <- deWall p1 afl1 box1
        return (s ++ sigma)
    | otherwise = do
        x <- get
        let s1 = evalState (deWall p1 afl1 box1) x
        let s2 = evalState (deWall p2 afl2 box2) x
        return $ s1 `par` (s2 `pseq` (s1 ++ s2 ++ sigma))
  where afl1 = aflBox1 deWallSet
        afl2 = aflBox2 deWallSet
Could someone explain that?
The easiest way to make this work would be to use something like:
liftIO $ do
    let
        s1 = evalStateT (deWall p1 afl1 box1) x
        s2 = evalStateT (deWall p2 afl2 box2) x
    -- the MVars must start empty: each takeMVar blocks until its thread has put a result
    mv1 <- newEmptyMVar
    mv2 <- newEmptyMVar
    forkIO (s1 >>= putMVar mv1)
    forkIO (s2 >>= putMVar mv2)
    (a1, b1) <- takeMVar mv1
    (a2, b2) <- takeMVar mv2
    return (a1 ++ a2 ++ sigma, b1 ++ b2 ++ edges)
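This works, but there's some unnecessary overhead: the current thread just sits waiting while two new threads do all the work. A better solution is to fork only one half and run the other in the current thread: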
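liftIO $ do
    let
        s1 = evalStateT (deWall p1 afl1 box1) x
        s2 = evalStateT (deWall p2 afl2 box2) x
    mv2 <- newEmptyMVar
    forkIO (s2 >>= putMVar mv2)   -- second half runs in a new thread
    (a1, b1) <- s1                -- first half runs in the current thread
    (a2, b2) <- takeMVar mv2      -- wait for the second half to finish
    return (a1 ++ a2 ++ sigma, b1 ++ b2 ++ edges)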
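The fork-one, run-one pattern above can be packaged as a small reusable helper. This is a minimal sketch using only base; the name both is hypothetical (the async package's concurrently is a more robust, exception-safe version of the same idea). To actually run on several cores, compile with -threaded and run with +RTS -N.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical helper: run `right` in a new thread and `left` in the
-- current one, then wait for both results.
-- Caveat: if `right` throws, takeMVar never gets a value; GHC will usually
-- detect this and raise BlockedIndefinitelyOnMVar in the waiting thread.
-- Note: the value put into the MVar may still be an unevaluated thunk.
both :: IO a -> IO b -> IO (a, b)
both left right = do
    mv <- newEmptyMVar                 -- must start empty, or takeMVar would return at once
    _ <- forkIO (right >>= putMVar mv) -- the other half runs in a new thread
    a <- left                          -- this thread does useful work meanwhile
    b <- takeMVar mv                   -- block until the forked half has finished
    return (a, b)

main :: IO ()
main = do
    -- In the DeWall code these would be the two evalStateT calls.
    (xs, ys) <- both (return [1, 2, 3 :: Int]) (return "abc")
    print (xs, ys)                     -- prints ([1,2,3],"abc")
```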
Or possibly this, if the results aren't being evaluated where you'd like them to be:
liftIO $ do
    let
        s1 = evalStateT (deWall p1 afl1 box1) x
        s2 = evalStateT (deWall p2 afl2 box2) x
    mv2 <- newEmptyMVar
    -- evaluate (from Control.Exception) forces the result to WHNF in the forked thread
    forkIO (s2 >>= evaluate >>= putMVar mv2)
    (a1, b1) <- s1
    (a2, b2) <- takeMVar mv2
    return (a1 ++ a2 ++ sigma, b1 ++ b2 ++ edges)
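evaluate only forces its argument to weak head normal form, which for a pair is just the outer (,) constructor; the lists inside can still be computed lazily by whichever thread reads them later. One way to make sure the forked thread does all the work is to fully evaluate the result with force from the deepseq package (shipped with GHC). This is a sketch assuming the result type has an NFData instance; for the DeWall code, Simplex and Edge would need NFData instances, which is an assumption not covered in this thread. The helper name bothStrict and the workload work are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)

-- Hypothetical helper: fork one half, run the other here, and fully
-- (deeply) evaluate each result in the thread that produced it.
bothStrict :: (NFData a, NFData b) => IO a -> IO b -> IO (a, b)
bothStrict left right = do
    mv <- newEmptyMVar
    _ <- forkIO (right >>= evaluate . force >>= putMVar mv)
    a <- left >>= evaluate . force   -- evaluate this half in the current thread
    b <- takeMVar mv                 -- already fully evaluated by the worker
    return (a, b)

-- Hypothetical stand-in workload: a pure computation returned from IO.
work :: Int -> IO ([Int], Int)
work n = return (filter even [1 .. n], sum [1 .. n])

main :: IO ()
main = do
    ((evens1, s1), (evens2, s2)) <- bothStrict (work 100000) (work 200000)
    print (length evens1 + length evens2, s1 + s2)  -- prints (150000,25000150000)
```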
(These are answers I gave the poster in #haskell; I thought they would be useful here as well.)
Edit: removed unnecessary evaluate.
Uses of par and pseq should occur on the "execution path", i.e., not inside a local let. Try this (a modification of your last snippet):
let s1 = ...
    s2 = ...
    conc = ...
case s1 `par` (s2 `pseq` (s1 `conc` s2)) of
    (stotal, etotal) ->
        return (stotal ++ sigma, etotal ++ edges)
A case expression with a constructor pattern, such as (stotal, etotal) here, forces evaluation of its scrutinee to weak head normal form (WHNF) before continuing in one of its branches. WHNF means that the expression is evaluated until the outermost constructor is visible; its fields may still be unevaluated.
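For comparison with the State-only version in the question, where par and pseq did spark useful work, here is a minimal pure divide-and-conquer sketch. It splits a list in half recursively, sparks the left half with par, evaluates the right half with pseq, and stops creating sparks below a depth cutoff so the number of sparks stays small. Because both halves are plain Ints, WHNF is full evaluation, so each spark does real work; sparking an IO action, by contrast, only evaluates the action value and never runs it. parSum and the cutoff are hypothetical; par and pseq come from GHC.Conc in base (Control.Parallel in the parallel package exports the same functions). Sparks only run in parallel with -threaded and +RTS -N.

```haskell
import GHC.Conc (par, pseq)

-- Hypothetical example: divide-and-conquer sum parallelised with par/pseq.
-- `depth` limits how many levels create sparks; below it we stay sequential.
parSum :: Int -> [Int] -> Int
parSum depth xs
    | depth <= 0 || length xs < 2 = sum xs
    | otherwise =
        let (ls, rs) = splitAt (length xs `div` 2) xs
            l = parSum (depth - 1) ls   -- candidate for parallel evaluation
            r = parSum (depth - 1) rs
        -- Spark l, evaluate r on this thread, then combine the two halves.
        in l `par` (r `pseq` (l + r))

main :: IO ()
main = print (parSum 4 [1 .. 1000000])  -- prints 500000500000
```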
To force full evaluation of an argument, use deepseq. Be careful with that, though, because deepseq can sometimes make things slower by doing too much work.
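To see the difference between WHNF and full evaluation, here is a small sketch using evaluate and try from base and force from the deepseq package. Forcing a list to WHNF only exposes its first cons cell, while force walks the whole structure and therefore hits the undefined element. The names partial and throws are hypothetical.

```haskell
import Control.DeepSeq (force)
import Control.Exception (ErrorCall, evaluate, try)

-- A list whose second element is bottom.
partial :: [Int]
partial = [1, undefined]

-- Returns True if running the action threw an ErrorCall (undefined does).
throws :: IO a -> IO Bool
throws act = do
    r <- try act
    return $ case r of
        Left e  -> const True (e :: ErrorCall)
        Right _ -> False

main :: IO ()
main = do
    whnf <- throws (evaluate partial)          -- only the outer (:) is forced
    nf   <- throws (evaluate (force partial))  -- every element is forced
    print (whnf, nf)                           -- prints (False,True)
```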
A more lightweight approach to adding strictness is to make fields strict:
data Foo = Foo !Int String
Now, whenever a value of type Foo is evaluated to WHNF, so is its first argument (but not the second one).
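A quick way to check the effect of the strictness annotation is to put undefined in each field and force the value to WHNF. This sketch reuses the Foo type from the answer; the helper throwsOnWhnf is hypothetical.

```haskell
import Control.Exception (ErrorCall, evaluate, try)

-- Same type as in the answer: first field strict, second field lazy.
data Foo = Foo !Int String

-- True if forcing the value to WHNF throws an ErrorCall.
throwsOnWhnf :: a -> IO Bool
throwsOnWhnf x = do
    r <- try (evaluate x)
    return $ case r of
        Left e  -> const True (e :: ErrorCall)
        Right _ -> False

main :: IO ()
main = do
    a <- throwsOnWhnf (Foo undefined "ok")  -- strict field is forced along with Foo
    b <- throwsOnWhnf (Foo 1 undefined)     -- lazy field stays an unevaluated thunk
    print (a, b)                            -- prints (True,False)
```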
If you want to stick with explicit threads rather than pseq, then, as you noted, you need some way to wait for the worker threads to finish. That's a great use case for a quantity semaphore: after you divide up the work to be done, have each worker thread, on termination, signal the semaphore with how much work it has done. Then wait for all the units of work to be completed.
http://www.haskell.org/ghc/docs/6.8.3/html/libraries/base/Control-Concurrent-QSemN.html
Edit: some pseudocode to help explain the notion
do
    let workchunks :: [(WorkChunk, Int)]
        workchunks = dividework work
    let totalsize = sum $ map snd workchunks
    sem <- newQSemN 0
    -- each worker does its chunk, then signals how many units it completed
    let workThread (workchunk, size) = do
            executeWorkChunk workchunk
            signalQSemN sem size
    mapM_ (forkIO . workThread) workchunks
    waitQSemN sem totalsize
    -- now all your work is done.
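A runnable version of the pseudocode above, using Control.Concurrent.QSemN from base. The workload (summing chunks of a list) and the names divideWork and sumInChunks are hypothetical stand-ins for dividework and executeWorkChunk. Since the semaphore only signals completion, the partial results are collected in an MVar here. As with the MVar versions, if a worker throws before signalling, waitQSemN will block.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar)
import Control.Concurrent.QSemN (newQSemN, signalQSemN, waitQSemN)
import Control.Exception (evaluate)

-- Hypothetical stand-in for dividework: split the input into chunks, each
-- tagged with its size (the number of units it signals when done).
divideWork :: Int -> [Int] -> [([Int], Int)]
divideWork _ [] = []
divideWork n xs = let (c, rest) = splitAt (max 1 n) xs
                  in (c, length c) : divideWork n rest

-- Sum a list with one thread per chunk, waiting on a quantity semaphore.
sumInChunks :: Int -> [Int] -> IO Int
sumInChunks chunkSize xs = do
    let chunks    = divideWork chunkSize xs
        totalSize = sum (map snd chunks)
    sem   <- newQSemN 0
    total <- newMVar 0
    let worker (chunk, size) = do
            s <- evaluate (sum chunk)                  -- do the work in this thread
            modifyMVar_ total (\t -> return $! t + s)  -- record the partial result
            signalQSemN sem size                       -- report `size` units done
    mapM_ (forkIO . worker) chunks
    waitQSemN sem totalSize                            -- wait for every unit of work
    readMVar total

main :: IO ()
main = sumInChunks 1000 [1 .. 100000] >>= print  -- prints 5000050000
```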