scala scala-cats

parEvalMap with buffering


I'm interested in a version of parEvalMap with a bounded buffer so that it will keep progressing the underlying stream as long as its buffer is not full.

What I mean is: considering a stream of values [1,2,3,4,5,6,7,8...] with a parallelism of 4 and a buffer size of 6, this would mean:

  • Start evaluating items 1,2,3,4 in parallel.
  • When the inner IO action for item 1 returns, push the result on the output; the buffer remains empty. Start processing item 5.
  • When the inner IO action for item 3 returns, put the result in the buffer (3->f(3)), and start processing item 6.
  • When the inner IO action for item 5 returns, put the result in the buffer (3->f(3), 5->f(5)), and start processing item 7.
  • When the inner IO action for item 2 returns, push f(2) and f(3) on the output; the buffer is now 5->f(5). Start processing item 8.
  • Whenever the buffer is full, no new items are started, so the effective parallelism decreases until the oldest item still being processed returns and the buffer drains.
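
To make the walkthrough above concrete, here is a minimal sketch in plain Scala (no fs2) of just the reordering step. The names `ReorderTrace`, `State` and `complete` are made up for illustration, results are represented as strings like f(3), and the sketch deliberately ignores the parallelism limit and the buffer capacity.

```scala
object ReorderTrace {
  // next:   index of the next result owed to the output
  // buffer: completed results still waiting for an earlier item
  final case class State(next: Int, buffer: Map[Int, String])

  // Record one completion; return the new state and the results that
  // can now be emitted in order.
  def complete(s: State, item: Int): (State, List[String]) = {
    var buffer = s.buffer + (item -> s"f($item)")
    var next = s.next
    val out = List.newBuilder[String]
    while (buffer.contains(next)) {
      out += buffer(next)
      buffer -= next
      next += 1
    }
    (State(next, buffer), out.result())
  }

  def main(args: Array[String]): Unit = {
    // Completion order taken from the walkthrough: 1, 3, 5, 2.
    List(1, 3, 5, 2).foldLeft(State(1, Map.empty)) { (s, item) =>
      val (s2, emitted) = complete(s, item)
      println(s"item $item done: emit $emitted, buffered ${s2.buffer.keys.toList.sorted}")
      s2
    }
    ()
  }
}
```

For the completion order 1, 3, 5, 2 this emits f(1) immediately, holds f(3) and f(5) back, and releases f(2) and f(3) together once item 2 completes, leaving f(5) in the buffer.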

Any idea if there is such a combinator, or what would be a good approach to implement it?

The problem we are trying to solve is processing a stream of messages/items where some items take a lot longer than average to process. With plain parEvalMap this slows processing down: once a slow request is running, parEvalMap will not start the next request even though the following N-1 requests have already returned, because it is bounded by its parallelism and cannot emit anything until the "slow" item has produced its result.

I'd like to be able to save results 2, 3, 4, ..., n in a buffer, continue at maximum parallelism, and emit all results once the slow request completes.

With parEvalMapUnordered the maximum parallelism is maintained because results are returned out of order, but with parEvalMap a slow request halts processing. I'd like to lift this restriction by introducing a result buffer.


Solution

  • I wound up rolling my own: a Semaphore to control the buffer size, zipWithIndex so that the ordering of results can be restored after running them through parEvalMapUnordered (which executes without stalling on outliers), and finally scanChunks to collect the results into a HashMap[Long, Result] and emit them in order. I'm not sure if I can share the code publicly; I'll try to get permission and do so.
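
Since the original code is not shown, the following is only a sketch of how the pieces named above might fit together, assuming fs2 3.x and cats-effect 3. `parEvalMapBuffered` and its `window` parameter are hypothetical names, not part of fs2, and an immutable Map stands in for the HashMap. Here `window` bounds the items that are either in flight or completed but not yet emitted, which is a slightly different knob from a separate "buffer size". `Chunk.seq` is used to build the output chunk; the preferred constructor may differ between fs2 versions.

```scala
import cats.effect.Concurrent
import cats.effect.std.Semaphore
import cats.syntax.all._
import fs2.{Chunk, Stream}

object BufferedParEvalMap {

  // Hypothetical combinator: ordered output like parEvalMap, but completed
  // results are buffered so that up to `window` items can be outstanding.
  def parEvalMapBuffered[F[_]: Concurrent, A, B](maxConcurrent: Int, window: Int)(
      f: A => F[B]
  )(in: Stream[F, A]): Stream[F, B] = {

    // (index of the next result to emit, completed results waiting for it)
    type State = (Long, Map[Long, B])

    // Add newly completed results to the buffer, then emit the longest
    // contiguous run starting at the next expected index.
    def reorder(state: State, chunk: Chunk[(Long, B)]): (State, Chunk[B]) = {
      var next = state._1
      var pending = state._2 ++ chunk.toList
      val ready = Vector.newBuilder[B]
      while (pending.contains(next)) {
        ready += pending(next)
        pending -= next
        next += 1
      }
      ((next, pending), Chunk.seq(ready.result()))
    }

    Stream.eval(Semaphore[F](window.toLong)).flatMap { permits =>
      in.zipWithIndex
        // Take a permit before an item may start; blocks when the window is full.
        .evalTap(_ => permits.acquire)
        // Run out of order so a slow item does not stall the others.
        .parEvalMapUnordered(maxConcurrent) { case (a, i) => f(a).map(b => (i, b)) }
        // Restore the original order using the index attached above.
        .scanChunks[State, (Long, B), B]((0L, Map.empty[Long, B]))(reorder)
        // Give the permit back once a result has been emitted in order.
        .evalTap(_ => permits.release)
    }
  }
}
```

A call might look like `BufferedParEvalMap.parEvalMapBuffered[IO, Int, Int](maxConcurrent = 4, window = 10)(process)(source)`, where `process` and `source` are placeholders. With these numbers a single stalled item can leave up to 9 completed results waiting in the map. `window` should be at least 1; the oldest unemitted item always holds a permit, so the stream can always make progress.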