STM with partial atomicity for certain TVars
I am doing things with STM and have among other things used the TBQueue
data structure with great success. An useful feature I've been using it for involves reading from it based on a precondition in a TVar
, basically like so:
shouldRead <- readTVar shouldReadVar
if shouldRead
then do
a <- readTBQueue queue
doSomethingWith a
else doSomethingElse
If we assume that queue
is empty and shouldReadVar
contains True
before executing this block, it will result in readTBQueue
calling retry
, and the block will be re-executed when shouldReadVar
contains False
or queue
contains an element, whatever happens first.
I am now in need of a synchronous channel data structure, similar to the structure described in this article (Please read it if you want to understand this question), except it needs to be readable with a pre-condition like in the previous example, and possibly compose with other stuff as well.
Let's call this data structure SyncChan
with writeSyncChan
and readSyncChan
operations defined on it.
And here's a possible use case: This (pseudo) code (which will not work because I mix STM/IO concepts):
shouldRead <- readTVar shouldReadVar
if shouldRead
then do
a <- readSyncChan syncChan
doSomethingWith a
else doSomethingElse
Assuming that no other thread is currently blocking on a writeSyncChan
call, and shouldReadChan
contains True
, I want the block to " retry
" until either shouldReadChan
contains False
, or a different thread blocks on a writeSyncChan
. In other words: when one thread retry
s on writeSyncChan
and another thread blocks reaches a readSyncChan
, or vice versa, I want the value to be transferred along the channel. In all other cases, both sides should be in a retry
state and thus react to a change in shouldReadVar
, so that the read or write can be cancelled.
The naïve approach described in the article linked above using two ( T
) MVar
s is of course not possible. Because the data structure is synchronous, it is impossible to use it within two atomically
blocks, because you cannot change one TMVar
and wait for another TMVar
to be changed in an atomic context.
Instead, I am looking for a kind of partial atomicity, where I can "commit" a certain part of a transaction and only roll it back when certain variables change, but not others. If I have "msg" and "ack" variables like the first example in the article above, I want to be able to write to the "msg" variable, then wait for either a value to arrive on "ack", or for my other transactional variables to change. If other transactional variables change, the whole atomic block should be retried, and if an "ack" value arrives, the transaction should continue as it was in the previous state. For the reading side, something similar should happen, except I would of course be reading from "msg" and writing to "ack."
Is this possible to do using GHC STM, or do I need to do manual MVar/rollback handling?
This is what you want:
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
data SyncChan a = SyncChan (TMVar a) (TMVar ())
newSyncChan :: IO (SyncChan a)
newSyncChan = do
msg <- newEmptyTMVarIO
ack <- newEmptyTMVarIO
return (SyncChan msg ack)
readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
b <- readTVar shouldReadVar
if b
then do
a <- takeTMVar msg
putTMVar ack ()
return (Just a)
else return Nothing
write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
atomically $ putTMVar msg a
atomically $ takeTMVar ack
main = do
sc <- newSyncChan
tv <- newTVarIO True
forkIO $ forever $ forM_ [False, True] $ b -> do
threadDelay 2000000
atomically $ writeTVar tv b
forkIO $ forM_ [0..] $ i -> do
putStrLn "Writing..."
write sc i
putStrLn "Write Complete"
threadDelay 300000
forever $ do
putStrLn "Reading..."
a <- atomically $ readIf sc tv
print a
putStrLn "Read Complete"
This gives the behavior you had in mind. While the TVar
is True
the input and output ends will be synchronized with each other. When the TVar
switches to False
then the read end freely aborts and returns Nothing
.
上一篇: Google Maps API v3:如何删除所有标记?
下一篇: STM对于某些TVar具有部分原子性