Dart `asyncExpand` not working as expected


I am trying to create a stream baz that is built from an "outer" stream, where each event of the outer stream has its own "inner" stream.

For each event X in the outer stream, all the events in the inner stream for X should be added to baz.

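import 'dart:async';

class Foo {
  // Single controller that collects the events of every inner stream.
  final StreamController<int> bar = StreamController();

  Foo() {
    // For each outer event, forward the matching inner stream into bar.
    getsAnOuterStream.listen((event) {
      bar.addStream(getsAnInnerStream(event));
    });
  }

  Stream<int> get baz => bar.stream;
}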

The above code works as intended. However, my understanding was that I could achieve the same thing more simply with the asyncExpand method instead.

class Foo {
  final Stream<int> baz = getsAnOuterStream
    .asyncExpand((event) => getsAnInnerStream(event));
}

This does not work – when the outer stream changes, the new inner stream events are not added to baz. Is there a subtlety that I'm missing here? Any help is much appreciated!

Just to note... I think the problem could be to do with this: if an inner stream goes on forever, baz will never move on to the events from the next inner stream. However, if this is the issue, why does the top solution work?


Solution

  • The reason the top code works and the bottom code doesn't is a difference in how StreamController.addStream() and Stream.asyncExpand() are implemented.

    addStream listens to the stream returned by getsAnInnerStream(event) and simply forwards its events to bar, whereas asyncExpand pauses the outer stream and waits for the current inner stream to close before moving on to the next outer event.

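    This means that your code will not work as you expect if the inner streams stay open for a long time (or indefinitely): baz keeps emitting events from the first inner stream and never reaches the later ones, which matches the suspicion in your question.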
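To make the difference concrete, here is a minimal sketch, not a definitive implementation. The names mergeExpand, inner and outer are made up for illustration: inner and outer stand in for getsAnInnerStream and getsAnOuterStream, and the inner stream is assumed never to end. The helper subscribes to each inner stream with listen rather than calling addStream repeatedly, because the addStream documentation asks that nothing else be added to the controller until its returned future completes.

```dart
import 'dart:async';

/// Hypothetical helper (not part of dart:async): listens to every inner
/// stream as soon as its outer event arrives and forwards all events.
Stream<T> mergeExpand<S, T>(Stream<S> outer, Stream<T> Function(S) toInner) {
  final controller = StreamController<T>();
  final subscriptions = <StreamSubscription<dynamic>>[];
  var open = 1; // the outer stream plus every inner stream still running

  void markDone() {
    // Close the result only once the outer and all inner streams are done.
    if (--open == 0) controller.close();
  }

  controller.onListen = () {
    subscriptions.add(outer.listen(
      (event) {
        open++;
        subscriptions.add(toInner(event).listen(
          controller.add,
          onError: controller.addError,
          onDone: markDone,
        ));
      },
      onError: controller.addError,
      onDone: markDone,
    ));
  };
  // Cancel everything when the listener goes away (pause is not forwarded).
  controller.onCancel =
      () => Future.wait(subscriptions.map((s) => s.cancel()));
  return controller.stream;
}

// Assumed stand-ins for the question's streams: the inner stream never ends.
Stream<int> inner(int event) => Stream.periodic(
    const Duration(milliseconds: 10), (i) => event * 100 + i);

Stream<int> outer() async* {
  yield 1;
  await Future<void>.delayed(const Duration(milliseconds: 35));
  yield 2;
}

Future<void> main() async {
  // asyncExpand stays on the first inner stream, so only 1xx values appear:
  // prints [100, 101, 102, 103, 104, 105]
  print(await outer().asyncExpand(inner).take(6).toList());
  // mergeExpand also delivers 2xx values once the second outer event arrives.
  print(await mergeExpand(outer(), inner).take(10).toList());
}
```

If you only care about the latest inner stream, you would instead cancel the previous inner subscription whenever a new outer event arrives. Packages such as rxdart offer operators along these lines (flatMap, switchMap), which may be preferable to hand-rolling the helper.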