Using the pipes library, I want to write a program that reads data from some source and accumulates it monoidally (say, with Sum). The easiest way to do this would be:
import Control.Proxy
import Data.Monoid (Sum(..))  -- the Sum constructor is needed, not just the type

main = do
    let source = enumFromToS (0::Int) 5
    a <- runWriterT $ runProxy $ source >-> foldD Sum
    print a
Of course, while this works for small sources, large inputs will result in the dreaded stack overflow due to the lazy nature of WriterT's accumulator.
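The underlying issue is not specific to pipes: an accumulator that is appended to lazily can build up a chain of thunks proportional to the size of the input. As a minimal sketch using only base (lazySum and strictSum are names made up for illustration, and this is not how WriterT or WriterP is implemented), compare a lazy left fold with a strict one:

```haskell
import Data.List (foldl')
import Data.Monoid (Sum(..))

-- Hypothetical helper: a lazy left fold. The accumulator is not forced
-- between steps, so it can grow into a long chain of unevaluated
-- `mappend` thunks (the optimizer may or may not remove them).
lazySum :: [Int] -> Sum Int
lazySum = foldl (\acc x -> acc `mappend` Sum x) mempty

-- Hypothetical helper: a strict left fold. foldl' forces the accumulator
-- to weak head normal form at each step; Sum is a newtype, so this
-- forces the underlying Int.
strictSum :: [Int] -> Sum Int
strictSum = foldl' (\acc x -> acc `mappend` Sum x) mempty

main :: IO ()
main = do
    print (getSum (lazySum [0 .. 5]))       -- 15
    print (getSum (strictSum [0 .. 5]))     -- 15
    -- A larger input, where the strict version runs in constant space.
    print (getSum (strictSum [1 .. 1000000]))
```

Both folds compute the same result; the difference only shows up in memory use on large inputs, which is the same trade-off as a lazy versus a strict writer accumulator.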
Thankfully, it seems that pipes anticipates this, providing a WriterP proxy with a strict accumulator. Unfortunately, the documentation surrounding this proxy is pretty sparse. After a bit of poking around (and simplifying the problem to instead accumulate a 1 for each downstream element), I arrived at this program:
import Control.Proxy
import Control.Proxy.Trans.Writer
import Data.Monoid (Sum(..))  -- the Sum constructor is needed, not just the type

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ runWriterK $ source >-> \x -> tell (Sum 1 :: Sum Int)
    print a
Of course, this program doesn't even perform the simplified task correctly: it accumulates to 1 instead of 6. If I'm not mistaken, this is because the pipe only reads one element before terminating. To repeat until the end of the input, I came up with the following:
import Control.Proxy
import Control.Proxy.Trans.Writer
import Data.Monoid (Monoid, Sum(..))  -- Monoid for the signature, Sum(..) for the constructor

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ runWriterK $ source >-> fold Sum
    print a

-- Forward every element downstream while recording (f a) in the writer.
fold :: (Monad m, Proxy p, Monoid w) => (a -> w) -> a' -> WriterP w p a' a a' a m r
fold f = go
  where
    go x = do
        a <- request x
        tell $ f a
        x' <- respond a
        go x'
This code, however, returns an accumulator of 0. Why is this? Is there a function like my fold provided in pipes?
Given that many use-cases for pipes are long-running processes working with large data-sets, would it not make sense for the folds in Control.Proxy.Prelude to be built around the strict WriterP instead of WriterT? Currently it feels like the proxy transformers in pipes are second-class citizens, present but lacking many of the combinators that make WriterT so handy.
I'm adding a new answer because I've fixed this issue in pipes-3.3, which I just uploaded to Hackage. The theory behind pipes shows that the global behavior you were expecting was the right behavior all along, and WriterP now behaves globally so you can fold within a pipe.
I've modified your example to show how you would implement it using pipes-3.3:
import Control.Proxy
import Control.Proxy.Trans.Writer

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ execWriterK $ source >-> sumD
    print a
You can also now retrieve the results of a fold within a pipeline. For example, this is perfectly valid:
chunksOf :: (Monad m, Proxy p) => Int -> () -> Pipe p a [a] m r
chunksOf n () = runIdentityP $ forever $ do
    -- The unitU discards the values that 'toListD' reforwards
    as <- execWriterK (takeB_ n >-> toListD >-> unitU) ()
    respond as
Here's an example usage:
>>> runProxy $ getLineS >-> takeWhileD (/= "quit") >-> chunksOf 3 >-> printD
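To make the grouping concrete without running a pipeline, here is a rough pure-list analogue using only base. chunksOfList is a hypothetical helper written for illustration; it is not part of pipes and is not how the pipe above is implemented:

```haskell
-- Hypothetical helper for illustration only; not part of pipes.
-- Splits a list into consecutive groups of at most n elements.
chunksOfList :: Int -> [a] -> [[a]]
chunksOfList n xs
    | n <= 0    = []                          -- no sensible chunk size
    | null xs   = []                          -- nothing left to group
    | otherwise = chunk : chunksOfList n rest
  where
    (chunk, rest) = splitAt n xs

main :: IO ()
main = mapM_ print (chunksOfList 3 ["a", "b", "c", "d", "e", "f", "g"])
-- ["a","b","c"]
-- ["d","e","f"]
-- ["g"]
```

One likely difference, stated as an assumption rather than verified behavior: the list version emits the trailing partial group, whereas the pipe version probably does not, since the whole session ends as soon as the upstream stage stops.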
Sorry for getting the answer wrong the first time!