Reducing garbage collection pause time in a Haskell program
We are developing a program which receives and forwards "messages", while keeping a temporary history of those messages, so that it can tell you the message history if requested. Messages are identified numerically, are typically around 1 kilobyte in size, and we need to keep hundreds of thousands of these messages.
We wish to optimize this program for latency: the time between sending and receiving a message must be below 10 milliseconds.
The program is written in Haskell and compiled with GHC. However, we have found that garbage collection pauses are far too long for our latency requirements: over 100 milliseconds in our real-world program.
The following program is a simplified version of our application. It uses a Data.Map.Strict to store messages. Messages are ByteStrings identified by an Int. 1,000,000 messages are inserted in increasing numeric order, and the oldest messages are continually removed to keep the history at a maximum of 200,000 messages.
module Main (main) where
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
        then Map.deleteMin inserted
        else inserted
main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
We compiled and ran this program using:
$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
3,116,460,096 bytes allocated in the heap
385,101,600 bytes copied during GC
235,234,800 bytes maximum residency (14 sample(s))
124,137,808 bytes maximum slop
600 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 6558 colls, 0 par 0.238s 0.280s 0.0000s 0.0012s
Gen 1 14 colls, 0 par 0.179s 0.250s 0.0179s 0.0515s
INIT time 0.000s ( 0.000s elapsed)
MUT time 0.652s ( 0.745s elapsed)
GC time 0.417s ( 0.530s elapsed)
EXIT time 0.010s ( 0.052s elapsed)
Total time 1.079s ( 1.326s elapsed)
%GC time 38.6% (40.0% elapsed)
Alloc rate 4,780,213,353 bytes per MUT second
Productivity 61.4% of total user, 49.9% of total elapsed
The important metric here is the "max pause" of 0.0515s, or 51 milliseconds. We wish to reduce this by at least an order of magnitude.
Experimentation shows that the length of a GC pause is determined by the number of messages in the history. The relationship is roughly linear, or perhaps super-linear. The following table shows this relationship. (You can see our benchmarking tests here, and some charts here.)
msgs history length max GC pause (ms)
=================== =================
12500 3
25000 6
50000 13
100000 30
200000 56
400000 104
800000 199
1600000 487
3200000 1957
6400000 5378
We have experimented with several other variables to find whether they can reduce this latency, none of which makes a big difference. Among these unimportant variables are: optimization level (-O, -O2); RTS GC options (-G, -H, -A, -c); number of cores (-N); different data structures (Data.Sequence); the size of messages; and the amount of short-lived garbage generated. The overwhelming determining factor is the number of messages in the history.
Our working theory is that the pauses are linear in the number of messages because each GC cycle has to walk over, and copy, all of the live (reachable) memory, both of which are clearly linear operations.
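One rough way to test this theory (a sketch for illustration, not part of our benchmark suite) is to build live sets of various sizes, force a major collection with System.Mem.performGC, and time it. The absolute numbers include some fixed overhead, but the growth with the size of the live set should be visible:

```haskell
import qualified Control.Exception as Exception
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import System.Mem (performGC)

-- Build a live set of n one-kilobyte messages, then time one major GC.
timeMajorGC :: Int -> IO ()
timeMajorGC n = do
  let live = Map.fromList
        [ (i, ByteString.replicate 1024 (fromIntegral i)) | i <- [1 .. n] ]
  _ <- Exception.evaluate (Map.size live)  -- force the map before timing
  start <- getCurrentTime
  performGC                                -- performGC is a major collection
  end <- getCurrentTime
  putStrLn (show n ++ " messages: " ++ show (diffUTCTime end start))
  -- Touch the map again so it stays live across the collection above.
  _ <- Exception.evaluate (Map.size live)
  return ()

main :: IO ()
main = mapM_ timeMajorGC [12500, 25000, 50000, 100000]
```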
Answers:
You're actually doing pretty well to have a 51ms pause time with over 200MB of live data. The system I work on has a larger max pause time with half that amount of live data.
Your assumption is correct: the major GC pause time is directly proportional to the amount of live data, and unfortunately there's no way around that with GHC as it stands. We experimented with incremental GC in the past, but it was a research project and didn't reach the level of maturity needed to fold it into the released GHC.
One thing that we're hoping will help with this in the future is compact regions: https://phabricator.haskell.org/D1264. It's a kind of manual memory management where you compact a structure in the heap, and the GC doesn't have to traverse it. It works best for long-lived data, but perhaps it will be good enough to use for individual messages in your setting. We're aiming to have it in GHC 8.2.0.
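If compact regions ship roughly as described in that ticket, usage might look like the sketch below. The Data.Compact module and the compact/getCompact names are taken from the proposal and may change before release; note also that plain ByteStrings hold pinned/foreign pointers, which compact regions may not support, so this toy uses an Int payload:

```haskell
import qualified Data.Map.Strict as Map
import Data.Compact (compact, getCompact)  -- assumed API from the proposal

-- Copy a fully evaluated history into a compact region. The region is
-- copied once, and afterwards the GC treats it as a single opaque block
-- that it never traverses during a collection.
freezeHistory :: Map.Map Int Int -> IO (Map.Map Int Int)
freezeHistory hist = do
  region <- compact hist       -- one-time copy into the region
  return (getCompact region)   -- same value, now skipped by GC traversal
```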
If you're in a distributed setting and have a load-balancer of some kind, there are tricks you can play to avoid taking the pause hit: make sure the load-balancer doesn't send requests to machines that are about to do a major GC, and of course make sure that a machine still completes the GC even though it isn't receiving requests.
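The second half of that trick can use System.Mem.performMajorGC (in base since 4.7): while the load-balancer has drained a node, force the major collection proactively so no request ever observes the pause. A minimal sketch, assuming the drain/rejoin signalling is handled elsewhere:

```haskell
import Control.Concurrent (threadDelay)
import System.Mem (performMajorGC)

-- Called while this node is out of the load-balancer's rotation.
drainAndCollect :: IO ()
drainAndCollect = do
  performMajorGC      -- take the expensive pause now, while nobody waits
  threadDelay 10000   -- brief settle time before rejoining the pool

main :: IO ()
main = drainAndCollect
```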
I've tried your code snippet with a ring-buffer approach using an IOVector as the underlying data structure. On my system (GHC 7.10.3, same compilation options) this resulted in a reduction of max pause time (the metric you mentioned in your OP) by ~22%.
NB. I made two assumptions here:
With some additional Int parameter and arithmetic (e.g. for when message ids are reset back to 0 or minBound), it should then be straightforward to determine whether a certain message is still in the history and retrieve it from the corresponding index in the ring buffer.
For your testing pleasure:
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
import qualified Data.Vector.Mutable as Vector
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
data Chan2 = Chan2
  { next       :: !Int
  , maxId      :: !Int
  , ringBuffer :: !(Vector.IOVector ByteString.ByteString)
  }
chanSize :: Int
chanSize = 200000
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize
pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
  -- Wrap the write index so it stays within the valid range 0 .. chanSize-1.
  let ix' = if ix == chanSize - 1 then 0 else ix + 1
  in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if chanSize < Map.size inserted
        then Map.deleteMin inserted
        else inserted
main, main1, main2 :: IO ()
main = main2
main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
I have to agree with the others - if you have hard real-time constraints, then using a GC language is not ideal.
However, you might consider experimenting with other available data structures rather than just Data.Map.
I rewrote it using Data.Sequence and got some promising improvements:
msgs history length max GC pause (ms)
=================== =================
12500 0.7
25000 1.4
50000 2.8
100000 5.4
200000 10.9
400000 21.8
800000 46
1600000 87
3200000 175
6400000 350
Even though you're optimising for latency, I noticed other metrics improving too. In the 200000 case, execution time drops from 1.5s to 0.2s, and total memory usage drops from 600MB to 27MB.
I should note that I cheated by tweaking the design: I removed the Int from the Msg, so the message id isn't stored in two places, and instead of a Map from Ints to ByteStrings, I used a Sequence of ByteStrings. Instead of one Int per message, I think it can be done with one Int for the whole Sequence: assuming messages can't get reordered, you can use a single offset to translate the message id you want to its position in the queue. (I included an additional function getMsg to demonstrate that.)
{-# LANGUAGE BangPatterns #-}

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S

newtype Msg = Msg ByteString.ByteString

data Chan = Chan Int (Seq ByteString.ByteString)

message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))

maxSize :: Int
maxSize = 200000

pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
  Exception.evaluate $
    let newSize = 1 + S.length sq
        newSq = sq |> msgContent
    in
      if newSize <= maxSize
        then Chan offset newSq
        else
          case S.viewl newSq of
            (_ :< newSq') -> Chan (offset + 1) newSq'
            S.EmptyL -> error "Can't happen"

getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
  where
    getMsg' i
      | i < 0 = Nothing
      | i >= S.length sq = Nothing
      | otherwise = Just (Msg (S.index sq i))

main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1 .. 5 * maxSize])