Search code examples
Tags: flutter, dart, bloc, rxdart, combinelatest

RxDart combineLatest stream function does not work


I am trying to combine two streams, but it does not work. What is my mistake?

My build function is:

  /// Builds the view from the account-list stream combined with the
  /// delete-result stream.
  @override
  Widget build(BuildContext context) {
    // Note: this combined stream is constructed anew on every call to build.
    final combinedStream = Observable.combineLatest2(
      getAllDBAccountsBloc.getAllDBAccountsStream,
      deleteDBAccountBloc.deleteDBAccountStream,
      (accounts, deleteResult) {
        print("my account list : ${accounts == null}");
        return AccountsCombinerResult(
          accountsList: accounts,
          deleteAccountResultBlocModel: deleteResult,
        );
      },
    );

    return StreamBuilder(
      stream: combinedStream,
      builder: (context, snapshot) {
        print("hasData : ${snapshot.hasData}");
        // Cache the newest combined value before deciding what to render.
        if (snapshot.hasData) {
          accountsCombinerResult = snapshot.data;
        }
        return snapshot.hasError
            ? Text(snapshot.error.toString())
            : _buildWidget;
      },
    );
  }

The GetAllDBAccountsBloc is:

/// BLoC that publishes the account list obtained from
/// [Repository.getAllDBAccounts].
class GetAllDBAccountsBloc {
  // NOTE(review): per the rxdart docs a PublishSubject does not replay earlier
  // events, so a listener only receives values added after it subscribed.
  final _getAllDBAccountsFetcher = PublishSubject<List<AccountDatabaseModel>>();

  /// Stream of account lists pushed by [getAllDBAccounts].
  Observable<List<AccountDatabaseModel>> get getAllDBAccountsStream =>
      _getAllDBAccountsFetcher.stream;

  /// Emits `null`, waits one second, then emits the list returned by
  /// [Repository.getAllDBAccounts].
  ///
  /// The returned future completes once the list has been added to the stream,
  /// so callers may await it.
  Future<void> getAllDBAccounts() async {
    print("accounts getting");
    // Presumably a "loading" marker for listeners — confirm with the UI code.
    _getAllDBAccountsFetcher.sink.add(null);
    await Future.delayed(const Duration(seconds: 1));
    _getAllDBAccountsFetcher.sink.add(await Repository.getAllDBAccounts());
    print("accounts get");
  }

  /// Closes the underlying subject and returns the future from `close()`.
  Future<void> dispose() => _getAllDBAccountsFetcher.close();
}

/// Shared top-level instance of [GetAllDBAccountsBloc].
final getAllDBAccountsBloc = GetAllDBAccountsBloc();

The DeleteDBAccountBloc is:

/// BLoC that publishes the outcome of [Repository.deleteDBAccount].
class DeleteDBAccountBloc {
  // NOTE(review): per the rxdart docs a PublishSubject does not replay earlier
  // events, so a listener only receives values added after it subscribed.
  final _deleteDBAccountFetcher =
      PublishSubject<DeleteAccountResultBlocModel>();

  /// Stream of delete results pushed by [deleteDBAccount].
  Observable<DeleteAccountResultBlocModel> get deleteDBAccountStream =>
      _deleteDBAccountFetcher.stream;

  /// Emits a default-constructed [DeleteAccountResultBlocModel], waits one
  /// second, then emits the result of [Repository.deleteDBAccount] for
  /// [requestModel].
  ///
  /// The returned future completes once the result has been added to the
  /// stream, so callers may await it.
  Future<void> deleteDBAccount(
    DeleteAccountRequestBlocModel requestModel,
  ) async {
    // Presumably an "in progress" placeholder — confirm with the UI code.
    _deleteDBAccountFetcher.sink.add(DeleteAccountResultBlocModel());
    await Future.delayed(const Duration(seconds: 1));
    _deleteDBAccountFetcher.sink
        .add(await Repository.deleteDBAccount(requestModel));
  }

  /// Closes the underlying subject and returns the future from `close()`.
  Future<void> dispose() => _deleteDBAccountFetcher.close();
}

/// Shared top-level instance of [DeleteDBAccountBloc].
final deleteDBAccountBloc = DeleteDBAccountBloc();

The combiner result class is:

/// Value object holding one account list together with one delete result.
class AccountsCombinerResult {
  /// Creates a result; both named arguments are marked `@required`.
  AccountsCombinerResult({
    @required this.accountsList,
    @required this.deleteAccountResultBlocModel,
  });

  /// The account list half of the pair.
  final List<AccountDatabaseModel> accountsList;

  /// The delete-result half of the pair.
  final DeleteAccountResultBlocModel deleteAccountResultBlocModel;
}

This is my run log in Android Studio:

I/flutter (28323): accounts getting

I/flutter (28323): hasData : false

I/flutter (28323): hasData : false

I/flutter (28323): accounts get

The stream works, but I do not get any AccountsCombinerResult data.

This build method works, but I don't want to use it:

  /// Builds the view by nesting one [StreamBuilder] per source stream.
  @override
  Widget build(BuildContext context) {
    return StreamBuilder(
      stream: getAllDBAccountsBloc.getAllDBAccountsStream,
      builder: (context, accountsSnapshot) => StreamBuilder(
        stream: deleteDBAccountBloc.deleteDBAccountStream,
        builder: (context, deleteSnapshot) {
          // Refresh the cached result only when both snapshots carry data.
          final bothHaveData =
              deleteSnapshot.hasData && accountsSnapshot.hasData;
          if (bothHaveData) {
            print("qweqweq");
            accountsCombinerResult = AccountsCombinerResult(
              accountsList: accountsSnapshot.data,
              deleteAccountResultBlocModel: deleteSnapshot.data,
            );
          }

          // An error on the accounts stream is reported before one on the
          // delete stream.
          if (accountsSnapshot.hasError) {
            return Text(accountsSnapshot.error.toString());
          }
          if (deleteSnapshot.hasError) {
            return Text(deleteSnapshot.error.toString());
          }
          return _buildWidget;
        },
      ),
    );
  }

Solution

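  • You are building a new stream every time the build method is called, so each rebuild makes the StreamBuilder subscribe to a fresh stream. You need to keep the stream reference in the state. Also note that combineLatest2 emits nothing until both source streams have emitted at least one value, and a PublishSubject does not replay earlier events to new subscribers.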

    
    /// Combined stream, created once in [initState] so that every rebuild
    /// hands the very same instance to the [StreamBuilder].
    ///
    /// The stream is stored directly instead of being piped through a
    /// StreamController with `addStream`: a controller must not be closed
    /// while `addStream` is still in progress, and this combined stream never
    /// completes on its own, so closing the controller in `dispose` would
    /// throw a StateError.
    Stream<AccountsCombinerResult> _combinedStream;

    @override
    void initState() {
      super.initState();
      // NOTE: per the rxdart docs, combineLatest2 emits nothing until both
      // sources have emitted at least once.
      _combinedStream = Observable.combineLatest2(
        getAllDBAccountsBloc.getAllDBAccountsStream,
        deleteDBAccountBloc.deleteDBAccountStream,
        (accountList, deleteAccountResultModel) {
          print("my account list : ${accountList == null}");
          return AccountsCombinerResult(
            deleteAccountResultBlocModel: deleteAccountResultModel,
            accountsList: accountList,
          );
        },
      );
    }

    // No dispose override is required here: the StreamBuilder below cancels
    // its own subscription when it is removed from the tree, and this state
    // object owns no controller that would need closing.

    @override
    Widget build(BuildContext context) {
      return StreamBuilder(
        stream: _combinedStream,
        builder: (context, snapshot) {
          print("hasData : ${snapshot.hasData}");
          if (snapshot.hasData) accountsCombinerResult = snapshot.data;
          if (snapshot.hasError) return Text(snapshot.error.toString());
          return _buildWidget;
        },
      );
    }
    

    To make this easier you could use the StreamProvider from the provider package. https://pub.dev/packages/provider https://pub.dev/documentation/provider/latest/provider/StreamProvider-class.html

    It only builds the stream once.

    /// Builds the view on top of a [StreamProvider] that exposes the combined
    /// [AccountsCombinerResult] to its descendants.
    @override
    Widget build(BuildContext context) {
      return StreamProvider<AccountsCombinerResult>(
        // Consumers read `null` until the combined stream delivers its first
        // event, so the builder below must tolerate a null value.
        initialData: null,
        // `create` receives the BuildContext (unused here); a zero-argument
        // closure does not match the provider's callback signature.
        create: (_) => Observable.combineLatest2(
          getAllDBAccountsBloc.getAllDBAccountsStream,
          deleteDBAccountBloc.deleteDBAccountStream,
          (accountList, deleteAccountResultModel) {
            print("my account list : ${accountList == null}");
            return AccountsCombinerResult(
              deleteAccountResultBlocModel: deleteAccountResultModel,
              accountsList: accountList,
            );
          },
        ),
        // Stream errors are surfaced as a result object carrying only `error`.
        catchError: (context, error) => AccountsCombinerResult(
          deleteAccountResultBlocModel: null,
          accountsList: null,
          error: error,
        ),
        child: Builder(
          builder: (context) {
            final data = Provider.of<AccountsCombinerResult>(context);
            // `data` is still the null initialData before the first event;
            // dereferencing it unguarded would throw.
            if (data != null) {
              if (data.error != null) return Text(data.error.toString());
              accountsCombinerResult = data;
            }
            return _buildWidget;
          },
        ),
      );
    }
    
    /// Value object holding one account list, one delete result and an
    /// optional error.
    class AccountsCombinerResult {
      /// Creates a result; [accountsList] and [deleteAccountResultBlocModel]
      /// are marked `@required`, [error] may be omitted.
      AccountsCombinerResult({
        @required this.accountsList,
        @required this.deleteAccountResultBlocModel,
        this.error,
      });

      /// The account list half of the pair.
      final List<AccountDatabaseModel> accountsList;

      /// The delete-result half of the pair.
      final DeleteAccountResultBlocModel deleteAccountResultBlocModel;

      /// Error attached to this result, or `null` when none was supplied.
      final dynamic error;
    }
    

    The code is not tested so there may be typos or stuff that I missed, but you should get the general idea.