Writing a simple accumulator with pipes' WriterP


Using the pipes library, I want to write a program that reads data from some source and accumulates it monoidally (say, with Sum). The easiest way to do this would be:

 import Control.Proxy
 import Data.Monoid (Sum(..))

 main = do
     let source = enumFromToS (0::Int) 5
     a <- runWriterT $ runProxy $ source >-> foldD Sum
     print a

Of course, while this works for small sources, large inputs will result in the dreaded stack overflow due to the lazy nature of WriterT's accumulator.
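
To make the laziness concern concrete outside of pipes, here is a minimal sketch using only base. It is an analogy rather than how WriterT or pipes is implemented: `lazyTotal` and `strictTotal` are hypothetical names, and a plain list stands in for the source. Whether the lazy version actually exhausts the stack depends on the input size, optimization level and runtime settings.

```haskell
import Data.List (foldl')
import Data.Monoid (Sum(..))

-- Lazy left fold: the accumulator is not forced between steps, so a long
-- input can build a chain of unevaluated mappend thunks before anything
-- is added up.
lazyTotal :: [Int] -> Sum Int
lazyTotal = foldl (\acc x -> acc `mappend` Sum x) mempty

-- Strict left fold: the accumulator is forced at every step, so the
-- running total stays a plain number.
strictTotal :: [Int] -> Sum Int
strictTotal = foldl' (\acc x -> acc `mappend` Sum x) mempty

main :: IO ()
main = do
    -- Same source as in the question: 0 through 5.
    print (getSum (lazyTotal [0 .. 5]))
    print (getSum (strictTotal [0 .. 5]))
```

Both calls give the same total on a small input; the difference only shows up in memory behaviour on large ones.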

Thankfully, it seems that pipes anticipates this, providing a WriterP proxy with a strict accumulator. Unfortunately, the documentation surrounding this proxy is pretty sparse. After a bit of poking around (and simplifying the problem to instead accumulate a 1 for each downstream element), I arrived at this program:

import Control.Proxy
import Control.Proxy.Trans.Writer
import Data.Monoid (Sum(..))

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ runWriterK $ source >-> \x->tell (Sum 1::Sum Int)
    print a

Of course, this program doesn't even perform the simplified task correctly, as it accumulates to 1 instead of 6. If I'm not mistaken, this behavior is explained by the fact that the pipe only reads one element before terminating. To repeat until the end of the input, I came up with the following:

import Control.Proxy
import Control.Proxy.Trans.Writer
import Data.Monoid (Monoid, Sum(..))

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ runWriterK $ source >-> fold Sum
    print a

fold :: (Monad m, Proxy p, Monoid w) => (a -> w) -> a' -> WriterP w p a' a a' a m r
fold f = go
  where go x = do a <- request x
                  tell $ f a
                  x' <- respond a
                  go x'

This code, however, returns an accumulator of 0. Why is this? Is there a function like my fold provided in pipes?

Given that many use-cases for pipes are long-running processes working with large data-sets, would it not make sense for the folds in Control.Proxy.Prelude to be built around the strict WriterP instead of WriterT? Currently it feels like the proxy transformers in pipes are second-class citizens, present but lacking many of the combinators that make WriterT so handy.


Solution

  • I'm adding a new answer because I've fixed this issue in pipes-3.3, which I just uploaded to Hackage. The theory behind pipes shows that the global behavior you were expecting was the right behavior all along, and WriterP now behaves globally so you can fold within a pipe.

    I've modified your example to show how you would implement it using pipes-3.3:

    import Control.Proxy
    import Control.Proxy.Trans.Writer
    
    main = do
        let source = enumFromToS (0::Int) 5
        a <- runProxy $ execWriterK $ source >-> sumD
        print a
    

    You can also now retrieve the results of a fold within a pipeline. For example, this is perfectly valid:

    chunksOf :: (Monad m, Proxy p) => Int -> () -> Pipe p a [a] m r
    chunksOf n () = runIdentityP $ forever $ do
        -- The unitU discards the values that 'toListD' reforwards
        as <- execWriterK (takeB_ n >-> toListD >-> unitU) ()
        respond as
    

    Here's an example usage:

    >>> runProxy $ getLineS >-> takeWhileD (/= "quit") >-> chunksOf 3 >-> printD
    1<Enter>
    2<Enter>
    3<Enter>
    ["1","2","3"]
    4<Enter>
    5<Enter>
    6<Enter>
    ["4","5","6"]
    quit
    
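
    For comparison, the same grouping can be sketched on plain lists with only base. This is an illustration, not part of pipes: `chunksOfList` is a hypothetical name, and unlike the pipe above it also emits a final short chunk when the input length is not a multiple of n.

    ```haskell
    -- Hypothetical list analogue of chunksOf; not part of pipes.
    -- Splits the input into consecutive groups of at most n elements.
    chunksOfList :: Int -> [a] -> [[a]]
    chunksOfList n xs
        | n <= 0 || null xs = []
        | otherwise         = chunk : chunksOfList n rest
      where
        (chunk, rest) = splitAt n xs

    main :: IO ()
    main = do
        -- Mirrors the session above: stop at "quit", then group in threes.
        let input = takeWhile (/= "quit") ["1", "2", "3", "4", "5", "6", "quit"]
        mapM_ print (chunksOfList 3 input)
    ```
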

    Sorry for getting the answer wrong the first time!