Search code examples
haskellhaskell-pipes

How to detect last chunk in a Haskell Pipe?


I have a small Haskell Pipe that prints out how many times it has run:

counterPipe :: Pipe String String IO r
counterPipe = go 0
  where
    go n = do
      await >>= yield
      let n' = succ n
      liftIO $ putStrLn $ "Chunk " ++ show n'
      go n'

I'd like to be able to print out a message, and potentially perform other tasks, once it has processed the last chunk. How do I go about doing this?


Solution

  • I was able to get this to work by changing counterPipe's input type to Maybe String and injecting an extra Nothing after the upstream pipe finishes:

    import Pipes
    import Pipes.Core (respond)
    import Control.Applicative ((<*))
    
    withEOF :: (Monad m) => Proxy a' a b' b m r -> Proxy a' a b' (Maybe b) m r
    withEOF p = for p (respond . Just) <* respond Nothing
    
    counterPipe :: Pipe (Maybe String) String IO Int
    counterPipe = go 0
      where
        go n = do
            mx <- await
    
            case mx of
                Just x -> do
                    yield x
                    let n' = succ n
                    liftIO $ putStrLn $ "Chunk " ++ show n'
                    go n'
                Nothing -> do
                    return n
    
    finishCounter :: Int -> Pipe a b IO ()
    finishCounter n = liftIO $ putStrLn $ unwords ["Finished after", show n, "chunks"]
    

    Example driver:

    import qualified Pipes.Prelude as P
    main = runEffect $ withEOF P.stdinLn >-> (counterPipe >>= finishCounter) >-> P.stdoutLn
    

    I think this pattern should be abstractable into something like

    whileJust :: (Monad m) => Proxy a' a b' b m r -> Proxy a' (Maybe a) b' b m (Maybe r)
    

    so you could write

    withEOF P.stdinLn >-> (whileJust counterPipe >>= maybe (return ()) finishCounter) >-> P.stdoutLn
    

    without having to change your original counterPipe definition; but I've never used Pipes before (the above solution was figured out by just looking at the types and playing type-domino) and so I haven't managed to write whileJust (the signature is probably too generic in ways that I cannot figure out).