Generalizing a function to merge a set of Haskell pipes Producers


I am working with the Haskell pipes package.

I am trying to use pipes-concurrency to merge a list of Producers together.

What I want to arrive at is:

merge :: MonadIO m => [Producer a m ()] -> Producer a m ()

so that, given a producer s1 and another producer s2, r = merge [s1, s2] would interleave their values as they arrive:

s1 --1--1--1--|
s2 ---2---2---2|
r  --12-1-21--2|

Following the code in the pipes-concurrency tutorial, I came up with:

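import Control.Concurrent (forkIO)
import Control.Monad (void)
import Pipes
import Pipes.Concurrent
import System.Mem (performGC)

mergeIO :: [Producer a IO ()] -> Producer a IO ()
mergeIO producers = do
    (output, input) <- liftIO $ spawn Unbounded
    liftIO $ mapM_ (fork output) producers   -- one writer thread per producer
    fromInput input                          -- single reader of the shared mailbox
  where
    fork :: Output a -> Producer a IO () -> IO ()
    fork output producer = void $ forkIO $ do
        runEffect $ producer >-> toOutput output
        performGC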

which works as expected.
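The shape of mergeIO (one shared mailbox, one forked writer per source, a single reader) can be sketched with nothing but base's Chan. This is only an analogue, not how pipes-concurrency is implemented: mergeLists is a hypothetical name, and each writer here signals completion with an explicit Nothing, whereas pipes-concurrency seals the mailbox automatically once the Output end is garbage-collected, which the performGC calls help along.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_)

-- Hypothetical base-only analogue of mergeIO: every source list is written
-- by its own thread into one shared channel, and the reader collects values
-- in arrival order until each writer has signalled completion with Nothing.
mergeLists :: [[a]] -> IO [a]
mergeLists sources = do
  chan <- newChan
  forM_ sources $ \src -> forkIO $ do
    mapM_ (writeChan chan . Just) src   -- send this source's values in order
    writeChan chan Nothing              -- tell the reader this writer is done
  let collect 0 = return []             -- every writer has finished
      collect n = do
        msg <- readChan chan
        case msg of
          Just x  -> (x :) <$> collect n
          Nothing -> collect (n - 1)
  collect (length sources)
```

Calling mergeLists [[1, 2, 3], [10, 20, 30]] returns all six values; each source's values keep their relative order, but how the two sources interleave depends on thread scheduling, just like r in the diagram above.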

However, I am having difficulty generalizing this to an arbitrary MonadIO m.

My attempt:

merge :: (MonadIO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    runEffectIO :: Monad m => Effect m r -> IO (m r)
    runEffectIO e = do
        x <- evaluate $ runEffect e
        return x
    fork output producer = forkIO $ do runEffectIO $ producer >-> toOutput output
                                       performGC

Unfortunately, this compiles but does little else. I suspect I am making a mess of runEffectIO; the other versions of runEffectIO I have tried have not worked any better.

The program:

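import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes

main :: IO ()
main = do
    let producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]
    runEffect $ producer >-> taker 20
  where
    -- Yields val forever, waiting delay microseconds before each value.
    repeater :: Int -> Int -> Producer Int IO r
    repeater val delay = forever $ do
        lift $ threadDelay delay
        yield val
    -- Prints the first n values it receives, then stops.
    taker :: Int -> Consumer Int IO ()
    taker 0 = return ()
    taker n = do
        val <- await
        liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
        taker $ n - 1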

It reaches val <- await but never gets to liftIO $ putStrLn, so it produces no output. However, it exits cleanly without hanging.

When I substitute mergeIO for merge, the program runs as I would expect, printing 20 lines.
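A likely explanation for the silent exit, offered as a sketch rather than a definitive diagnosis: evaluate only forces its argument to weak head normal form. Applied to runEffect e, which has type m r, it builds the action without running it, so runEffectIO hands back an unexecuted m r, the forked thread discards it, and no producer ever writes to the mailbox. Presumably, once the unused Output end is garbage-collected (which performGC encourages), pipes-concurrency seals the mailbox, fromInput ends, and the program exits instead of hanging. The core effect can be reproduced in plain IO; evaluateOnly and evaluateThenRun are hypothetical names used only for illustration.

```haskell
import Control.Exception (evaluate)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Mirrors runEffectIO with m ~ IO: evaluate forces the action *value*
-- to weak head normal form, but never executes the action.
evaluateOnly :: IO Int
evaluateOnly = do
  counter <- newIORef (0 :: Int)
  _ <- evaluate (modifyIORef counter (+ 1))  -- builds the action, never runs it
  readIORef counter                          -- counter is still 0

-- Binding the evaluated action and then running it does perform the effect.
evaluateThenRun :: IO Int
evaluateThenRun = do
  counter <- newIORef (0 :: Int)
  action <- evaluate (modifyIORef counter (+ 1))
  action                                     -- the increment happens here
  readIORef counter                          -- counter is now 1
```

Running the evaluated action, as evaluateThenRun does, only works because m is IO there. For a general MonadIO m there is no way to run an m action from inside IO at all, which is the gap the solution below addresses with MonadBaseControl.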


Solution

  • MonadIO is not sufficient for this operation: liftIO :: IO a -> m a only lets you run IO inside m, whereas forkIO needs the opposite direction, running an m action inside IO. MonadBaseControl (from monad-control) is designed for exactly that, allowing arbitrary transformer stacks to be embedded inside the base monad. The companion package lifted-base provides a version of fork that works for such transformer stacks. I've put together an example of using it to solve your problem in the following Gist, though the main magic is:

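    import Control.Concurrent (ThreadId)
    import qualified Control.Concurrent.Lifted as L
    import Control.Monad.Trans.Control (MonadBaseControl)
    import Pipes
    import Pipes.Concurrent (Output, toOutput)
    import System.Mem (performGC)

    -- Run one producer in a forked thread, for any stack with a MonadBaseControl IO instance.
    fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
    fork output producer = L.fork $ do
        runEffect $ producer >-> toOutput output  -- feed the shared mailbox
        liftIO performGC                           -- help the mailbox get sealed promptly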

    Note that you should understand what happens to monadic state when it is treated this way: modifications to state carried by the transformer stack that are performed in the child threads will be isolated to just those child threads. In other words, if you were using a StateT, each child thread would start off with the same state value that was in context when it was forked, but then you would have many different states that do not update each other. (Mutable references such as IORef or MVar are not affected by this and remain shared between threads.)

    There's an appendix in the Yesod book on monad-control, though frankly it's a bit dated. I'm just not aware of any more recent tutorials.
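To make the state isolation described in the note concrete without depending on monad-control, here is a small sketch using only transformers. forkStateT and isolationDemo are hypothetical helpers, not part of lifted-base: forkStateT snapshots the parent's current state, runs the child from that snapshot in a new IO thread, and discards the child's final state. That is roughly what lifted-base's fork amounts to for StateT s IO, assuming fork is defined via liftBaseDiscard forkIO as I understand it to be.

```haskell
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (StateT, evalStateT, get, modify, runStateT)

-- Hypothetical hand-rolled fork for StateT: the child starts from a snapshot
-- of the parent's state, and whatever state it ends with is thrown away.
forkStateT :: StateT s IO () -> StateT s IO ThreadId
forkStateT child = do
  snapshot <- get
  lift (forkIO (evalStateT child snapshot))

-- Returns (child's final state, parent's final state).
isolationDemo :: IO (Int, Int)
isolationDemo = do
  done <- newEmptyMVar
  flip runStateT (10 :: Int) $ do
    _ <- forkStateT $ do
      modify (+ 1)                  -- updates only the child's copy
      get >>= lift . putMVar done   -- report the child's final state
    modify (* 2)                    -- updates only the parent's copy
    lift (takeMVar done)            -- wait for the child's report
```

Both threads start from state 10: the child adds 1 and reports 11, while the parent doubles its own copy to 20, so isolationDemo returns (11, 20). Neither update is visible to the other thread.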