Search code examples
flutter, flutter-dependencies, rxdart

Merging streams in a Bloc


I've got a few streams in my bloc. I am not sure whether this is a proper and correct approach, because I am new to Flutter and the Bloc pattern. How can I merge the streams in the Bloc into one?

Any hints on the topic are welcome...

My Bloc file

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';

/// Bloc for the welcome screens: holds three separate rxdart subjects (page
/// controller, current page index, last-page flag) and exposes them both
/// individually and as one combined stream.
class WelcomeBloc {
  // PublishSubject does not replay earlier values: a listener only receives
  // what is added after it subscribed.
  final _controller = PublishSubject<PageController>();
  final _page = PublishSubject<int>();
  final _lastPage = PublishSubject<bool>();

  /// Combines the three streams below via [CombineLatestStream.list].
  ///
  /// A new combined stream is built on every access of this getter.
  ///
  /// NOTE(review): combine-latest only emits once every source has emitted at
  /// least once, and nothing in this class ever adds a value to `_controller`,
  /// so as written this stream presumably never emits — confirm.
  Stream<dynamic> get combinedStream =>
      CombineLatestStream.list([getController, currentPage, isLastPage]);

  /// Stream of page controllers (no method in this class feeds it).
  Stream<PageController> get getController => _controller.stream;

  /// Stream of page indices pushed through [updatePage].
  Stream<int> get currentPage => _page.stream;

  /// Stream of flags pushed through [updatePageState].
  Stream<bool> get isLastPage => _lastPage.stream;

  /// Adds [page] to the page-index subject.
  updatePage(int page) {
    _page.sink.add(page);
  }

  /// Adds [state] to the last-page subject.
  updatePageState(bool state) {
    _lastPage.sink.add(state);
  }

  /// Closes all three subjects.
  dispose() {
    _controller.close();
    _page.close();
    _lastPage.close();
  }
}

/// File-level [WelcomeBloc] instance that importing code can use directly.
final welcomeBloc = WelcomeBloc();

Consumer

import 'package:flutter/material.dart';
import '../../blocs/welcome_bloc.dart';
import './pages/page.dart';
import './pages/page2.dart';
import './pages/login.dart';

/// Hosts the welcome [PageView] inside a [StreamBuilder] that listens to
/// `welcomeBloc.combinedStream`.
class ViewerWrapper extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return StreamBuilder(
        stream: welcomeBloc.combinedStream,
        builder: (context, AsyncSnapshot snapshot) {
          // The PageView is returned unconditionally; the snapshot is not
          // checked for data or errors before it is used below.
          return PageView(
            children: <Widget>[
              Page1(),
              Page2(),
              Login(),
            ],
            onPageChanged: (page) {
              // Push the newly selected page index into the bloc.
              welcomeBloc.updatePage(page);
              // NOTE(review): the snapshot is untyped, so this is a dynamic
              // member lookup. combinedStream is built with
              // CombineLatestStream.list, whose events should be lists, and
              // `data` is null until the first event — so `._page` looks like
              // it fails at runtime. Confirm.
              print(snapshot.data._page);
              // welcomeBloc.updatePage(page + 1);
            },
            // controller: snapshot.data.getController,
          );
        });
  }
}


Solution

  • You can use Rx.combineLatest3 (as shown in the linked example) to merge the streams, but as far as I understand from your code, it is better to define a model like this:

    // Sketch of a single model that bundles the three values the bloc
    // currently exposes as separate streams.
    class MyModel {
      PageController pageController;
      int currentPage;
      bool isLastPage;

      // ... constructor and any other members go here
    }
    
    

    And then have a single stream that works with it.

    Edit: applied model to code

    I changed the code with two approaches.

    First One

    You can define a model and work with that model, so you don't need three different streams. This is fine if you only ever need the combined version and never need the individual streams anywhere else.

    I wrote code in one file.

    import 'package:flutter/material.dart';
    import 'package:rxdart/rxdart.dart';
    
    /// Mutable bundle of the welcome flow's paging state.
    ///
    /// All three values are optional named constructor arguments.
    class MyModel {
      MyModel({this.currentPage, this.pageController, this.isLastPage});

      /// Controller handed to the `PageView`.
      PageController pageController;

      /// Index of the page currently shown.
      int currentPage;

      /// Whether the current page is the last one.
      bool isLastPage;
    }
    
    /// Bloc that publishes the welcome flow's state as a single [MyModel]
    /// stream.
    class WelcomeBloc {
      // Seeded BehaviorSubject instead of a bare PublishSubject: a listener
      // receives the current model as soon as it subscribes. Without a seed
      // nothing is emitted until the first updateModel() call, so a UI that
      // only calls updateModel() from widgets built *after* data arrives would
      // wait forever.
      final _myModel = BehaviorSubject<MyModel>.seeded(
        MyModel(
          currentPage: 0,
          pageController: PageController(),
          isLastPage: false,
        ),
      );

      /// The current [MyModel], replayed to each new listener, followed by
      /// every later update.
      Stream<MyModel> get getModel => _myModel.stream;

      /// Publishes [model] as the new current state.
      void updateModel(MyModel model) {
        _myModel.sink.add(model);
      }

      /// Closes the underlying subject; no further models can be published.
      void dispose() {
        _myModel.close();
      }
    }
    
    /// File-level [WelcomeBloc] instance shared by the widgets in this file.
    final WelcomeBloc welcomeBloc = WelcomeBloc();
    
    /// Renders the welcome pages from the latest [MyModel] delivered by
    /// `welcomeBloc.getModel`, or a progress indicator while there is none.
    class ViewerWrapper extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return StreamBuilder<MyModel>(
          stream: welcomeBloc.getModel,
          builder: (context, AsyncSnapshot<MyModel> snapshot) {
            // No model yet: show a spinner instead of the pages.
            if (!snapshot.hasData) {
              return Center(child: CircularProgressIndicator());
            }

            final MyModel state = snapshot.data;
            return PageView(
              controller: state.pageController,
              onPageChanged: (index) {
                // Record the new page on the same model object, then
                // republish that object through the bloc.
                state.currentPage = index;
                welcomeBloc.updateModel(state);
                print(state.currentPage);
              },
              children: <Widget>[
                Page1(),
                Page2(),
                Login(),
              ],
            );
          },
        );
      }
    }
    

    Second one

    If you also need the individual streams elsewhere, you can keep them and combine them instead. You can combine them into whatever shape you want (a list, a map, or a defined model); I used a model (MyModel).

    import 'package:flutter/material.dart';
    import 'package:rxdart/rxdart.dart';
    
    /// Combined value built from the bloc's three streams: the page
    /// controller, the current page index and the last-page flag.
    class MyModel {
      /// Creates a model; every argument is an optional named parameter.
      MyModel({this.currentPage, this.pageController, this.isLastPage});

      /// Controller passed on to the `PageView`.
      PageController pageController;

      /// Index of the page currently shown.
      int currentPage;

      /// Whether the current page is the last one.
      bool isLastPage;
    }
    
    /// Bloc that keeps the page controller, the current page index and the
    /// last-page flag as separate streams and also offers them combined into
    /// one [MyModel] stream.
    class WelcomeBloc {
      // Seeded BehaviorSubjects instead of bare PublishSubjects. combineLatest
      // emits only after *every* source has produced a value, and nothing in
      // this class ever adds a controller, so with unseeded subjects the
      // combined stream would never emit. BehaviorSubject also replays its
      // latest value, so each freshly built combined stream starts from the
      // current state rather than waiting for three new events.
      final _controller =
          BehaviorSubject<PageController>.seeded(PageController());
      final _page = BehaviorSubject<int>.seeded(0);
      final _lastPage = BehaviorSubject<bool>.seeded(false);

      /// The three values below merged into a [MyModel]; a new model is
      /// emitted whenever any of the sources changes.
      ///
      /// A new combined stream is created on each access.
      Stream<MyModel> get combinedStream => Rx.combineLatest3(
            _page,
            _lastPage,
            _controller,
            (int page, bool isLast, PageController controller) => MyModel(
              currentPage: page,
              isLastPage: isLast,
              pageController: controller,
            ),
          );

      /// The page controller on its own.
      Stream<PageController> get getController => _controller.stream;

      /// The current page index on its own.
      Stream<int> get currentPage => _page.stream;

      /// The last-page flag on its own.
      Stream<bool> get isLastPage => _lastPage.stream;

      /// Publishes [page] as the current page index.
      void updatePage(int page) {
        _page.sink.add(page);
      }

      /// Publishes [state] as the last-page flag.
      void updatePageState(bool state) {
        _lastPage.sink.add(state);
      }

      /// Closes all three subjects.
      void dispose() {
        _controller.close();
        _page.close();
        _lastPage.close();
      }
    }
    
    /// File-level [WelcomeBloc] instance shared by the widgets in this file.
    final WelcomeBloc welcomeBloc = WelcomeBloc();
    
    /// Shows the welcome pages once `welcomeBloc.combinedStream` has delivered
    /// a [MyModel], and a progress indicator until then.
    class ViewerWrapper extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return StreamBuilder<MyModel>(
          stream: welcomeBloc.combinedStream,
          builder: (context, AsyncSnapshot<MyModel> snapshot) =>
              snapshot.hasData
                  ? _buildPages(snapshot.data)
                  : Center(child: CircularProgressIndicator()),
        );
      }

      /// Builds the page view for one combined [state].
      Widget _buildPages(MyModel state) {
        return PageView(
          controller: state.pageController,
          onPageChanged: (index) {
            welcomeBloc.updatePage(index);
            // Prints the page held by the model this view was built from,
            // not the index just reported by the PageView.
            print(state.currentPage);
          },
          children: <Widget>[
            Page1(),
            Page2(),
            Login(),
          ],
        );
      }
    }