Search code examples
haskellhaskell-pipes

How can I write a pipe that sends downstream a list of what it receives from upstream?


I'm having a hard time to write a pipe with this signature:

toOneBigList :: (Monad m, Proxy p) => () -> Pipe p a [a] m r

It should simply take all as from upstream and send them in a list downstream.

All my attempts look fundamentally broken.

Can anybody point me in the right direction?


Solution

  • There are two pipes-based solutions and I'll let you pick which one you prefer.

    Note: It's not clear why you output the list on the downstream interface instead of just returning it directly as a result.

    Conduit-style

    The first one, which is very close to the conduit-based solution uses the upcoming pipes-pase, which is basically complete and just needs documentation. You can find the latest draft on Github.

    Using pipes-parse, the solution is identical to the conduit solution that Petr gave:

    import Control.Proxy
    import Control.Proxy.Parse
    
    combine
        :: (Monad m, Proxy p)
        => () -> Pipe (StateP [Maybe a] p) (Maybe a) [a] m ()
    combine () = loop []
      where
        loop as = do
            ma <- draw
            case ma of
                Nothing -> respond (reverse as)
                Just a  -> loop (a:as)
    

    draw is like conduit's await: it requests a value from either the leftovers buffer (that's the StateP part) or from upstream if the buffer is empty. Nothing indicates end of file.

    You can wrap a pipe that does not have an end of file signal using the wrap function from pipes-parse, which has type:

    wrap :: (Monad m, Proxy p) => p a' a b' b m r -> p a' a b' (Maybe b) m s
    

    Classic Pipes Style

    The second alternative is a bit simpler. If you want to fold a given pipe you can do so directly using WriterP:

    import Control.Proxy
    import Control.Proxy.Trans.Writer
    
    foldIt
      :: (Monad m, Proxy p) =>
         (() -> Pipe p a b m ()) -> () -> Pipe p a [b] m ()
    foldIt p () = runIdentityP $ do
        r <- execWriterK (liftP . p >-> toListD >-> unitU) ()
        respond r
    

    That's a higher-level description of what is going on, but it requires passing in the pipe as an explicit argument. It's up to you which one you prefer.

    By the way, this is why I was asking why you want to send a single value downstream. The above is much simpler if you return the folded list:

    foldIt p = execWriterK (liftP . p >-> toListD)
    

    The liftP might not even be necessary if p is completely polymorphic in its proxy type. I only include it as a precaution.

    Bonus Solution

    The reason pipes-parse does not provide the toOneBigList is that it's always a pipes anti-pattern to group the results into a list. pipes has several nice features that make it possible to never have to group the input into a list, even if you are trying to yield multiple lists. For example, using respond composition you can have a proxy yield the subset of the stream it would have traversed and then inject a handler that uses that subset:

    example :: (Monad m, Proxy p) => () -> Pipe p a (() -> Pipe p a a m ()) m r
    example () = runIdentityP $ forever $ do
        respond $ \() -> runIdentityP $ replicateM_ 3 $ request () >>= respond
    
    printIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
    printIt () = runIdentityP $ do
        lift $ putStrLn "Here we go!"
        printD ()
    
    useIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
    useIt = example />/ (\p -> (p >-> printIt) ())
    

    Here's an example of how to use it:

    >>> runProxy $ enumFromToS 1 10 >-> useIt
    Here we go!
    1
    2
    3
    Here we go!
    4
    5
    6
    Here we go!
    7
    8
    9
    Here we go!
    10
    

    This means you never need to bring a single element into memory even when you need to group elements.