Tags: dart, async-await, stream, rxdart

Combining Streams of Futures in Dart


When I combine multiple streams using stream transformers and an async function that returns a future, I end up with a stream of futures.

I can flatten the stream of futures with something like this:

Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
  await for (var future in source) yield* Stream.fromFuture(future);
}

It seems like overkill to convert each Future to a stream, but I have not come across a more elegant solution. I could of course pass it to another observable with a listener, but I suspect I am just overlooking an easier approach?


Solution

  • First of all, a stream of futures is dangerous. I highly recommend avoiding it if at all possible.

    For your current example, if your futures never complete with an error, you can await the future directly instead of converting it to a stream:

    Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
      await for (var future in source) yield await future;
    }
    

    That is pretty much the most idiomatic way of waiting for the stream to deliver each future, then waiting for that future, and finally creating a stream of the results. It will stop at the first error, though: a failing future makes the await throw, which ends the resulting stream with that error. If there are never any errors in your futures, this is fine and you can stop reading here.

    If you actually want to forward all errors from futures to the resulting stream, then your original code is perfect! The only way for an async* function to emit an error on the stream without also closing the stream is to yield* a stream containing the error.

    An alternative version, optimized for readability rather than brevity, could be:

    Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
      await for (var future in source) {
        try {
          yield await future;
        } catch (error, stack) {
          yield* Stream.error(error, stack);
        }
      }
    }
    

    but the result is the same as using Stream.fromFuture.
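
    To make the difference between the two behaviours concrete, here is a minimal sketch that runs both variants on the same input. The names flattenStopOnError, flattenForwardErrors, sampleSource and collect are made up for this illustration, and the failing future is deliberately delayed so that a handler is attached before it completes:

    ```dart
    import 'dart:async';

    // Ends the output stream at the first failing future.
    Stream<T> flattenStopOnError<T>(Stream<Future<T>> source) async* {
      await for (var future in source) yield await future;
    }

    // Forwards each failure as an error event and keeps going.
    Stream<T> flattenForwardErrors<T>(Stream<Future<T>> source) async* {
      await for (var future in source) yield* Stream.fromFuture(future);
    }

    // Hypothetical input: a value, a failure, another value. The failure is
    // delayed so that a handler can be attached before it completes.
    Stream<Future<int>> sampleSource() async* {
      yield Future.value(1);
      yield Future<int>.delayed(
          const Duration(milliseconds: 20), () => throw StateError('boom'));
      yield Future.value(3);
    }

    // Records every event of a stream as a short label.
    Future<List<String>> collect<T>(Stream<T> stream) {
      final events = <String>[];
      final done = Completer<List<String>>();
      stream.listen(
        (value) => events.add('value: $value'),
        onError: (Object error) => events.add('error'),
        onDone: () => done.complete(events),
      );
      return done.future;
    }

    Future<void> main() async {
      print(await collect(flattenStopOnError(sampleSource())));
      // [value: 1, error]
      print(await collect(flattenForwardErrors(sampleSource())));
      // [value: 1, error, value: 3]
    }
    ```

    The first variant never reaches the third future, because the error closes its output stream; the second one reports the error and carries on.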

    The danger I mentioned at the start is that while you are awaiting that future (or yield*'ing that stream), you pause the stream that you are listening on. That, by itself, is usually not a problem because streams are built to be paused. If more data arrives, it will just be buffered. However, when the data is a future, and that future might complete with an error, delaying the delivery means that you might not have time to add an error handler to the future before it completes with the error. That will make the error an unhandled error which may crash your application. So, that's bad.
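
    A rough way to observe this is to run the consumer inside runZonedGuarded and count what reaches the zone's error handler. In the sketch below, unhandledErrorsDemo and the timings (50 ms and 10 ms) are invented for illustration; the second future fails while it is still buffered behind the first one:

    ```dart
    import 'dart:async';

    Stream<T> flatten<T>(Stream<Future<T>> source) async* {
      await for (var future in source) yield await future;
    }

    // Returns the errors that reached the zone as unhandled.
    Future<List<Object>> unhandledErrorsDemo() {
      final unhandled = <Object>[];
      final finished = Completer<List<Object>>();

      runZonedGuarded(() {
        final controller = StreamController<Future<int>>();
        // Slow success: the consumer spends 50 ms awaiting this one.
        controller.add(Future<int>.delayed(
            const Duration(milliseconds: 50), () => 1));
        // Fast failure: completes after 10 ms, while it is still buffered
        // in the paused stream and nobody has attached a handler.
        controller.add(Future<int>.delayed(
            const Duration(milliseconds: 10), () => throw StateError('boom')));
        controller.close();

        flatten(controller.stream).listen(
          (_) {},
          // The consumer does handle the error, but only after the fact.
          onError: (Object _) {},
          onDone: () => finished.complete(unhandled),
        );
      }, (error, stack) => unhandled.add(error));

      return finished.future;
    }

    Future<void> main() async {
      final unhandled = await unhandledErrorsDemo();
      print('unhandled errors: ${unhandled.length}');
      // unhandled errors: 1
    }
    ```

    Without the guarded zone, that same error would be reported as an uncaught error of the program, even though the stream listener has an onError callback.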

    So, if you might have error futures in your stream, you need to get to those as soon as possible. Even waiting for a stream to deliver the future might be too late, which is why you should generally not have a Stream<Future> at all. You need to be exceedingly clever to avoid problems (you must deliver the futures synchronously, and you must react to pauses and cancels immediately, and you mustn't introduce other delays between creating the future and delivering it.)

    I recommend that you rewrite the code which creates the Stream<Future<T>> to just create a Stream<T>. I can't say how to do that without seeing the original code, but probably you'll await the future immediately where it's created using the function that returns futures, and then send the value or error to the stream as the stream event, rather than sending the future itself.
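
    As one possible shape for such a rewrite, assuming the futures come from mapping an input stream through an async function: Stream.asyncMap calls the function, waits for the returned future right where it is created, and emits the value or the error as an ordinary stream event. The names lookUp, lookUpAll and collect below are hypothetical stand-ins, since the original code is not shown:

    ```dart
    import 'dart:async';

    // Hypothetical stand-in for the async function that returns a future.
    Future<int> lookUp(int id) async {
      if (id == 2) throw StateError('no entry for $id');
      return id * 10;
    }

    // ids.map(lookUp) would produce a Stream<Future<int>>.
    // asyncMap waits for each future as soon as it exists and produces a
    // Stream<int> that carries values and errors directly.
    Stream<int> lookUpAll(Stream<int> ids) => ids.asyncMap(lookUp);

    // Records every event of a stream as a short label.
    Future<List<String>> collect<T>(Stream<T> stream) {
      final events = <String>[];
      final done = Completer<List<String>>();
      stream.listen(
        (value) => events.add('value: $value'),
        onError: (Object error) => events.add('error'),
        onDone: () => done.complete(events),
      );
      return done.future;
    }

    Future<void> main() async {
      final ids = Stream.fromIterable([1, 2, 3]);
      print(await collect(lookUpAll(ids)));
      // [value: 10, error, value: 30]
    }
    ```

    Because a handler is attached to each future immediately after it is created, no future sits in a buffer where it could fail unobserved. Whether asyncMap fits depends on how the original transformers are wired together.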