I want to implement something like a gate mechanism. I need one PublishSubject and a couple of subscribers. When the PublishSubject sends data via onNext, only one subscriber should receive it.
For example: I have 3 identical fragments inside tabs. Each one subscribes to a global publisher called onLoginPublisher. When onResume or onPause is called, that fragment's gate opens or closes. If onLogin is called while no gate is open, because none of these fragments is on screen, onNext should wait for a fragment's onResume.
You can use filter with the gate's state. For example, you can wrap all the logic into a class:
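// Assumes RxJava 2 package names; for RxJava 3 use io.reactivex.rxjava3.core.Observable
// and io.reactivex.rxjava3.subjects.PublishSubject instead.
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.concurrent.atomic.AtomicReferenceArray;

public final class GatedSubject<T> {
    final PublishSubject<T> subject = PublishSubject.create();
    // One slot per gate: Boolean.TRUE means open, null means closed.
    final AtomicReferenceArray<Boolean> gates;

    public GatedSubject(int numGates) {
        gates = new AtomicReferenceArray<>(numGates);
    }

    public boolean getGateStatus(int gateIndex) {
        return gates.get(gateIndex) != null;
    }

    public void setGateStatus(int gateIndex, boolean status) {
        gates.set(gateIndex, status ? Boolean.TRUE : null);
    }

    // Items pass through only while this gate is open; the rest are dropped.
    public Observable<T> getGate(int gateIndex) {
        return subject.filter(v -> getGateStatus(gateIndex));
    }

    public void onNext(T item) {
        subject.onNext(item);
    }

    public void onError(Throwable error) {
        subject.onError(error);
    }

    public void onComplete() {
        subject.onComplete();
    }
}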
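One thing to keep in mind: with filter, a value emitted while every gate is closed is dropped, not held until a later onResume, and every gate that is open at that moment receives the value. The sketch below models just that behaviour in plain Java so it can be run without RxJava on the classpath. GateModelDemo, GateModel, and listen are hypothetical names made up for this illustration and are not part of RxJava or of the class above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

// Hypothetical plain-Java model of the gate filtering; not RxJava API.
final class GateModelDemo {

    static final class GateModel<T> {
        // Same convention as GatedSubject: TRUE = open, null = closed.
        private final AtomicReferenceArray<Boolean> gates;
        // One listener slot per gate index (null until someone listens).
        private final List<Consumer<T>> listeners = new ArrayList<>();

        GateModel(int numGates) {
            gates = new AtomicReferenceArray<>(numGates);
            for (int i = 0; i < numGates; i++) {
                listeners.add(null);
            }
        }

        void setGateStatus(int gateIndex, boolean open) {
            gates.set(gateIndex, open ? Boolean.TRUE : null);
        }

        void listen(int gateIndex, Consumer<T> listener) {
            listeners.set(gateIndex, listener);
        }

        // Deliver to every listener whose gate is open right now;
        // nothing is buffered for gates that are closed.
        void onNext(T item) {
            for (int i = 0; i < listeners.size(); i++) {
                Consumer<T> listener = listeners.get(i);
                if (listener != null && gates.get(i) != null) {
                    listener.accept(item);
                }
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        GateModel<String> model = new GateModel<>(2);
        model.listen(0, v -> log.add("tab0:" + v));
        model.listen(1, v -> log.add("tab1:" + v));

        model.onNext("login-1");        // all gates closed: dropped
        model.setGateStatus(0, true);   // tab 0 resumes
        model.onNext("login-2");        // reaches tab 0 only
        model.setGateStatus(0, false);  // tab 0 pauses
        model.setGateStatus(1, true);   // tab 1 resumes
        model.onNext("login-3");        // reaches tab 1 only
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [tab0:login-2, tab1:login-3]
    }
}
```

Note that "login-1" never shows up: it was emitted while both gates were closed. If the value has to wait for the next onResume, the filter alone is not enough and some form of buffering or replay would be needed on top.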