Search code examples
flutterrxdart

StreamBuilder receives only last item from stream


My ApplicationBloc is the root of the widget tree. In the bloc's constructor I'm listening to a stream from a repository that contains models decoded from JSON and forwarding them to another stream which is listened to by StreamBuilder.

I expected that StreamBuilder would receive models one by one and add them to AnimatedList. But there's the problem: StreamBuilder's builder fires only once with the last item in the stream.

For example, several models lay in the local storage with ids 0, 1, 2 and 3. All of these are emitted from repository, all of these are successfully put in the stream controller, but only the last model (with id == 3) appears in the AnimatedList.

Repository:

class Repository {
  static Stream<Model> load() async* {
    //...
    for (var model in models) {
      yield Model.fromJson(model);
    }
  }
}

Bloc:

class ApplicationBloc {
  ReplaySubject<Model> _outModelsController = ReplaySubject<Model>();
  Stream<Model> get outModels => _outModelsController.stream;

  ApplicationBloc() {
    TimersRepository.load().listen((model) => _outModelsController.add(model));
  }
}

main.dart:

void main() {
  runApp(
    BlocProvider<ApplicationBloc>(
      bloc: ApplicationBloc(),
      child: MyApp(),
    ),
  );
}

//...

class _MyAppState extends State<MyApp> {
  @override
  Widget build(BuildContext context) {
    final ApplicationBloc appBloc = //...

    return MaterialApp(
      //...
      body: StreamBuilder(
        stream: appBloc.outModels,
        builder: (context, snapshot) {
          if (snapshot.hasData) {
            var model = snapshot.data;
            /* inserting model to the AnimatedList */
          }

          return AnimatedList(/* ... */);
        },
      ),
    );
  }
}

Interesting notice: in the StreamBuilder's _subscribe() method onData() callback triggers required number of times but build() method fires only once.


Solution

  • You need a Stream that outputs a List<Model instead of a single element. Also, listening to a stream to add it to another ReplaySubject will delay the output stream by 2 (!!!) frames, so it would be better to have a single chain.

    class TimersRepository {
      // maybe use a Future if you only perform a single http request!
      static Stream<List<Model>> load() async* {
        //...
        yield models.map((json) => Model.fromJson(json)).toList();
      }
    }
    
    class ApplicationBloc {
      Stream<List<Model>> get outModels => _outModels;
      ValueConnectableObservable<List<Model>> _outModels;
      StreamSubscription _outModelsSubscription;
    
      ApplicationBloc() {
        // publishValue is similar to a BehaviorSubject, it always provides the latest value,
        // but without the extra delay of listening and adding to another subject
        _outModels = Observable(TimersRepository.load()).publishValue();
    
        // do no reload until the BLoC is disposed
        _outModelsSubscription = _outModels.connect();
      }
    
      void dispose() {
        // unsubcribe repo stream on dispose
        _outModelsSubscription.cancel();
      }
    }
    
    class _MyAppState extends State<MyApp> {
      ApplicationBloc _bloc;
    
      @override
      Widget build(BuildContext context) {
        return StreamBuilder<List<Model>>(
          stream: _bloc.outModels,
          builder: (context, snapshot) {
            final models = snapshot.data ?? <Model>[];
            return ListView.builder(
              itemCount: models.length,
              itemBuilder: (context, index) => Item(model: models[index]),
            );
          },
        );
      }
    }