I have two threads: a producer and a consumer. The producer generates (key, value) pairs and the consumer inserts them into a Map wrapped in a Data.IORef. I tried using Control.Concurrent.BoundedChan for communication between the producer and the consumer, and it works fine (memory usage is constant), provided that I use BangPatterns where needed. The code is as follows:
{-# LANGUAGE BangPatterns #-}
module Main where

import qualified Data.Map.Strict as M
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import qualified Control.Concurrent.BoundedChan as BC
import qualified Control.Concurrent.Chan.Unagi.Bounded as UBC
import qualified Data.Text as T
import System.Random
import Data.IORef
import Control.Monad

data Item = Item !Int !Int

data SinkState = SinkState {
    myMap :: !(M.Map Int Int)
    }

testBCs = do
    chan <- BC.newBoundedChan 1000
    forkIO $ source chan
    sink chan
  where
    source chan = forever $ do
        threadDelay 500
        key <- getStdRandom (randomR (1,5000))
        value <- getStdRandom (randomR (1,1000000))
        BC.writeChan chan $ Item key value
    sink chan = do
        state <- newIORef SinkState {
            myMap = M.empty
            }
        forever $ do
            (Item key value) <- BC.readChan chan
            atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))
Now, when I switch from BoundedChan to Control.Concurrent.STM.TBQueue, memory starts to leak:
testTBs = do
    chan <- atomically $ newTBQueue 1000
    forkIO $ source chan
    sink chan
  where
    source chan = forever $ do
        threadDelay 500
        key <- getStdRandom (randomR (1,5000))
        value <- getStdRandom (randomR (1,1000000))
        atomically $ writeTBQueue chan $ Item key value
    sink chan = do
        state <- newIORef SinkState {
            myMap = M.empty
            }
        forever $ do
            (Item key value) <- chan `seq` atomically $ readTBQueue chan
            atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))
And the profiling results look like this (run with +RTS -hd):
So my questions are:
Without digging into the space leak itself, one solution is simply to move the consumer logic entirely into STM. This is as simple as replacing the IORef with a TVar. But to take full advantage of STM, the queue read and the state update should be placed in a single atomically block, so that both operations execute in one transaction. A helpful side effect is that we also gain exception safety: if an exception aborts the transaction, nothing is committed, so the item stays in the queue instead of being read and then lost.
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import System.Random
import qualified Data.Map.Strict as M

data Item = Item {-# UNPACK #-} !Int {-# UNPACK #-} !Int

data SinkState = SinkState {
    myMap :: !(M.Map Int Int)
    }

main :: IO ()
main = do
    chan <- newTBQueueIO 1000

    forkIO . forever $ do
        threadDelay 500
        key <- getStdRandom $ randomR (1,5000)
        value <- getStdRandom $ randomR (1,1000000)
        atomically . writeTBQueue chan $ Item key value

    state <- newTVarIO SinkState {
        myMap = M.empty
        }

    forever . atomically $ do
        Item key value <- readTBQueue chan
        modifyTVar' state $ \s -> s { myMap = M.insert key value (myMap s) }
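If you want to convince yourself that the read and the update really happen as one transaction, a finite variant of the same idea may be easier to experiment with than the endless loop above. This is only a minimal sketch under a few assumptions of mine: the helper name consumeOne, the plain (Int, Int) tuples in place of Item, the queue capacity of 16, and the fixed count of 1000 items are all illustrative and not part of the original code.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM_)
import qualified Data.Map.Strict as M

-- Hypothetical helper (not from the code above): take one (key, value)
-- pair off the queue and insert it into the map in the same transaction.
consumeOne :: TBQueue (Int, Int) -> TVar (M.Map Int Int) -> STM ()
consumeOne queue state = do
    (key, value) <- readTBQueue queue
    -- modifyTVar' forces the new map to WHNF before it is stored,
    -- so no chain of unevaluated inserts builds up in the TVar.
    modifyTVar' state (M.insert key value)

main :: IO ()
main = do
    queue <- newTBQueueIO 16
    state <- newTVarIO M.empty

    -- Producer: 1000 items spread over 10 distinct keys.
    -- writeTBQueue blocks (retries) whenever the queue is full.
    _ <- forkIO $ forM_ [1 .. 1000] $ \i ->
        atomically $ writeTBQueue queue (i `mod` 10, i)

    -- Consumer: exactly 1000 transactions, one per produced item.
    replicateM_ 1000 $ atomically (consumeOne queue state)

    final <- readTVarIO state
    print (M.size final)  -- one entry per distinct key
```

Because consumeOne is an STM action rather than an IO action, it can also be composed with other STM actions (for example a check on a shutdown flag) inside the same atomically block without giving up atomicity.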