scala, akka, akka-stream

Is `BoundedSourceQueue` from `Source.queue` ok with concurrent producers?


Source.queue recently gained an overload that is specialized to OverflowStrategy.dropNew and avoids the async mechanism. Materializing it yields a BoundedSourceQueue[T] (compared to SourceQueueWithComplete[T] in the older version). The docs for the SourceQueueWithComplete variants of Source.queue make it clear that the materialized queues should not be used by an arbitrary number of concurrent producers:

The materialized SourceQueue may be used by up to maxConcurrentOffers concurrent producers.

The docs for the BoundedSourceQueue don't say anything about this. Is this constraint lifted for BoundedSourceQueue? Can it be used by any number of concurrent producers?


Solution

  • Technically, the SourceQueueWithComplete variant doesn't have the maxConcurrentOffers restriction if OverflowStrategy.dropNew is in effect.

    However, because the result of offering an element to the SourceQueueWithComplete is communicated asynchronously (as a future), a producer that offers elements faster than it handles the returned futures may overwhelm memory. After all, asynchrony removes backpressure unless some other mechanism reintroduces it.

    With dropNew, it is known immediately whether the element was dropped, so the result of offering can be communicated synchronously (i.e. blocking the producer until it handles or throws away the result). This allows arbitrarily many producers without OOM risk. For this reason, the BoundedSourceQueue version is recommended when using the dropNew strategy (i.e. only use the SourceQueueWithComplete if some other strategy is needed), and the recommendation becomes stronger as load increases.

    So yes, the constraint is lifted: the only limit on the number of concurrent producers to the BoundedSourceQueue variant is the number of running threads.
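
As a rough illustration of the points above, here is a minimal sketch of several threads offering to one BoundedSourceQueue. It assumes an Akka version whose Source.queue has the buffer-size-only overload. The names BoundedQueueSketch and run, and the parameters producers, perProducer and bufferSize, are made up for this example. Each call to offer returns a QueueOfferResult directly rather than a future, so every producer learns at once whether its element was enqueued or dropped.

```scala
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.{Keep, Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object BoundedQueueSketch {

  /** Starts `producers` concurrent producers that each offer `perProducer`
    * elements, and returns (enqueued, dropped) as seen by the producers. */
  def run(producers: Int, perProducer: Int, bufferSize: Int): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("bounded-queue-sketch")
    // Producers run on the global pool, separate from the stream's dispatcher.
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Source.queue[T](bufferSize) materializes a BoundedSourceQueue[T].
    val queue =
      Source.queue[Int](bufferSize)
        .toMat(Sink.ignore)(Keep.left)
        .run()

    val enqueued = new AtomicInteger(0)
    val dropped  = new AtomicInteger(0)

    val work = (1 to producers).map { _ =>
      Future {
        (1 to perProducer).foreach { i =>
          // offer is synchronous: the outcome is available immediately.
          queue.offer(i) match {
            case QueueOfferResult.Enqueued    => enqueued.incrementAndGet()
            case QueueOfferResult.Dropped     => dropped.incrementAndGet()
            case QueueOfferResult.QueueClosed => () // not expected before complete()
            case QueueOfferResult.Failure(_)  => () // not expected in this sketch
          }
        }
      }
    }

    Await.result(Future.sequence(work), 30.seconds)
    queue.complete()
    Await.result(system.terminate(), 30.seconds)
    (enqueued.get(), dropped.get())
  }
}
```

Calling `BoundedQueueSketch.run(8, 1000, 64)` returns counts whose split between enqueued and dropped varies from run to run, depending on how fast the stream drains the buffer. No producer ever holds an unresolved future, which is why the number of producers is not capped the way it is for SourceQueueWithComplete.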