Search code examples
streamrx-javareactive-programmingrx-java2

How to reset a BehaviorSubject


I have a BehaviorSubject that I would like to reset - by that I mean I want the latest value to not be available, just as if it was just created.

I don't seem to see an API to do this but I suppose there is another way to achieve the same result?

My desired behavior is that I need to emit events, and I'd like subscribers to get the latest event when they subscribe - if a particular manager is in a 'started' state. But when this manager is 'stopped' the latest event should not be available (just like if it was never started in the first place).


Solution

  • I assume you want to clear the BehaviorSubject (because otherwise don't call onComplete on it). That is not supported but you can achieve a similar effect by having a current value that is ignored by consumers:

    public static final Object EMPTY = new Object();
    
    BehaviorSubject<Object> subject = BehaviorSubject.createDefault(EMPTY);
    
    Observable<YourType> obs = subject.filter(v -> v != EMPTY).cast(YourType.class);
    
    obs.subscribe(System.out::println);
    
    // send normal data
    subject.onNext(1);
    subject.onNext(2);
    
    // clear the subject
    subject.onNext(EMPTY);
    
    // this should not print anything
    obs.subscribe(System.out::println);