
Haskell streaming - how to merge original stream with result stream


Using Haskell-streaming, I can easily group a stream and take the sum of each group.

>>> S.print $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
[-1,-2]
[3,4,5]
[-6]

>>> S.print $ S.map sum $ mapped S.toList $ S.groupBy (\ x y -> x*y>0) $ each [-1,-2,3,4,5,-6]
-3
12
-6

How can I write a function myfn that produces a stream merging the two above in an order-sensitive way, so that each group's elements are followed by that group's sum? That is, I want the result stream to be:

>>> myfn $ each [-1,-2,3,4,5,-6]
-1:> -2:> -3:> 3:> 4:> 5:> 12:> -6:> -6:> ()
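
For reference, here is the same transformation on an ordinary list. It is only a sketch that pins down the intended output: the name `myfnList` is hypothetical, it uses `Data.List.groupBy` from base instead of streaming, and it holds each whole group in memory.

```haskell
import Data.List (groupBy)

-- Hypothetical list-only reference: emit each group's elements,
-- then that group's sum. Not a streaming solution.
myfnList :: (Int -> Int -> Bool) -> [Int] -> [Int]
myfnList p = concatMap (\g -> g ++ [sum g]) . groupBy p
```

In GHCi, `myfnList (\x y -> x*y>0) [-1,-2,3,4,5,-6]` evaluates to `[-1,-2,-3,3,4,5,12,-6,-6]`, matching the stream above.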

Solution

    The solution involves making the function passed to mapped both accumulate the group into a list and calculate its sum, in one pass.

    That can be done with store, I think, but I find the Folds from the foldl package easier to use. Their Applicative instance lets us build composite Folds from simpler ones:

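    import Streaming (Of, Stream, mapped)
    import qualified Streaming.Prelude as S
    import qualified Control.Foldl as L

    -- Emit each group's elements followed by the group's sum.
    foo :: Monad m 
        => (Int -> Int -> Bool) 
        -> Stream (Of Int) m ()
        -> Stream (Of Int) m ()
    foo p = 
          flip S.for (\(xs,total) -> S.each xs *> S.yield total)  -- splice pairs back in
        . mapped (L.purely S.fold $ (,) <$> L.list <*> L.sum)      -- (list, sum) per group
        . S.groupBy p                                              -- split into groups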
    

    Here L.purely, L.list and L.sum come from the foldl package (module Control.Foldl, imported qualified as L).
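
The Applicative composition can be tried on its own. This sketch assumes the `streaming` and `foldl` packages; `listAndSum` and `foldGroup` are hypothetical names, and `foldGroup` is simply the argument the answer passes to `mapped`, given its own type signature.

```haskell
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S
import qualified Control.Foldl as L

-- Two simple Folds combined through Fold's Applicative instance. The
-- result still makes a single pass over its input and returns both the
-- collected elements and their sum.
listAndSum :: L.Fold Int ([Int], Int)
listAndSum = (,) <$> L.list <*> L.sum

-- L.purely hands the Fold's step function, initial state and extraction
-- function to S.fold, so the composite Fold consumes a whole Stream and
-- returns its result alongside the stream's own return value.
foldGroup :: Monad m => Stream (Of Int) m r -> m (Of ([Int], Int) r)
foldGroup = L.purely S.fold listAndSum
```

For example, `L.fold listAndSum [3,4,5]` gives `([3,4,5],12)`, and `foldGroup (S.each [3,4,5])` returns `([3,4,5],12) :> ()` in the base monad.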

    The finishing touch is taking each ([Int],Int) pair coming out of mapped and replacing it with a substream (the group's elements followed by its total) using S.for.
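
Here is the `S.for` step in isolation, as a minimal sketch assuming the `streaming` package (`expandPairs` is a hypothetical name): each `(group, total)` pair is replaced by a substream, and the substreams are spliced into the output in order.

```haskell
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Replace every (group, total) pair with the group's elements followed
-- by its total; S.for concatenates these substreams in order.
expandPairs :: Monad m => Stream (Of ([Int], Int)) m r -> Stream (Of Int) m r
expandPairs pairs = S.for pairs (\(xs, total) -> S.each xs *> S.yield total)
```

Feeding it `S.each [([-1,-2],-3), ([3,4,5],12)]` produces the elements `-1, -2, -3, 3, 4, 5, 12`.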

    Putting it to work:

    *Main> S.print $ foo (\x y -> x*y>0) $ S.each [-1,-2,3,4,5,-6]
    -1
    -2
    -3
    3
    4
    5
    12
    -6
    -6

    Edit: Come to think of it, the previous solution is flawed. We are only interested in a streamed result, yet we accumulate each individual group in memory using S.toList or L.list before sending it downstream. But what if one group happens to be bigger than the available memory in the machine?

    Here's a solution that streams each group element by element, so its memory use does not depend on the size of the groups:

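    import Streaming (Of (..), Stream, concats, maps)
    import qualified Streaming.Prelude as S
    import qualified Control.Foldl as L

    foo :: Monad m 
        => (Int -> Int -> Bool) 
        -> Stream (Of Int) m ()
        -> Stream (Of Int) m ()
    foo p = 
          concats                      -- rejoin the transformed groups
        . maps (S.store (\s -> do      -- fold each group without consuming it
              (total :> r) <- L.purely S.fold L.sum s
              S.yield total            -- append the group's sum
              return r))               -- keep the group's return value
        . S.groupBy p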
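
One way to check that this version does not depend on group size is to ask for a prefix of a single infinite group. The sketch below assumes the `streaming` and `foldl` packages and copies the answer's code under the hypothetical name `fooStreaming`; `prefixOfInfiniteGroup` is also hypothetical.

```haskell
import Streaming (Of (..), Stream, concats, maps)
import qualified Streaming.Prelude as S
import qualified Control.Foldl as L

-- The answer's streaming version, copied under a hypothetical name.
fooStreaming :: Monad m
             => (Int -> Int -> Bool) -> Stream (Of Int) m () -> Stream (Of Int) m ()
fooStreaming p =
    concats
  . maps (S.store (\s -> do
      (total :> r) <- L.purely S.fold L.sum s
      S.yield total
      return r))
  . S.groupBy p

-- The predicate puts every element into one never-ending group. Only a
-- finite prefix is demanded, so this finishes even though the group
-- (and therefore its sum) never ends.
prefixOfInfiniteGroup :: IO [Int]
prefixOfInfiniteGroup =
  S.toList_ (S.take 5 (fooStreaming (\_ _ -> True) (S.each [1 ..])))
```

`prefixOfInfiniteGroup` returns `[1,2,3,4,5]`. The list-based version would have to collect the whole group before emitting anything, so the same query would not terminate there.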
    

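    What has changed? First, we use maps instead of mapped, because now we want to transform each subgroup stream into another stream instead of consuming it and returning a result in the base monad. (maps takes a function of type forall x. f x -> g x, whereas mapped takes forall x. f x -> m (g x).)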
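
A small side-by-side sketch of the two combinators, assuming the `streaming` package (`groupSums` and `doubledGroups` are hypothetical names). `mapped` consumes each group with an effectful fold and leaves one element per group; `maps` only transforms each group, so the result is still a stream of streams that `concats` can flatten. Doubling every element could of course be done with a plain `S.map`; it is used here only to show the types.

```haskell
import Streaming (Of, Stream, concats, maps, mapped)
import qualified Streaming.Prelude as S

-- mapped: each group is consumed by S.sum in the base monad,
-- leaving one ordinary element (the sum) per group.
groupSums :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
groupSums = mapped S.sum . S.groupBy (\x y -> x * y > 0)

-- maps: each group stays a stream and is only transformed, so the
-- result is still a stream of groups, flattened again by concats.
doubledGroups :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
doubledGroups = concats . maps (S.map (* 2)) . S.groupBy (\x y -> x * y > 0)
```

On `S.each [-1,-2,3,4,5,-6]`, `groupSums` yields `-3, 12, -6`, while `doubledGroups` yields `-2, -4, 6, 8, 10, -12`.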

    For each subgroup stream, we use store to perform a summing fold without destroying the stream. Then we take the result of the fold and append it back to the stream, while also taking care of preserving the original return value as required by maps.
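
To see what `store` does on its own, here is a minimal sketch assuming the `streaming` package; `elementsAndSum` and `withTotal` are hypothetical names. It uses `S.sum` in place of `L.purely S.fold L.sum`, which computes the same total.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- S.store hands a copy of the stream to a fold (here S.sum): every element
-- still flows downstream, and the fold's result ends up in the return value.
elementsAndSum :: [Int] -> Of [Int] (Of Int ())
elementsAndSum xs = runIdentity (S.toList (S.store S.sum (S.each xs)))

-- The same idea as in the answer, applied to a single group: sum the
-- elements while they pass through, then emit the total and keep the
-- group's original return value r.
withTotal :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
withTotal = S.store $ \s -> do
  total :> r <- S.sum s
  S.yield total
  return r
```

`elementsAndSum [1,2,3]` evaluates to `[1,2,3] :> (6 :> ())`: the elements are all still there, with the sum carried in the return value. `withTotal` applied to `S.each [3,4,5]` yields `3, 4, 5, 12`.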

    The only step left is to rejoin the subgroups using concats.
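
Grouping with `S.groupBy` and then applying `concats` gives back the original elements, which is why each group can be transformed with `maps` in between. A minimal sketch, assuming the `streaming` package (`regroup` is a hypothetical name):

```haskell
import Streaming (Of, Stream, concats)
import qualified Streaming.Prelude as S

-- concats flattens a stream of substreams back into a single stream,
-- so grouping and immediately rejoining returns the original elements.
regroup :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
regroup = concats . S.groupBy (\x y -> x * y > 0)
```

`regroup (S.each [-1,-2,3,4,5,-6])` yields `-1, -2, 3, 4, 5, -6` unchanged.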