
FS2: is it possible to complete Queue gracefully?


Suppose that I want to convert a legacy asynchronous API into FS2 Streams. The API provides an interface with three callbacks: next element, success, and error. I'd like the Stream to emit all the elements and then complete upon receiving the success or error callback.

The FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html) suggests using fs2.Queue for such situations, and it works well for enqueueing. However, every example I've seen assumes that the stream returned by queue.dequeue never completes, so there's no obvious way to handle the success/error callbacks in my situation. I've tried queue.dequeue.interruptWhen(...here goes the signal...), but if the success/error callback arrives before the client has read the data from the stream, the stream is terminated prematurely while unread elements remain. I'd like the consumer to finish reading them before the stream completes.

Is it possible to do that with FS2? With Akka Streams it's trivial - SourceQueueWithComplete has complete and fail methods.

UPDATE: I was able to get a good enough result by wrapping elements in Option and treating None as a signal to stop reading the stream, and additionally by using a Promise to propagate errors:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

However, did I overlook a more natural way of doing this?


Solution

  • One idiomatic way to do this is to create a Queue[Option[A]] instead of a Queue[A]. When enqueueing, wrap each element in Some, and explicitly enqueue None to signal completion. On the dequeueing side, use q.dequeue.unNoneTerminate, which gives you a Stream[F, A] that terminates once the Queue emits None. Because None is queued behind the elements already enqueued, the consumer reads all of them before the stream ends.
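
A minimal sketch of the approach described above, assuming fs2 1.x/2.x with cats-effect 2 (where Queue lives in fs2.concurrent and exposes enqueue1 and dequeue). The object name QueueCompletion and the Int elements are illustrative only; in real code the enqueue1 calls would be made from the legacy API's callbacks.

```scala
import cats.effect.{ContextShift, IO}
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Hypothetical example object; assumes fs2 1.x/2.x and cats-effect 2.
object QueueCompletion {
  // Needed so that a Concurrent[IO] instance is available for Queue.unbounded.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program: IO[List[Int]] =
    for {
      q   <- Queue.unbounded[IO, Option[Int]]
      // "next element" callback: wrap each element in Some.
      _   <- q.enqueue1(Some(1))
      _   <- q.enqueue1(Some(2))
      _   <- q.enqueue1(Some(3))
      // "success" callback: enqueue None to signal completion.
      _   <- q.enqueue1(None)
      // The consumer sees every element enqueued before None, then the stream ends.
      out <- q.dequeue.unNoneTerminate.compile.toList
    } yield out

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // prints List(1, 2, 3)
}
```

Note that None is enqueued before the consumer starts reading here, which is exactly the case where interruptWhen dropped unread elements; with the None terminator they are all delivered first.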