flutter, bloc, cubit

How do I cancel a StreamSubscription inside a Cubit?


I have a cubit that listens to a stream of messages and emits a state holding those messages. On the screen, I use a BlocProvider to access the cubit and a BlocBuilder to display the messages.

In a case like the one below, do I need to cancel the StreamSubscription returned by listen()? Is there a clean way to do it?

class MessageCubit extends Cubit<MessageState> {
  final GetMessagesUseCase getMessagesUseCase;
  MessageCubit({this.getMessagesUseCase}) : super(MessageInitial());
  Future<void> getMessages({String senderId, String recipientId}) async {
    emit(MessageLoading());
    try {
      final messagesStreamData = getMessagesUseCase.call();

      //This is where I listen to a stream
      messagesStreamData.listen((messages) {
        emit(MessageLoaded(messages: messages));
      });


    } on SocketException catch (_) {
      emit(MessageFailure());
    } catch (_) {
      emit(MessageFailure());
    }
  }
}

Solution

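  • Nothing forces you to cancel the subscription, but you should. A subscription that is never cancelled can keep its listener, and the cubit that listener refers to, alive after the cubit is closed, which is a potential memory leak. Cancelling it takes only a few lines: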

    1. Add a field of type StreamSubscription<your type> to the cubit. Let's say it's named sub.
    2. In getMessages, before calling listen, cancel any previous subscription: await sub?.cancel()
    3. Store the new subscription: sub = messagesStreamData.listen(...)
    4. Override the cubit's close method and cancel the subscription there too, with the same call as in step 2.

    Full code:

    import 'dart:async'; // StreamSubscription
    import 'dart:io'; // SocketException

    import 'package:flutter_bloc/flutter_bloc.dart'; // Cubit

    class MessageCubit extends Cubit<MessageState> {
      final GetMessagesUseCase getMessagesUseCase;

      // Added: holds the active subscription; null until getMessages runs.
      StreamSubscription<YOUR_MESSAGES_TYPE>? sub;

      MessageCubit({required this.getMessagesUseCase}) : super(MessageInitial());

      Future<void> getMessages({String? senderId, String? recipientId}) async {
        emit(MessageLoading());
        try {
          final messagesStreamData = getMessagesUseCase.call();

          // Added: cancel any previous subscription before listening again.
          await sub?.cancel();
          // This is where I listen to the stream.
          sub = messagesStreamData.listen((messages) {
            emit(MessageLoaded(messages: messages));
          });
        } on SocketException catch (_) {
          emit(MessageFailure());
        } catch (_) {
          emit(MessageFailure());
        }
      }

      // Added: cancel the subscription when the cubit is closed.
      @override
      Future<void> close() async {
        await sub?.cancel();
        return super.close();
      }
    }
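
To see the effect of the two cancel calls in isolation, here is a minimal sketch that uses only dart:async and no bloc package. SubscriptionHolder is a hypothetical stand-in for the cubit, and its start() and close() methods are assumed names that play the roles of getMessages() and close() above. It is meant as an illustration of the pattern, not as the cubit's actual implementation.

```dart
import 'dart:async';

// Hypothetical stand-in for the cubit: it listens to a stream and
// records every event it receives.
class SubscriptionHolder {
  SubscriptionHolder(this._source);

  final Stream<int> _source;
  final List<int> received = [];

  // Null until start() is called for the first time.
  StreamSubscription<int>? _sub;

  // Plays the role of getMessages(): cancel the old subscription,
  // then listen again.
  Future<void> start() async {
    await _sub?.cancel();
    _sub = _source.listen(received.add);
  }

  // Plays the role of the overridden close().
  Future<void> close() async {
    await _sub?.cancel();
    _sub = null;
  }
}

Future<void> main() async {
  final controller = StreamController<int>.broadcast();
  final holder = SubscriptionHolder(controller.stream);

  await holder.start();
  await holder.start(); // replaces the first subscription instead of adding one
  controller.add(1);
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  await holder.close();
  controller.add(2); // nobody is listening any more, so this is dropped
  await Future<void>.delayed(Duration.zero);

  print(holder.received); // prints [1]
  print(controller.hasListener); // prints false
  await controller.close();
}
```

Without the cancel in start(), the second call would leave two listeners attached and the first event would be recorded twice. Without the cancel in close(), the second event would still reach the holder after it was closed.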