Using the Haskell streaming library, I can easily group a stream and take the sum of each group:
>>> S.print $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
[-1,-2]
[3,4,5]
[-6]
>>> S.print $ S.map sum $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
-3
12
-6
How can I write a function myfn that generates a stream merging the two results above in an order-sensitive way? That is, I want the result stream to be:
>>> myfn $ each [-1,-2,3,4,5,-6]
-1:> -2:> -3:> 3:> 4:> 5:> 12:> -6:> -6:> ()
The solution involves making the function argument of mapped both accumulate the list and calculate the sum, in one pass. That can be done with store, I think, but I find the streaming sinks from foldl easier to use. Their Applicative instance lets us build composite Folds from simpler ones:
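import Streaming
import qualified Streaming.Prelude as S
import qualified Control.Foldl as L

foo :: Monad m
    => (Int -> Int -> Bool)
    -> Stream (Of Int) m ()
    -> Stream (Of Int) m ()
foo p =
    -- 3. replace each (group, total) pair by the group's elements, then its total
    flip S.for (\(xs,total) -> S.each xs *> S.yield total)
    -- 2. fold every group into (list of elements, sum) in a single pass
  . mapped (L.purely S.fold $ (,) <$> L.list <*> L.sum)
    -- 1. split the input into groups of consecutive elements related by p
  . S.groupBy p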
where L.purely, L.list and L.sum are from the foldl package.
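To see the composite fold on its own, here is a minimal sketch, assuming the streaming and foldl packages are installed (listAndSum, onList and onStream are hypothetical names). The same Fold is run once over a plain list with L.fold and once over a stream with L.purely S.fold, and in both cases the input is traversed only once:

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..))
import qualified Streaming.Prelude as S
import qualified Control.Foldl as L

-- Two simple folds combined through the Applicative instance of Fold:
-- the result is a pair (all elements, their sum).
listAndSum :: L.Fold Int ([Int], Int)
listAndSum = (,) <$> L.list <*> L.sum

-- Run the combined fold over an ordinary list.
onList :: ([Int], Int)
onList = L.fold listAndSum [1, 2, 3]

-- Run the same fold over a stream. L.purely unpacks the Fold into the
-- step / initial / extract arguments that S.fold expects; S.fold pairs
-- the fold's result with the stream's own return value (here ()).
onStream :: Of ([Int], Int) ()
onStream = runIdentity (L.purely S.fold listAndSum (S.each [1, 2, 3]))
```

In GHCi, onList should evaluate to ([1,2,3],6), and onStream carries the same pair together with the stream's () return value.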
The finishing touch is to take each ([Int],Int) pair coming out of mapped and replace it with a substream using for.
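The for step can be tried in isolation. This sketch feeds S.for a hand-written stream of pairs (pairs, flattened and flattenedList are hypothetical names; pairs stands in for what mapped produces on the question's input):

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Hypothetical stand-in for the (group, total) pairs emitted by mapped.
pairs :: Monad m => Stream (Of ([Int], Int)) m ()
pairs = S.each [([-1, -2], -3), ([3, 4, 5], 12), ([-6], -6)]

-- S.for replaces every element with a whole substream:
-- the group's elements followed by its total.
flattened :: Monad m => Stream (Of Int) m ()
flattened = S.for pairs (\(xs, total) -> S.each xs *> S.yield total)

-- Collect the output into a list; S.toList_ drops the () return value.
flattenedList :: [Int]
flattenedList = runIdentity (S.toList_ flattened)
```

flattenedList should be the sequence the question asks for: -1, -2, -3, 3, 4, 5, 12, -6, -6.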
Putting it to work:
*Main> S.print $ foo (\x y -> x*y>0) $ S.each [-1,-2,3,4,5,-6]
-1
-2
-3
3
4
5
12
-6
-6
Edit: Come to think of it, the previous solution is flawed. We are only interested in a streamed result, yet we accumulate each individual group in memory using S.toList or L.list before sending it downstream. But what if one group happens to be bigger than the available memory of the machine?
Here's a solution that streams fully, never holding a whole group in memory, and so works regardless of the size of each group:
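foo :: Monad m
    => (Int -> Int -> Bool)
    -> Stream (Of Int) m ()
    -> Stream (Of Int) m ()
foo p =
    concats                          -- 3. rejoin the subgroup streams
  . S.maps (S.store (\s -> do        -- 2. per group: fold a copy of it...
      (total :> r) <- L.purely S.fold L.sum s
      S.yield total                  -- ...append the total...
      return r))                     -- ...and keep the group's return value
  . S.groupBy p                      -- 1. split into groups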
What has changed? First, we use maps instead of mapped, because now we want to transform the subgroup streams rather than return a result in the base monad.
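A small sketch contrasting the two, assuming the same sign-based grouping as in the question (groups, lengths and negated are hypothetical names): mapped collapses each group to a single value computed in the base monad, while maps rewrites each group as a stream, so the groups can be flattened again with concats:

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream, concats, mapped, maps)
import qualified Streaming.Prelude as S

-- The question's input, split into runs of equal sign.
groups :: Monad m => Stream (Stream (Of Int) m) m ()
groups = S.groupBy (\x y -> x * y > 0) (S.each [-1, -2, 3, 4, 5, -6])

-- mapped: run each group to completion and keep one value per group.
lengths :: [Int]
lengths = runIdentity (S.toList_ (mapped S.length groups))

-- maps: transform each group stream into another stream (here by
-- negating its elements); concats then joins the groups back together.
negated :: [Int]
negated = runIdentity (S.toList_ (concats (maps (S.map negate) groups)))
```

lengths should be [2,3,1] (one value per group), and negated should be [1,2,-3,-4,-5,6] (still one element per input element).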
For each subgroup stream, we use store to perform a summing fold without destroying the stream: store hands the fold a copy of the stream, so every element is still passed downstream as the fold consumes it. Then we take the result of the fold and append it to the stream, while also preserving the original return value, as required by maps.
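A minimal sketch of store on a single stream (withSum, appendSum, result and appendList are hypothetical names). withSum keeps the fold's result in the return value; appendSum has the same shape as the function passed to S.maps above, with S.sum used in place of L.purely S.fold L.sum (both compute the sum):

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- S.store gives S.sum a copied stream: its elements are summed and,
-- at the same time, still emitted downstream. The sum ends up in the
-- return value, paired with the original return value.
withSum :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m (Of Int r)
withSum = S.store S.sum

-- Fold the copy, yield the total as one extra element, and return the
-- original return value unchanged (as maps requires).
appendSum :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
appendSum = S.store $ \s -> do
  (total :> r) <- S.sum s
  S.yield total
  return r

-- S.toList keeps the return value: the elements, then (sum :> ()).
result :: Of [Int] (Of Int ())
result = runIdentity (S.toList (withSum (S.each [1, 2, 3])))

appendList :: [Int]
appendList = runIdentity (S.toList_ (appendSum (S.each [1, 2, 3])))
```

result should be [1,2,3] :> (6 :> ()), and appendList should be [1,2,3,6].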
The only step left is to rejoin the subgroups using concats.
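Putting the pieces together, here is the streaming version as a self-contained module. It is a sketch assuming the streaming and foldl packages; it uses maps from the Streaming module, and sameSign, example and prefixOfInfinite are hypothetical names. The last definition feeds foo an infinite input that forms a single group, which can only produce output because no group is ever collected in memory:

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream, concats, maps)
import qualified Streaming.Prelude as S
import qualified Control.Foldl as L

foo :: Monad m
    => (Int -> Int -> Bool)
    -> Stream (Of Int) m ()
    -> Stream (Of Int) m ()
foo p =
    concats
  . maps (S.store (\s -> do
      (total :> r) <- L.purely S.fold L.sum s
      S.yield total
      return r))
  . S.groupBy p

sameSign :: Int -> Int -> Bool
sameSign x y = x * y > 0

-- The input from the question.
example :: [Int]
example = runIdentity (S.toList_ (foo sameSign (S.each [-1, -2, 3, 4, 5, -6])))

-- [1 ..] is one infinite group of positive numbers; elements are passed
-- through as they arrive, so taking a prefix of the output terminates.
prefixOfInfinite :: [Int]
prefixOfInfinite = runIdentity (S.toList_ (S.take 5 (foo sameSign (S.each [1 ..]))))
```

example should reproduce the sequence from the question, and prefixOfInfinite should be [1,2,3,4,5]. The first, list-based version would produce nothing for that infinite input, since L.list waits for the group to end before anything is sent downstream.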