
Scalaz-stream chunking UP to N


Given a queue like so:

import scalaz.stream.async
import scalaz.stream.async.mutable.Queue

val queue: Queue[Int] = async.boundedQueue[Int](1000)

I want to pull off this queue and stream it into a downstream Sink, in chunks of UP to 100.

queue.dequeue.chunk(100).to(downstreamConsumer) 

works, sort of, but it will not empty the queue if, say, 101 messages are queued: one message is left over until another 99 arrive. I want to take as many elements as are available off the queue, up to 100, as fast as my downstream process can handle them.
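The leftover problem can be sketched in plain Scala, without the streaming library: a consumer that only ever takes chunks of *exactly* N strands whatever remains below N (`takeExactly` here is a hypothetical helper for illustration, not part of scalaz-stream).

```scala
import scala.collection.mutable

object StrictChunkDemo {
  // Only succeeds when a full chunk of n is available --
  // mirrors why chunk(100) leaves 1 of 101 messages behind.
  def takeExactly(buf: mutable.Queue[Int], n: Int): Option[Seq[Int]] =
    if (buf.size >= n) Some(Seq.fill(n)(buf.dequeue())) else None

  def main(args: Array[String]): Unit = {
    val buf = mutable.Queue(1 to 101: _*)
    println(takeExactly(buf, 100).map(_.size)) // Some(100)
    println(takeExactly(buf, 100))             // None -- 1 message stuck
    println(buf.size)                          // 1
  }
}
```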

Is there an existing combinator available?


Solution

  • I actually solved this a different way than I had intended.

    The scalaz-stream Queue now provides a dequeueBatch method that dequeues all available values, up to a given limit, blocking when the queue is empty.

    https://github.com/scalaz/scalaz-stream/issues/338
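    The "up to N" semantics can be illustrated in plain Scala (this is a sketch of the behaviour, not the scalaz-stream API; `dequeueUpTo` is an illustrative name, and a real implementation would block on an empty queue rather than return an empty batch):

```scala
import scala.collection.mutable

object BatchDequeueDemo {
  // Take as many elements as are available, capped at `limit`.
  def dequeueUpTo[A](buf: mutable.Queue[A], limit: Int): Seq[A] = {
    val n = math.min(limit, buf.size)
    Seq.fill(n)(buf.dequeue())
  }

  def main(args: Array[String]): Unit = {
    val buf = mutable.Queue(1 to 101: _*)
    println(dequeueUpTo(buf, 100).size) // 100
    println(dequeueUpTo(buf, 100).size) // 1 -- leftover drained immediately
    println(buf.isEmpty)                // true
  }
}
```

    Unlike a fixed-size chunk, the second batch here drains the single remaining element immediately instead of waiting for 99 more.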