Newbie to Streaming and Haskell here.
I've been playing around with the streaming library, and I'm particularly interested in understanding the chunks part. For example:
S.print $ S.delay 1.0 $ concats $ chunksOf 2 $ S.each [1..10]
Or:
S.print $ concats $ S.maps (S.delay 1.0) $ chunksOf 2 $ S.each [1..10]
Here I can introduce a delay after each element, but what I want is a delay after each chunk, in this case a delay after every second element. I tried this, but it doesn't compile:
S.print $ concats $ S.delay 1.0 $ chunksOf 2 $ S.each [1..10]
How can I achieve this?
What we need is a function that inserts a single delay at the end of a chunk stream, and then we pass that function to maps. delay doesn't work here because it puts a delay between each pair of yielded values. But we can do it easily using functions from Applicative:
S.print
  $ concats
  $ S.maps (\s -> s <* liftIO (threadDelay 1000000))
  $ chunksOf 2
  $ S.each [1..10]
What is happening here? maps applies a transformation to the "base functor" of the Stream. In a "chunked stream" obtained with chunksOf, that base functor is itself a Stream. Also, the transformation must preserve the return value of the Stream.
Streams can be sequenced with functions like (>>=) :: Stream f m a -> (a -> Stream f m b) -> Stream f m b if the next stream depends on the final result of the previous one, or with functions like (<*) :: Stream f m a -> Stream f m b -> Stream f m a if it doesn't. (<*) preserves the return value of the first Stream, which is what we want in this case.
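To make the role of the return value concrete, here is a small sketch comparing (<*) with (*>). The names streamA and streamB are made up for this illustration and are not part of the streaming library; both combinators run the elements of the two streams in order, and they differ only in whose return value survives.

```haskell
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Hypothetical example streams: each yields some Ints and returns a String.
streamA :: Monad m => Stream (Of Int) m String
streamA = S.yield 1 >> S.yield 2 >> pure "result of A"

streamB :: Monad m => Stream (Of Int) m String
streamB = S.yield 3 >> pure "result of B"

main :: IO ()
main = do
  -- (<*) runs A, then B, and keeps A's return value.
  xs :> r <- S.toList (streamA <* streamB)
  print (xs, r)   -- ([1,2,3],"result of A")
  -- (*>) runs A, then B, and keeps B's return value.
  ys :> q <- S.toList (streamA *> streamB)
  print (ys, q)   -- ([1,2,3],"result of B")
```

In the chunked case, the second stream yields nothing and only performs an effect, so (<*) leaves both the elements and the return value of the chunk untouched.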
We do not want to yield any more elements, but only to introduce a delay effect, so we simply liftIO the effect into the Stream monad.
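Putting the pieces together, a complete program might look like the sketch below. The helper names delayAfterChunk and delayEvery are invented here for readability and do not exist in the library; the imports assume the streaming package plus MonadIO from base.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Streaming (Stream, chunksOf, concats, maps)
import Streaming.Prelude (Of)
import qualified Streaming.Prelude as S

-- Hypothetical helper: run a chunk to completion, then pause once.
-- (<*) keeps the chunk's own return value, which maps requires.
delayAfterChunk :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delayAfterChunk micros chunk = chunk <* liftIO (threadDelay micros)

-- Hypothetical helper: split into chunks of n, pause after each chunk,
-- then flatten the chunks back into a single stream.
delayEvery :: MonadIO m => Int -> Int -> Stream (Of a) m r -> Stream (Of a) m r
delayEvery n micros = concats . maps (delayAfterChunk micros) . chunksOf n

main :: IO ()
main = S.print $ delayEvery 2 1000000 $ S.each [1 .. 10 :: Int]
```

Note that with this approach the pause also runs after the final chunk, since every chunk gets the same treatment.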
Another way to insert delays after each yielded value of a Stream is to zip it with an infinite stream of delays:
delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))
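For completeness, here is delay' as a standalone program with the imports it appears to need. This is only a sketch: the half-second pause and the sample input are arbitrary choices for the demonstration.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Streaming (Stream)
import Streaming.Prelude (Of)
import qualified Streaming.Prelude as S

-- Pair every element with one run of threadDelay and discard the unit result.
delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))

main :: IO ()
main = S.print $ delay' 500000 $ S.each "abc"
```

Because the stream of delays never ends, the zipped stream finishes when s does and returns the result of s.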