StreamBuilder: filter stream data and display new results (gets stuck on ConnectionState.waiting)


I am loading data from Firebase and displaying it, which works perfectly fine. Now I want to let the user filter the results on the client side. This is what doesn't work.

I am using a StreamController and use the add method to send new user events. Somehow this works fine when getting data from Firebase, but not when I try to switch to the filtered results manually.

Context

Here is the normal code for the initial loading from Firebase:

class UserViewModel {
  final UserRepository _userRepo;

  final StreamController<List<AppUser>> _allUsersStreamcontroller =
      StreamController<List<AppUser>>.broadcast();
  List<AppUser> allUsersList = List.empty(growable: true);

  // ... other code ...

  Stream<List<AppUser>> getUserStream() {
    return _allUsersStreamcontroller.stream;
  }

  Future<bool> loadAllUsersInRadius() async {
    try {
      await _userRepo.getCurrentUserLocation();
    } catch (e) {
      return Future.error(e);
    }

    // Basically load users & auto-filter some results
    if (_userRepo.currentLocation != null) {
      final subscription = _userRepo
          .getAllUsersInRadius(radius: searchRadius)
          .listen((documentList) {
        final filteredUserList =
            _applyAutomaticFilters(documentList, userRelationIDs);

        allUsersList = filteredUserList
            .map((docSnapshot) => _createUserFromSnapshot(docSnapshot))
            .toList();

        _allUsersStreamcontroller.add(allUsersList); // <- Important part, adding users to stream (this works)
      });
      subscriptions.add(subscription);
    }
    return true;
  }

  // ... Other code ...

And StreamBuilder in the UI:

 StreamBuilder<List<AppUser>>(
  // Added random key because otherwise Widget didn't update reliably (suggestion from another SO question)
  key: Key(Random().nextInt(10).toString()),
  stream: userViewModel.getUserStream(), // <- getting stream data
  builder: (context, snapshot) {
    if (snapshot.hasData) {
      if (_swiperEnded || (snapshot.data?.isEmpty ?? false)) {
        // ... display some hint for user about empty list ...
      } else {
        // ... display main UI content ... <- Works fine normally
      }
    } else if (snapshot.connectionState == ConnectionState.waiting) {
      return const Center(child: CircularProgressIndicator()); // <- This is where it gets stuck after I apply manual filters
    } else if (snapshot.hasError || _errorMessage != null) {
      // Display error
    } else {
      // Unimportant
    }
  })

// ... other Widgets ...
// This is where the manual filters are applied, from a BottomSheet
ElevatedButton(
  child: const Text("Apply"),
  onPressed: () {
    userVM.applyManualFilters();
    setState(() {});                                              
    Navigator.pop(sheetContext, true);
  })

Now here is the part where it doesn't work. I take the user list and filter the contents based on the filter values. Then I try to add this new list to the same StreamController.

  void applyManualFilters() {
    Iterable<AppUser> filteredUserList = List.from(allUsersList);

    if (isTimeZoneFilterActive)
      filteredUserList = _filterByTimeZoneOffset(filteredUserList);
    if (isAvailabilityFilterActive)
      filteredUserList = _filterByAvailability(filteredUserList);
    if (isAgeFilterActive)
      filteredUserList = _filterByAgeRange(filteredUserList);

    _allUsersStreamcontroller.add(filteredUserList.toList()); // <- setting filtered data here, does NOT work
  }

My Problem

For some reason when I add the filteredUserList to the stream, it gets stuck on ConnectionState.waiting and the emulator starts eating resources like crazy (laptop starts getting very loud). It seems to get stuck in an infinite loop?

What I've tried: I tried using two different streams instead and then switching between them in the StreamBuilder based on a bool flag, but that had the same result.


I saw many other questions related to switching streams or similar ideas (1, 2, 3) but all the answers there sit at 0 points or don't help at all.
I've seen one suggestion in this SO question to use BehaviorSubject from RxDart which seems like it'd work, but I'd really prefer not to import a whole state management package just for this one instance.


Solution

  • The issue is the random key. Broadcast streams don't buffer, so the events that you add are sent to the old StreamBuilder; the new StreamBuilder created by the rebuild subscribes too late to see them and stays in ConnectionState.waiting. Adding a delay to the add doesn't really solve the problem, it just works because the rebuild takes less than 100 ms.
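
    To see the non-buffering behaviour in isolation, here is a minimal pure-Dart sketch (no Flutter). The two listeners only stand in for the old and the new StreamBuilder; the `demo` function and the log strings are made up for illustration.

    ```dart
    import 'dart:async';

    /// Returns what each listener received, in order.
    Future<List<String>> demo() async {
      final controller = StreamController<int>.broadcast();
      final log = <String>[];

      // Stands in for the StreamBuilder that exists before the rebuild.
      final oldSub = controller.stream.listen((v) => log.add('old: $v'));

      controller.add(1); // only listeners subscribed right now can get this
      await Future<void>.delayed(Duration.zero); // let the event be delivered

      // The random key throws the old widget away and creates a new one,
      // which subscribes only after event 1 has already been sent.
      await oldSub.cancel();
      controller.stream.listen((v) => log.add('new: $v'));

      controller.add(2);
      await Future<void>.delayed(Duration.zero);
      await controller.close();
      return log;
    }

    Future<void> main() async {
      print(await demo()); // [old: 1, new: 2]
    }
    ```

    The second listener never receives 1. A freshly created StreamBuilder is in the same position: it shows ConnectionState.waiting until something is added after it has subscribed.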

    I imagine that the reason you added that random key is that the StreamBuilder wouldn't reflect changes when calling setState. This is because you reused the StreamController's stream: StreamBuilder compares the old and new streams in its didUpdateWidget, and if they compare equal it keeps its existing subscription and won't update.

    final controller = StreamController();
    final stream = controller.stream;
    print(controller.stream == stream); // true
    

    The following example shows the cause a bit more clearly. Comment out one or both of the marked lines to fix the StreamBuilder's behavior.

    import 'dart:math';
    import 'dart:async';
    
    import 'package:flutter/material.dart';
    
    void main() => runApp(const App());
    
    class App extends StatefulWidget {
      const App({super.key});
    
      @override
      AppState createState() => AppState();
    }
    
    typedef User = String;
    typedef Users = List<User>;
    
    class AppState extends State<App> {
      final controller = StreamController<Users>.broadcast()..stream.listen(print);
    
      Widget _stream_builder(BuildContext context, AsyncSnapshot<Users> snapshot) {
        return switch (snapshot.connectionState) {
          ConnectionState.none => const Text('None'),
          ConnectionState.waiting => const Text('Waiting'),
          ConnectionState.done => Text('Done -> ${snapshot.data}'),
          ConnectionState.active => Text('Active -> ${snapshot.data}'),
        };
      }
    
      int _number = 0;
      Key get _rand_key => ValueKey(Random().nextDouble());
    
      @override
      Widget build(BuildContext context) {
        return MaterialApp(
          home: Scaffold(
            body: StreamBuilder(
              key: _rand_key, // <- Comment out this or the setState
              stream: controller.stream,
              builder: _stream_builder,
            ),
            floatingActionButton: FloatingActionButton(
              onPressed: () {
                if (!controller.isClosed) {
                  controller.add(['$_number']);
                  if (++_number > 5) {
                    controller.close();
                  }
                }
                setState(() {}); // <- Comment out this or the key
              },
              child: const Icon(Icons.add),
            ),
          ),
        );
      }
    }
    

    I don't think that using a broadcast stream is necessary in your case. Only one widget needs to receive the stream; the rest can do just fine with a reference or an InheritedModel. This example is a bit more in line with how I would do the filtering.

    import 'dart:async';
    
    import 'package:flutter/material.dart';
    
    void main() => runApp(const App());
    
    class App extends StatefulWidget {
      const App({super.key});
    
      @override
      AppState createState() => AppState();
    }
    
    typedef User = String;
    typedef Users = List<User>;
    
    class AppState extends State<App> {
      UsersFilter filter = UsersFilter(invalid_users: ['0', '2', '3', '5', '9']);
    
      // A fixed-length list is more efficient if you are mainly reading and
      // modifying elements. Keep in mind that you are still free to change
      // the contents of the list; only resizing is restricted. If you want a
      // bit more flexibility, check the [ChangeNotifier]'s implementation.
      Users users = _empty_list;
      static final _empty_list = Users.empty();
    
      // This is a getter just to reflect that it should be treated as a
      // variable, like if it was created by the [StreamController]
      Stream<Users> get _users_stream async* {
        users = _empty_list; // Reset the accumulated list (a fixed-length list can't be `clear()`ed)
    
        for (var i = 0; i < 10; i++) {
          await Future.delayed(Durations.extralong1);
          yield ['$i', '${i * 2}'];
        }
      }
    
      Widget _stream_builder(BuildContext context, AsyncSnapshot<Users> snapshot) {
        if (snapshot.hasData)
          users = users.followedBy(snapshot.data!).toList(growable: false);
    
        return switch (snapshot.connectionState) {
          ConnectionState.none => const Text('None'),
          ConnectionState.waiting => const Text('Waiting'),
          ConnectionState.done => Text('Done -> ${snapshot.data} \n $users'),
          ConnectionState.active => Text('Active -> ${snapshot.data} \n $users'),
        };
      }
    
      @override
      Widget build(BuildContext context) {
        return MaterialApp(
          home: Scaffold(
            body: StreamBuilder(
              stream: _users_stream.transform(filter),
              builder: _stream_builder,
            ),
            floatingActionButton: FloatingActionButton(
              onPressed: () => setState(() {}), // reset
              child: const Icon(Icons.add),
            ),
          ),
        );
      }
    }
    
    class UsersFilter extends StreamTransformerBase<Users, Users> {
      final Users invalid_users;
      UsersFilter({required this.invalid_users});
    
      @override
      Stream<Users> bind(Stream<Users> stream) async* {
        await for (final users in stream) {
          yield users..removeWhere((user) => invalid_users.contains(user));
        }
      }
    }
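
    Applied to the view model from the question, the same idea might look roughly like this. It is only a sketch under assumptions: `AppUser` is cut down to two made-up fields, `setUsers` stands in for the Firebase listener, and there is a single age filter with a hypothetical `minAge`. The StreamBuilder would then take `viewModel.users` without any random key.

    ```dart
    import 'dart:async';

    // Hypothetical, cut-down stand-in for the question's AppUser.
    class AppUser {
      const AppUser(this.name, this.age);
      final String name;
      final int age;
    }

    class UserViewModel {
      // Single-subscription controller: it buffers events until the one
      // listener (the StreamBuilder) subscribes.
      final _controller = StreamController<List<AppUser>>();

      List<AppUser> _allUsers = const [];
      bool isAgeFilterActive = false;
      int minAge = 0;

      Stream<List<AppUser>> get users => _controller.stream;

      // Stands in for the Firebase listener storing a fresh result.
      void setUsers(List<AppUser> loaded) {
        _allUsers = loaded;
        _emit();
      }

      // Re-emits the stored list through the currently active filters.
      void applyManualFilters() => _emit();

      void _emit() {
        Iterable<AppUser> result = _allUsers;
        if (isAgeFilterActive) {
          result = result.where((user) => user.age >= minAge);
        }
        _controller.add(result.toList());
      }

      Future<void> dispose() => _controller.close();
    }

    /// Returns the names contained in each emitted list.
    Future<List<List<String>>> demo() async {
      final viewModel = UserViewModel();
      final seen = <List<String>>[];

      // Added before anyone listens; the controller keeps it for later.
      viewModel.setUsers(const [AppUser('Ann', 17), AppUser('Bob', 30)]);
      viewModel.users.listen((list) => seen.add([for (final u in list) u.name]));

      viewModel.isAgeFilterActive = true;
      viewModel.minAge = 18;
      viewModel.applyManualFilters();

      await Future<void>.delayed(Duration.zero);
      await viewModel.dispose();
      return seen;
    }

    Future<void> main() async {
      print(await demo()); // [[Ann, Bob], [Bob]]
    }
    ```

    Because the listener stays subscribed the whole time, the filtered list arrives like any other event. Note that a single-subscription stream can only be listened to once, so this fits only when a single StreamBuilder consumes it.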