Convert Stream<Stream<T>> to Stream<T>


I am using the rxdart package to handle streams in Dart, and I am stuck on a peculiar problem.

Please have a look at this dummy code:

final userId = BehaviorSubject<String>();

Stream<T> getStream(String uid) {
  // a sample code that returns a stream
  return BehaviorSubject<T>().stream;
}

final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));

Now I want to convert the oops variable to get only Observable<T>.

I am finding it difficult to explain clearly, but let me try. I have a stream A. I map each output of stream A to another stream B. Now I have a Stream<Stream<B>>, a kind of nested stream. I just want to listen to the latest value produced by this pattern. How can I achieve this?


Solution

  • It's somewhat rare to have a Stream<Stream<Something>>, so there isn't much explicit support for it.

    One reason is that there are several (at least two) ways to combine a stream of streams of things into a stream of things.

    1. Either you listen to each stream in turn, waiting for it to complete before starting on the next, and then emit the events in order.

    2. Or you listen on each new stream the moment it becomes available, and then emit the events from any stream as soon as possible.

    The former is easy to write as an async* generator using await for:

    Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
      await for (var stream in source) yield* stream;
    }
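
    A minimal usage sketch of the sequential approach, assuming two finite inner streams built with Stream.fromIterable (the names first, second and nested are illustrative only). Note that an inner stream that never completes would keep every later stream from being listened to, which matters if the inner streams are long-lived subjects.

```dart
// Same helper as above: drain each inner stream fully, in order.
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Future<void> main() async {
  // Illustrative inputs; each inner stream closes after its last event.
  final first = Stream.fromIterable([1, 2]);
  final second = Stream.fromIterable([3, 4]);
  final nested = Stream.fromIterable([first, second]);

  // `second` is only listened to once `first` has completed.
  final events = await flattenStreams(nested).toList();
  print(events); // [1, 2, 3, 4]
}
```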
    

    The latter is more complicated because it requires listening on more than one stream at a time and combining their events. (If only StreamController.addStream allowed more than one stream at a time, it would be much easier.) You can use the StreamGroup class from package:async for this:

    import "package:async/async.dart" show StreamGroup;
    
    Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
      var sg = StreamGroup<T>();
      source.forEach(sg.add).whenComplete(sg.close);
      // This doesn't handle errors in [source].
      // If you worry about errors in [source], maybe insert
      //   .catchError((e, s) {
      //     sg.add(Future<T>.error(e, s).asStream());
      //   })
      // before `.whenComplete`.
      return sg.stream;
    }
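
    The question asks for only the latest inner stream, which is a third way to combine: drop the previous inner stream as soon as a new one arrives. rxdart ships a switchMap operator for this, so userId.switchMap((uid) => getStream(uid)) is probably the shorter route. The following is only a sketch of the same idea using dart:async alone, assuming null-safe Dart (2.12 or later). The name switchLatest is hypothetical, and pause/resume is not forwarded to the inner subscription.

```dart
import 'dart:async';

/// Hypothetical helper: emits events from the most recent inner stream only.
Stream<T> switchLatest<T>(Stream<Stream<T>> source) {
  late StreamController<T> controller;
  StreamSubscription<Stream<T>>? outerSub;
  StreamSubscription<T>? innerSub;
  var outerDone = false;

  controller = StreamController<T>(
    onListen: () {
      outerSub = source.listen(
        (inner) {
          // A new inner stream arrived: stop listening to the previous one.
          innerSub?.cancel();
          innerSub = inner.listen(
            controller.add,
            onError: controller.addError,
            onDone: () {
              innerSub = null;
              // Close only when the outer stream has also finished.
              if (outerDone) controller.close();
            },
          );
        },
        onError: controller.addError,
        onDone: () {
          outerDone = true;
          // If no inner stream is active, nothing more can arrive.
          if (innerSub == null) controller.close();
        },
      );
    },
    onCancel: () async {
      await innerSub?.cancel();
      await outerSub?.cancel();
    },
  );
  return controller.stream;
}
```

    With the question's names, this would be used as switchLatest(userId.map((uid) => getStream(uid))), so that each new uid replaces the stream currently being listened to.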