Append a delay to each chunk with the streaming library?


Newbie to Streaming and Haskell here.

I've been playing around with the streaming library and I'm particularly interested in understanding the chunking functions. For example:

S.print $ S.delay 1.0 $ concats $ chunksOf 2 $ S.each [1..10]

Or:

S.print $ concats $ S.maps (S.delay 1.0) $ chunksOf 2 $ S.each [1..10]

Here I can introduce a delay after each element, but what I want is a delay after each chunk, in this case after every second element. I tried this, but it doesn't compile:

S.print $ concats $ S.delay 1.0 $ chunksOf 2 $ S.each [1..10]

How can I achieve this?


Solution

  • What we need is a function that inserts a single delay at the end of each chunk stream, which we then pass to maps.

    delay doesn't work here because it puts a delay after every yielded value. But we can do it easily using functions from Applicative:

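    -- Needs liftIO (Control.Monad.IO.Class) and threadDelay (Control.Concurrent).
      S.print
    $ concats
    $ S.maps (\s -> s <* liftIO (threadDelay 1000000)) -- pause 1 s at the end of each chunk
    $ chunksOf 2
    $ S.each [1..10]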
    

    What is happening here? maps applies a transformation to the "base functor" of the Stream. In a "chunked stream" obtained with chunksOf, that base functor is itself a Stream, so the transformation receives one chunk at a time. The transformation must also preserve the return value of each chunk, because maps expects a function of type forall x. f x -> g x.

    Streams can be sequenced with functions like (>>=) :: Stream f m a -> (a -> Stream f m b) -> Stream f m b if the next stream depends on the final result of the previous one, or with functions like (<*) :: Stream f m a -> Stream f m b -> Stream f m a if it doesn't. (<*) preserves the return value of the first Stream, which is what we want in this case.
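To make the behaviour of (<*) concrete, here is a minimal sketch. The names combined, first and second are made up for illustration. It sequences two small streams and then collects the yielded elements together with the final return value.

```haskell
import Streaming (Of ((:>)), Stream)
import qualified Streaming.Prelude as S

-- Hypothetical example: yields the elements of both streams in order,
-- but keeps only the return value of the first one.
combined :: Monad m => Stream (Of Int) m String
combined = first <* second
  where
    first  = S.each [1, 2] >> pure "first"   -- returns "first"
    second = S.yield 3 >> pure "second"      -- return value is discarded by (<*)

main :: IO ()
main = do
  -- S.toList pairs the collected elements with the stream's return value.
  xs :> r <- S.toList combined
  print xs
  print r
```

The elements of the second stream still appear in the output; only its return value is dropped.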

    We do not want to yield any more elements, but only to introduce a delay effect, so we simply liftIO the effect into the Stream monad.
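For reference, the pipeline above can be packaged as a complete program. This is only a sketch: the helper name delayAfterChunks and its parameters are assumptions introduced here, not part of the streaming library.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Streaming (Of, Stream, chunksOf, concats, maps)
import qualified Streaming.Prelude as S

-- Hypothetical helper: split the stream into chunks of n elements,
-- pause for the given number of microseconds at the end of every chunk,
-- then flatten the chunks back into a single stream.
delayAfterChunks :: MonadIO m => Int -> Int -> Stream (Of a) m r -> Stream (Of a) m r
delayAfterChunks n micros =
  concats
    . maps (\chunk -> chunk <* liftIO (threadDelay micros))
    . chunksOf n

main :: IO ()
main = S.print (delayAfterChunks 2 1000000 (S.each [1 .. 10 :: Int]))
```

Note that the delay also runs after the final chunk, since every chunk gets the same treatment.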


    Another way to insert delays after each yielded value of a Stream is to zip it with an infinite stream of delays:

    delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
    delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))
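A self-contained way to try delay' might look like the following sketch. The imports and the main function are additions for illustration; the definition is the one given above.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Pair every element with one delay action and keep only the element.
-- S.repeatM never returns, so the zipped stream ends when s ends.
delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))

main :: IO ()
main = S.print (delay' 500000 (S.each "abc"))
```

Unlike S.delay, which takes seconds as a Double, this version takes microseconds, matching threadDelay.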