Issue with StreamBuilder and streams in Flutter (receiving duplicated data)


Today I ran into a problem with streams and StreamBuilder. If several StreamBuilder widgets listen to the same stream and you add data to its sink, the data comes out of the stream once per StreamBuilder listener. In other words: with one StreamController (or BehaviorSubject) and k StreamBuilder widgets, a single StreamController.sink.add(event) makes the event come out of the stream k times, once per StreamBuilder. Is that the expected behavior? (What I expected: I add one event and it is handled once on the other side, regardless of the number of listeners.)

I was able to "fix" this by wrapping almost the whole widget tree in a single StreamBuilder, but that is less efficient than the first approach because it rebuilds the whole tree instead of a few small leaf widgets.

Below is some code to test it if you want (it is a modification of the project generated by flutter create project_name). Thank you!

P.S.: This works fine if you just listen to the stream without StreamBuilder, i.e. streamController.stream.listen..

import 'dart:async';

import 'package:flutter/cupertino.dart';
import 'package:rxdart/subjects.dart';

class MyAppBloc with ChangeNotifier {
  int _currentIndex;
  BehaviorSubject<bool> _controller;

  MyAppBloc() {
    _currentIndex = 0;
    _controller = BehaviorSubject<bool>();
  }

  Stream<int> get currentIndex => _controller.stream.map<int>((event) {
        print('[event: $event]');
        _currentIndex++;
        return _currentIndex;
      });

  StreamSink<bool> get increment => _controller.sink;

  void close() {
    _controller.close();
  }
}

import 'package:flutter/material.dart';
import 'package:provider/provider.dart';
import 'package:test_project/bloc/my_app_bloc.dart';

class MyHomePage extends StatelessWidget {
  MyHomePage({Key key, this.title}) : super(key: key);

  final String title;
  Widget leadingBuilder(MyAppBloc bloc) {
    return StreamBuilder<int>(
      initialData: 0,
      stream: bloc.currentIndex,
      builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
        print('[leadingBuilderSnapshot: $snapshot]');
        return Text(snapshot.data.toString());
      },
    );
  }

  StreamBuilder<int> counterBuilder(MyAppBloc bloc) {
    return StreamBuilder<int>(
      initialData: 0,
      stream: bloc.currentIndex,
      builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
        print('[counterBuilderSnapshot: $snapshot]');
        return Text(
          snapshot.data.toString(),
          style: Theme.of(context).textTheme.headline4,
        );
      },
    );
  }

  @override
  Widget build(BuildContext context) {
    print('[build]');
    final _bloc = Provider.of<MyAppBloc>(context);
    return Scaffold(
      appBar: AppBar(
        leading: Container(
          width: 30,
          height: 30,
          alignment: Alignment.center,
          child: leadingBuilder(_bloc),
        ),
        title: Text(title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            StreamBuilder<int>(
              initialData: 0,
              stream: _bloc.currentIndex,
              builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
                return Text('${snapshot.data}');
              },
            ),
            Text(
              'You have pushed the button this many times:',
            ),
            counterBuilder(_bloc),
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: () => _bloc.increment.add(true),
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ),
    );
  }
}

Solution

  • Because currentIndex is a getter that calls map(), a new stream is created every time you call bloc.currentIndex, and each StreamBuilder listens to its own copy. The map() callback, including the _currentIndex++ inside it, therefore runs once per listener.

    So in the original code there is actually 1 StreamController and k streams (k: the number of StreamBuilder widgets).

    To solve the problem, you can create an event controller and listen to it inside the bloc to run your logic. (The event stream is listened to only by the bloc itself, and it is created just once.)

    For example:

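    import 'dart:async';

    import 'package:rxdart/rxdart.dart';

    class MyAppBloc {
      MyAppBloc() {
        // The bloc is the only listener of the event stream, so this
        // callback runs once per event, however many widgets listen
        // to currentIndex.
        _eventController.listen((event) {
          print('[event: $event]');
          _indexController.add(currentIndex.value + 1);
        });
      }
    
      // Holds the latest index and replays it to each new listener.
      final _indexController = BehaviorSubject<int>.seeded(0);
      // Receives the increment events added through the sink.
      final _eventController = PublishSubject<bool>();
    
      ValueStream<int> get currentIndex => _indexController.stream;
      StreamSink<bool> get increment => _eventController.sink;
    
      void close() {
        _indexController.close();
        _eventController.close();
      }
    }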
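
To see the difference without Flutter or rxdart, here is a minimal sketch using only dart:async. It is an illustration under assumptions, not the code from the question: plain broadcast StreamControllers stand in for the subjects (so there is no seeding or replay of the latest value), and the function names mapCallsPerEvent and valuesSeenAfterOneEvent are made up for this example. The first function reproduces the map()-in-a-getter pattern and counts how often the callback runs; the second applies the single-internal-listener pattern described above.

```dart
import 'dart:async';

/// Problem pattern: a getter-like function that returns `stream.map(...)`.
/// Returns how many times the map callback ran after ONE event was added
/// while [listeners] subscribers were attached.
int mapCallsPerEvent(int listeners) {
  // sync: true delivers each event during add(), keeping the demo synchronous.
  final controller = StreamController<bool>.broadcast(sync: true);
  var calls = 0;

  // Like the question's getter: every call builds a new mapped stream.
  Stream<int> currentIndex() => controller.stream.map((_) => ++calls);

  for (var i = 0; i < listeners; i++) {
    currentIndex().listen((_) {});
  }
  controller.add(true);
  controller.close();
  return calls;
}

/// Fixed pattern: one internal listener updates the state and republishes it.
/// Returns the values received by [listeners] subscribers after ONE event.
List<int> valuesSeenAfterOneEvent(int listeners) {
  final events = StreamController<bool>.broadcast(sync: true);
  final index = StreamController<int>.broadcast(sync: true);
  var current = 0;

  // The only subscription to `events`, so the increment runs once per event.
  events.stream.listen((_) => index.add(++current));

  final seen = <int>[];
  for (var i = 0; i < listeners; i++) {
    index.stream.listen(seen.add);
  }
  events.add(true);
  events.close();
  index.close();
  return seen;
}

void main() {
  print(mapCallsPerEvent(3)); // 3: the callback ran once per listener
  print(valuesSeenAfterOneEvent(3)); // [1, 1, 1]: one increment, shared by all
}
```

In the first function the side effect lives inside map(), so it is repeated for every subscription. In the second, the side effect lives in a single listener and the subscribers only receive the already-computed value.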