STM对于某些TVar具有部分原子性
我正在做STM的工作,除此之外还使用TBQueue
数据结构取得了巨大成功。 我一直在使用它的一个有用的功能涉及基于TVar
的前提条件来读取它,基本上如此:
shouldRead <- readTVar shouldReadVar
if shouldRead
then do
a <- readTBQueue queue
doSomethingWith a
else doSomethingElse
如果我们在执行此块之前假设queue
为空且shouldReadVar
包含True
,则会导致readTBQueue
调用retry
,并且当shouldReadVar
包含False
或queue
包含元素时(无论先发生什么),块将被重新执行。
我现在需要一个同步通道数据结构,类似于本文中描述的结构(请阅读它,如果你想了解这个问题),除了它需要像前面的例子那样用一个前置条件来读取,并可能与其他内容合作。
我们用writeSyncChan
和readSyncChan
操作来定义数据结构SyncChan
。
这里有一个可能的用例:这个(伪)代码(因为我混合使用STM / IO概念,这将不起作用):
shouldRead <- readTVar shouldReadVar
if shouldRead
then do
a <- readSyncChan syncChan
doSomethingWith a
else doSomethingElse
假设当前没有其他线程在writeSyncChan
调用上被阻塞,并且shouldReadChan
包含True
,我希望该块“ retry
”,直到shouldReadChan
包含False
或不同的线程阻塞writeSyncChan
。 换句话说:当一个线程在writeSyncChan
上retry
并且另一个线程块到达readSyncChan
,反之亦然,我希望该值沿着通道传输。 在所有其他情况下,双方都应该处于retry
状态,因此shouldReadVar
的更改,以便读取或写入可以被取消。
在上面使用两个( T
) MVar
链接的文章中描述的天真方法当然是不可能的。 因为数据结构是同步的,所以不可能在两个atomically
块内使用它,因为您无法更改一个TMVar
并等待另一个TMVar
在原子上下文中被更改。
相反,我正在寻找一种部分原子性,我可以在其中“提交”某个事务的某个部分,并且只在某些变量发生变化时将其回滚,而不是其他变量。 如果我在上面的文章中有第一个例子的“msg”和“ack”变量,我希望能够写入“msg”变量,然后等待一个值到达“ack”,或者为我的其他事务变量发生变化。 如果其他事务变量发生变化,则应该重试整个原子块,并且如果“ack”值到达,则事务应该像以前的状态一样继续。 对于阅读方面,应该会发生类似的情况,除非我会从“msg”读取并写入“ack”。
这可能使用GHC STM,还是我需要做手动MVar /回滚处理?
这是你想要的:
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
data SyncChan a = SyncChan (TMVar a) (TMVar ())
newSyncChan :: IO (SyncChan a)
newSyncChan = do
msg <- newEmptyTMVarIO
ack <- newEmptyTMVarIO
return (SyncChan msg ack)
readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
b <- readTVar shouldReadVar
if b
then do
a <- takeTMVar msg
putTMVar ack ()
return (Just a)
else return Nothing
write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
atomically $ putTMVar msg a
atomically $ takeTMVar ack
main = do
sc <- newSyncChan
tv <- newTVarIO True
forkIO $ forever $ forM_ [False, True] $ b -> do
threadDelay 2000000
atomically $ writeTVar tv b
forkIO $ forM_ [0..] $ i -> do
putStrLn "Writing..."
write sc i
putStrLn "Write Complete"
threadDelay 300000
forever $ do
putStrLn "Reading..."
a <- atomically $ readIf sc tv
print a
putStrLn "Read Complete"
这给出了你想要的行为。 当TVar
为True
,输入和输出结束将彼此同步。 当TVar
切换到False
,读取结束自由中止并返回Nothing
。