在Haskell程序中收集暂停时间

我们正在开发一个接收和转发“消息”的程序,同时保留这些消息的临时记录,以便它可以在需要时告诉您消息历史记录。 消息以数字形式标识,通常大小约为1千字节,我们需要保留数以十万计的这些消息。

我们希望优化此程序的延迟时间:发送和接收消息之间的时间必须低于10毫秒。

该程序用Haskell编写,并与GHC一起编译。 但是,我们发现垃圾收集暂停对于我们的延迟要求来说太长了:在我们的真实世界程序中超过100毫秒。

以下程序是我们应用程序的简化版本。 它使用Data.Map.Strict来存储消息。 消息是由Int标识的ByteString 。 按增加的数字顺序插入1,000,000条消息,并且不断删除最旧的消息以使历史记录最多保留200,000条消息。

module Main (main) where

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

我们编译并运行这个程序使用:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
   3,116,460,096 bytes allocated in the heap
     385,101,600 bytes copied during GC
     235,234,800 bytes maximum residency (14 sample(s))
     124,137,808 bytes maximum slop
             600 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
  Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.652s  (  0.745s elapsed)
  GC      time    0.417s  (  0.530s elapsed)
  EXIT    time    0.010s  (  0.052s elapsed)
  Total   time    1.079s  (  1.326s elapsed)

  %GC     time      38.6%  (40.0% elapsed)

  Alloc rate    4,780,213,353 bytes per MUT second

  Productivity  61.4% of total user, 49.9% of total elapsed

这里的重要指标是“最大暂停”0.0515秒或51毫秒。 我们希望减少至少一个数量级。

实验表明,GC暂停的长度取决于历史记录中的消息数量。 这种关系大致是线性的,或者可能是超线性的。 下表显示了这种关系。 (你可以在这里看到我们的基准测试,以及一些图表。)

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378

我们已经尝试了其他几个变量来找出它们是否可以减少这种延迟,但没有一个会产生很大的变化。 在这些不重要的变量中:优化( -O-O2 ); RTS GC选项( -G-H-A-c ),核心数量( -N ),不同数据结构( Data.Sequence ),消息大小以及生成的短期垃圾数量。 压倒性的决定因素是历史中的消息数量。

我们的工作理论是,消息的数量暂停是线性的,因为每个GC循环必须遍历所有可用的可访问内存并复制它,这显然是线性操作。

问题:

  • 这个线性时间理论是否正确? GC暂停的长度能以这种简单的方式表达出来,还是现实更复杂?
  • 如果GC暂停在工作记忆中是线性的,有什么办法来减少涉及的常量因素?
  • 增量GC有什么选择吗? 我们只能看到研究论文。 我们非常愿意为低延迟交易吞吐量。
  • 除了分成多个进程之外,是否有任何方法可以为更小的GC循环“分区”内存?

  • 实际上,如果有51毫秒的暂停时间,超过200Mb的实时数据,它确实做得很好。 我工作的系统具有较大的最大暂停时间,只有这一数量的实时数据。

    你的假设是正确的,主要的GC暂停时间与实时数据量成正比,不幸的是,GHC现在还没有办法解决这个问题。 我们过去一直试用增量GC,但这是一个研究项目,没有达到将其折叠到已发布的GHC所需的成熟度水平。

    我们希望在未来有一件事可以帮助解决这个问题:压缩区域:https://phabricator.haskell.org/D1264。 这是一种手动内存管理,您可以在堆中压缩结构,并且GC不必遍历它。 它最适合长寿命的数据,但也许它会足够好用于设置中的单个消息。 我们的目标是在GHC 8.2.0中。

    如果您处于分布式设置并且有某种类型的负载平衡器,则可以使用以避免暂停命中的技巧,但您基本上确保负载平衡器不会将请求发送到即将到来的计算机做一个主要的GC,当然确保机器仍然完成GC,即使它没有得到请求。


    我已经使用IOVector作为基础数据结构,使用IOVector缓冲区方法尝试了您的代码片段。 在我的系统上(GHC 7.10.3,相同的编译选项),这导致最大时间(您在OP中提到的度量)减少了〜22%。

    NB。 我在这里做了两个假设:

  • 一个可变的数据结构是适合这个问题的(我猜消息传递意味着IO无论如何)
  • 你的messageId是连续的
  • 通过一些额外的Int参数和算术(例如,当messageId被重置为0或minBound ),应该很直接地确定某个消息是否仍在历史记录中,并从环缓冲区中检索相应的索引。

    为了您的测试愉悦:

    import qualified Control.Exception as Exception
    import qualified Control.Monad as Monad
    import qualified Data.ByteString as ByteString
    import qualified Data.Map.Strict as Map
    
    import qualified Data.Vector.Mutable as Vector
    
    data Msg = Msg !Int !ByteString.ByteString
    
    type Chan = Map.Map Int ByteString.ByteString
    
    data Chan2 = Chan2
        { next          :: !Int
        , maxId         :: !Int
        , ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
        }
    
    chanSize :: Int
    chanSize = 200000
    
    message :: Int -> Msg
    message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
    
    
    newChan2 :: IO Chan2
    newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize
    
    pushMsg2 :: Chan2 -> Msg -> IO Chan2
    pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
        let ix' = if ix == chanSize then 0 else ix + 1
        in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)
    
    pushMsg :: Chan -> Msg -> IO Chan
    pushMsg chan (Msg msgId msgContent) =
      Exception.evaluate $
        let
          inserted = Map.insert msgId msgContent chan
        in
          if chanSize < Map.size inserted
          then Map.deleteMin inserted
          else inserted
    
    main, main1, main2 :: IO ()
    
    main = main2
    
    main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
    
    main2 = newChan2 >>= c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
    

    我必须同意其他观点 - 如果您有严格的实时限制,那么使用GC语言并不理想。

    但是,您可能会考虑尝试其他可用的数据结构,而不仅仅是Data.Map。

    我使用Data.Sequence重写了它,并得到了一些有希望的改进:

    msgs history length  max GC pause (ms)
    ===================  =================
    12500                              0.7
    25000                              1.4
    50000                              2.8
    100000                             5.4
    200000                            10.9
    400000                            21.8
    800000                            46
    1600000                           87
    3200000                          175
    6400000                          350
    

    即使您正在优化延迟,我也注意到其他指标也在改善。 在200000的情况下,执行时间从1.5秒降至0.2秒,总内存使用量从600MB降至27MB。

    我应该注意到我通过调整设计来欺骗我:

  • 我从Msg删除了Int ,所以它不在两个地方。
  • 而不是使用从地图的Int s到ByteString S,我用了一个SequenceByteString s,而不是一个Int每封邮件,我认为它可以与一个做Int整个Sequence 。 假设消息无法重新排序,您可以使用单个偏移量来将您想要的消息转换为队列中的位置。
  • (我包含一个额外的函数getMsg来证明这一点。)

    {-# LANGUAGE BangPatterns #-}
    
    import qualified Control.Exception as Exception
    import qualified Control.Monad as Monad
    import qualified Data.ByteString as ByteString
    import Data.Sequence as S
    
    newtype Msg = Msg ByteString.ByteString
    
    data Chan = Chan Int (Seq ByteString.ByteString)
    
    message :: Int -> Msg
    message n = Msg (ByteString.replicate 1024 (fromIntegral n))
    
    maxSize :: Int
    maxSize = 200000
    
    pushMsg :: Chan -> Msg -> IO Chan
    pushMsg (Chan !offset sq) (Msg msgContent) =
        Exception.evaluate $
            let newSize = 1 + S.length sq
                newSq = sq |> msgContent
            in
            if newSize <= maxSize
                then Chan offset newSq
                else
                    case S.viewl newSq of
                        (_ :< newSq') -> Chan (offset+1) newSq'
                        S.EmptyL -> error "Can't happen"
    
    getMsg :: Chan -> Int -> Maybe Msg
    getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
        where
        getMsg' i
            | i < 0            = Nothing
            | i >= S.length sq = Nothing
            | otherwise        = Just (Msg (S.index sq i))
    
    main :: IO ()
    main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])
    
    链接地址: http://www.djcxy.com/p/33193.html

    上一篇: collection pause time in a Haskell program

    下一篇: Library ghc RULES don't activate