Search code examples
multithreadinghaskellstm

Pipeline-like operation using TChan


I want to implement a pipeline between two threads. I have thread A that take the data, process it, and send it to thread B. I have a MVar that check if the data is completely processed

However, I'm having an exception *** Exception: thread blocked indefinitely in an STM transaction

Why are my threads blocked? I though than when the first thread write on the channel, then when there is a data on the channel, the second one can read it

fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
fstPipe f chIn m xs = do
    ( mapM_(\x-> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()

pipelineDone channel mIn = do
    isDone <- fmap isJust $ tryTakeMVar mIn
    isEmpty <- atomically $ isEmptyTChan channel
    return $ isDone && isEmpty

lastPipe f chIn mIn = iter 
    where iter = do
        atomically $ fmap f $ readTChan chIn
        isDone <- pipelineDone chIn mIn
        unless isDone $ iter

pipeline = do
    chIn <- atomically newTChan
    m <- newEmptyMVar
    first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
    last <- async $ lastPipe print chIn m
    wait first
    wait last

Solution

  • It seems odd to me to be using STM and semaphores in the same code block... Why not do the entire thing in STM?

    In particular, why not a TChan (Maybe x), with Nothing indicating the end of the sequence?

    Also, notice that your fstPipe likely just generates a bunch of unevaluated thunks and immediately chucks them into the TChan, without actually computing anything. You probably want a seq or similar in there to force some actual work to happen on that thread.