
Memory leak when using Control.Concurrent.STM.TBQueue


I have two threads: a producer and a consumer. The producer generates (key, value) pairs and the consumer inserts them into a Map stored in an IORef (Data.IORef). I first used Control.Concurrent.BoundedChan for communication between the producer and the consumer, and it works fine (memory usage stays constant), provided that I add BangPatterns and other strictness annotations where needed. The code is as follows:

{-# LANGUAGE BangPatterns #-}

module Main where

import qualified Data.Map.Strict as M
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import qualified Control.Concurrent.BoundedChan as BC
import System.Random
import Data.IORef
import Control.Monad

data Item = Item !Int !Int

data SinkState = SinkState {
  myMap :: !(M.Map Int Int)
}

testBCs :: IO ()
testBCs = do
  chan <- BC.newBoundedChan 1000

  forkIO $ source chan
  sink chan
  where
    source chan = forever $ do
      threadDelay 500
      key <- getStdRandom (randomR (1,5000))
      value <- getStdRandom (randomR (1,1000000))
      BC.writeChan chan $ Item key value

    sink chan = do
      state <- newIORef SinkState {
          myMap = M.empty
        }
      forever $ do
        (Item key value) <- BC.readChan chan
        atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))

Now, when I switch from BoundedChan to Control.Concurrent.STM.TBQueue, memory starts to leak:

testTBs :: IO ()
testTBs = do
  chan <- atomically $ newTBQueue 1000

  forkIO $ source chan
  sink chan
  where
    source chan = forever $ do
      threadDelay 500
      key <- getStdRandom (randomR (1,5000))
      value <- getStdRandom (randomR (1,1000000))
      atomically $ writeTBQueue chan $ Item key value

    sink chan = do
      state <- newIORef SinkState {
          myMap = M.empty
        }
      forever $ do
        (Item key value) <- chan `seq` atomically $ readTBQueue chan
        atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))

main :: IO ()
main = testTBs  -- use testBCs instead to compare with the BoundedChan version

And the profiling results (run with +RTS -hd) look like this: [Profiling results]

So my questions are:

  1. What's going on in the second case?
  2. What are stg_ap_2_upd_info and the other symbols in the profile?
  3. How can I fix the leak?

Solution

  • Without digging into the space leak itself, one solution is to move the consumer logic entirely into STM. This is as simple as replacing the IORef with a TVar. But to take full advantage of STM, the queue read and the state update should be placed in a single atomically block, so that both operations execute as one transaction: either the item is removed from the queue and the map is updated, or neither happens. A helpful side effect is exception safety: an exception arriving between the read and the update can no longer lose an item.

    import Control.Concurrent
    import Control.Concurrent.STM
    import Control.Monad
    import System.Random
    
    import qualified Data.Map.Strict as M
    
    data Item = Item {-# UNPACK #-} !Int {-# UNPACK #-} !Int
    
    data SinkState = SinkState {
      myMap :: !(M.Map Int Int)
    }
    
    main :: IO ()
    main = do
      chan <- newTBQueueIO 1000
    
      -- Producer: one random (key, value) pair roughly every 500 microseconds.
      forkIO . forever $ do
        threadDelay 500
        key <- getStdRandom $ randomR (1,5000)
        value <- getStdRandom $ randomR (1,1000000)
        atomically . writeTBQueue chan $ Item key value
    
      -- The consumer state lives in a TVar instead of an IORef.
      state <- newTVarIO SinkState {
          myMap = M.empty
        }
    
      -- Consumer: dequeue an item and update the map in one transaction.
      forever . atomically $ do
        Item key value <- readTBQueue chan
        modifyTVar' state $ \s -> s { myMap = M.insert key value (myMap s) }
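To make the transactional claims concrete, here is a small, self-contained sketch that exercises the combined read-and-update step without threads or random numbers. The fixed input, the `runDemo` and `failingStep` helpers, and the `SimulatedFailure` exception are hypothetical, added only for illustration; `lengthTBQueue` assumes stm 2.5 or later. `failingStep` dequeues an item and then throws, which shows that the dequeue is discarded when the exception escapes `atomically`.

```haskell
import Control.Concurrent.STM
import Control.Exception (Exception, try)
import Control.Monad (forM_, replicateM_)
import qualified Data.Map.Strict as M

data Item = Item {-# UNPACK #-} !Int {-# UNPACK #-} !Int

data SinkState = SinkState { myMap :: !(M.Map Int Int) }

-- Hypothetical exception, used only to simulate a failure mid-transaction.
data SimulatedFailure = SimulatedFailure deriving Show
instance Exception SimulatedFailure

-- One consumer step, as in the answer: dequeue an item and insert it
-- into the map within the same transaction.
sinkStep :: TBQueue Item -> TVar SinkState -> STM ()
sinkStep chan state = do
  Item key value <- readTBQueue chan
  modifyTVar' state $ \s -> s { myMap = M.insert key value (myMap s) }

-- Deliberately broken step (hypothetical): it dequeues an item, then
-- throws before the map is updated. Because the exception escapes
-- `atomically`, the whole transaction, including the dequeue, is discarded.
failingStep :: TBQueue Item -> STM ()
failingStep chan = do
  _ <- readTBQueue chan
  throwSTM SimulatedFailure

-- Returns whether the failing transaction was rolled back, the queue
-- length right after it, and the final map.
runDemo :: IO (Bool, Int, M.Map Int Int)
runDemo = do
  chan  <- newTBQueueIO 1000
  state <- newTVarIO (SinkState M.empty)
  -- Deterministic input (assumption, so the result can be checked):
  -- 20 items whose keys cycle through 1..10.
  forM_ [1 .. 20 :: Int] $ \i ->
    atomically $ writeTBQueue chan (Item ((i - 1) `mod` 10 + 1) i)
  r <- try (atomically (failingStep chan))
  let rolledBack = either (\SimulatedFailure -> True) (const False) r
  queued <- fromIntegral <$> atomically (lengthTBQueue chan)
  -- Drain all 20 items; later values overwrite earlier ones per key.
  replicateM_ 20 (atomically (sinkStep chan state))
  m <- myMap <$> readTVarIO state
  pure (rolledBack, queued, m)

main :: IO ()
main = do
  (rolledBack, queued, m) <- runDemo
  print rolledBack
  print queued
  print (M.size m, M.lookup 1 m)
```

Running it prints `True`, `20` and `(10,Just 11)`: the failed transaction leaves all 20 items in the queue, and the map ends up with one entry per key, where key 1 holds the value from the later of its two items.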