I am using RxJava 2.
I have an observable and several subscribers that can subscribe to it.
Each time a new subscriber arrives, some job should be done and every subscriber should be notified.
For this I decided to use a PublishSubject. But when doOnSubscribe
is called for the first subscriber, myPublishSubject.hasObservers() returns false...
Any idea why this happens and how I can fix it?
// Subject through which Boolean notifications are pushed to the current subscribers.
private val myPublishSubject = PublishSubject.create<Boolean>()
/**
 * Exposes [myPublishSubject] as an Observable with a doOnSubscribe callback attached.
 *
 * The callback pushes `true` into the subject each time it runs.
 * NOTE(review): as reported in the question, hasObservers() is still false while this
 * callback runs for the first subscriber, so that subscriber presumably misses the
 * value emitted here - confirm against the doOnSubscribe ordering guarantees.
 */
fun getPublishObservable():Observable<Boolean> {
return myPublishSubject.doOnSubscribe {
// Do the per-subscription job here, then notify all subscribers
// by emitting through the subject:
myPublishSubject.onNext(true)
}
}
Do I understand correctly that when doOnSubscribe is called, it means that at least one subscriber is already present?
I did not find a ready-made answer, so I created my own version of the subject and called it RefreshSubject.
It is based on PublishSubject, but with one difference: if you would like to return an observable and be notified when a new subscriber arrives and is ready to receive data, you should use the method getSubscriberReady.
Here is a small example:
/** Subject shared by the two accessors below. */
private RefreshSubject<Boolean> refreshSubject = RefreshSubject.create();
/**
 * Ordinary publish behaviour: hands out the subject itself as an Observable.
 *
 * @return the subject, typed as an Observable
 */
public Observable<Boolean> getObservable(){
return refreshSubject;
}
/**
 * Refresh behaviour: hands out the Observable built by getSubscriberReady, so the
 * supplied Action runs for each new subscription.
 *
 * @return the Observable returned by getSubscriberReady
 */
public Observable<Boolean> getRefreshObserver(){
return refreshSubject.getSubscriberReady(new Action() {
@Override
public void run() throws Exception {
// A new subscriber has arrived and is ready to receive data,
// so a data request can be made here; all subscribers (including the one
// that just arrived) will receive the new content.
}
});
}
And here is the full class:
/**
 * A subject modelled on PublishSubject that multicasts each item it receives to all
 * currently registered subscribers, and additionally offers
 * {@link #getSubscriberReady(Action)}: an Observable that runs a callback once a new
 * subscriber has been registered and can therefore receive whatever the callback emits.
 *
 * @param <T> the value type
 */
public class RefreshSubject<T> extends Subject<T> {
    /** The terminated indicator for the subscribers array (compared by identity). */
    @SuppressWarnings("rawtypes")
    private static final RefreshSubject.RefreshDisposable[] TERMINATED = new RefreshSubject.RefreshDisposable[0];

    /** An empty subscribers array to avoid allocating it all the time (compared by identity). */
    @SuppressWarnings("rawtypes")
    private static final RefreshSubject.RefreshDisposable[] EMPTY = new RefreshSubject.RefreshDisposable[0];

    /** The array of currently subscribed subscribers. */
    private final AtomicReference<RefreshDisposable<T>[]> subscribers;

    /** The error, written before terminating and read after checking subscribers. */
    Throwable error;

    /**
     * Constructs a RefreshSubject.
     * @param <T> the value type
     * @return the new RefreshSubject
     */
    @CheckReturnValue
    public static <T> RefreshSubject<T> create() {
        return new RefreshSubject<T>();
    }

    /**
     * Constructs a RefreshSubject with no subscribers.
     */
    @SuppressWarnings("unchecked")
    private RefreshSubject() {
        subscribers = new AtomicReference<RefreshSubject.RefreshDisposable<T>[]>(EMPTY);
    }

    /**
     * Registers the given observer, or replays the terminal event if the subject
     * has already terminated.
     * @param t the observer subscribing to this subject
     */
    @Override
    public void subscribeActual(Observer<? super T> t) {
        RefreshSubject.RefreshDisposable<T> ps = new RefreshSubject.RefreshDisposable<T>(t, RefreshSubject.this);
        t.onSubscribe(ps);
        if (add(ps)) {
            // if cancellation happened while a successful add, the remove() didn't work
            // so we need to do it again
            if (ps.isDisposed()) {
                remove(ps);
            }
        } else {
            signalTerminal(ps);
        }
    }

    /**
     * Returns an Observable that, for every subscription, registers the subscriber with
     * this subject and then runs {@code onReady}. Because registration happens first,
     * anything {@code onReady} pushes through this subject reaches the new subscriber too.
     *
     * <p>If the subject has already terminated, the subscriber receives the terminal
     * event and {@code onReady} is not run. If the subscriber disposes, its registration
     * is removed from the subject.
     *
     * @param onReady the action to run once the new subscriber has been registered
     * @return the Observable wrapping this subject
     */
    public Observable<T> getSubscriberReady(final Action onReady) {
        return Observable.create(new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> e) throws Exception {
                RefreshDisposable<T> registration = new RefreshDisposable<T>(e, RefreshSubject.this);
                // Tie the registration to the emitter: when downstream disposes (or the
                // emitter terminates), the entry is removed from the subscribers array
                // instead of lingering there.
                e.setDisposable(registration);
                if (add(registration)) {
                    // Disposal may have raced with a successful add; undo it in that case.
                    if (registration.isDisposed()) {
                        remove(registration);
                        return;
                    }
                    onReady.run();
                } else {
                    // Subject already terminated: deliver the terminal event rather than
                    // leaving the subscriber waiting forever.
                    signalTerminal(registration);
                }
            }
        });
    }

    /**
     * Delivers the subject's terminal event (error if present, completion otherwise)
     * to a subscriber that arrived after termination.
     * @param target the late subscriber
     */
    private void signalTerminal(RefreshDisposable<T> target) {
        Throwable ex = error;
        if (ex != null) {
            target.onError(ex);
        } else {
            target.onComplete();
        }
    }

    /**
     * Tries to add the given subscriber to the subscribers array atomically
     * or returns false if the subject has terminated.
     * @param ps the subscriber to add
     * @return true if successful, false if the subject has terminated
     */
    private boolean add(RefreshSubject.RefreshDisposable<T> ps) {
        for (;;) {
            RefreshSubject.RefreshDisposable<T>[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            int n = a.length;
            @SuppressWarnings("unchecked")
            RefreshSubject.RefreshDisposable<T>[] b = new RefreshSubject.RefreshDisposable[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = ps;
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

    /**
     * Atomically removes the given subscriber if it is subscribed to the subject.
     * @param ps the subscriber to remove
     */
    @SuppressWarnings("unchecked")
    private void remove(RefreshSubject.RefreshDisposable<T> ps) {
        for (;;) {
            RefreshSubject.RefreshDisposable<T>[] a = subscribers.get();
            if (a == TERMINATED || a == EMPTY) {
                return;
            }
            int n = a.length;
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == ps) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            RefreshSubject.RefreshDisposable<T>[] b;
            if (n == 1) {
                b = EMPTY;
            } else {
                b = new RefreshSubject.RefreshDisposable[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (subscribers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    /**
     * Disposes the incoming upstream Disposable if this subject has already terminated.
     * @param s the upstream Disposable
     */
    @Override
    public void onSubscribe(Disposable s) {
        if (subscribers.get() == TERMINATED) {
            s.dispose();
        }
    }

    /**
     * Forwards the item to every currently registered subscriber; ignored after
     * termination. A null item is converted into an onError signal.
     * @param t the item to emit
     */
    @Override
    public void onNext(T t) {
        if (subscribers.get() == TERMINATED) {
            return;
        }
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        for (RefreshSubject.RefreshDisposable<T> s : subscribers.get()) {
            s.onNext(t);
        }
    }

    /**
     * Terminates the subject with an error and forwards it to every registered
     * subscriber. If the subject has already terminated, the error is routed to
     * RxJavaPlugins.onError instead.
     * @param t the error; null is replaced by a NullPointerException
     */
    @SuppressWarnings("unchecked")
    @Override
    public void onError(Throwable t) {
        if (subscribers.get() == TERMINATED) {
            RxJavaPlugins.onError(t);
            return;
        }
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        // Written before the subscribers array is swapped to TERMINATED so that readers
        // which observe TERMINATED afterwards can read the error.
        error = t;
        for (RefreshSubject.RefreshDisposable<T> s : subscribers.getAndSet(TERMINATED)) {
            s.onError(t);
        }
    }

    /**
     * Terminates the subject normally and forwards completion to every registered
     * subscriber; ignored if the subject has already terminated.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void onComplete() {
        if (subscribers.get() == TERMINATED) {
            return;
        }
        for (RefreshSubject.RefreshDisposable<T> s : subscribers.getAndSet(TERMINATED)) {
            s.onComplete();
        }
    }

    /** @return true if at least one subscriber is currently registered */
    @Override
    public boolean hasObservers() {
        return subscribers.get().length != 0;
    }

    /** @return the terminal error if the subject has terminated, null otherwise */
    @Override
    public Throwable getThrowable() {
        if (subscribers.get() == TERMINATED) {
            return error;
        }
        return null;
    }

    /** @return true if the subject has terminated with an error */
    @Override
    public boolean hasThrowable() {
        return subscribers.get() == TERMINATED && error != null;
    }

    /** @return true if the subject has terminated without an error */
    @Override
    public boolean hasComplete() {
        return subscribers.get() == TERMINATED && error == null;
    }

    /**
     * Wraps one downstream consumer (either an Observer or an Emitter) and makes
     * disposal remove itself from the parent's subscribers array. The boolean state
     * is the disposed flag.
     *
     * @param <T> the value type
     */
    private static final class RefreshDisposable<T> extends AtomicBoolean implements Disposable {
        private static final long serialVersionUID = 3562861878281475070L;

        /** The downstream emitter, or null when this instance wraps an Observer. */
        final Emitter<? super T> actualEmitter;

        /** The downstream observer, or null when this instance wraps an Emitter. */
        final Observer<? super T> actualObserver;

        /** The subject this registration belongs to. */
        final RefreshSubject<T> parent;

        /**
         * Constructs a RefreshDisposable wrapping an emitter.
         * @param actualEmitter the downstream emitter
         * @param parent the parent RefreshSubject
         */
        RefreshDisposable(Emitter<? super T> actualEmitter, RefreshSubject<T> parent) {
            this.actualEmitter = actualEmitter;
            this.parent = parent;
            actualObserver = null;
        }

        /**
         * Constructs a RefreshDisposable wrapping an observer.
         * @param actualObserver the downstream observer
         * @param parent the parent RefreshSubject
         */
        RefreshDisposable(Observer<? super T> actualObserver, RefreshSubject<T> parent) {
            this.actualObserver = actualObserver;
            this.parent = parent;
            actualEmitter = null;
        }

        /**
         * Forwards the item downstream unless this registration has been disposed.
         * @param t the item
         */
        public void onNext(T t) {
            if (!get()) {
                if (actualEmitter != null) {
                    actualEmitter.onNext(t);
                }
                if (actualObserver != null) {
                    actualObserver.onNext(t);
                }
            }
        }

        /**
         * Forwards the error downstream, or routes it to RxJavaPlugins.onError if this
         * registration has been disposed.
         * @param t the error
         */
        public void onError(Throwable t) {
            if (get()) {
                RxJavaPlugins.onError(t);
            } else {
                if (actualEmitter != null) {
                    actualEmitter.onError(t);
                }
                if (actualObserver != null) {
                    actualObserver.onError(t);
                }
            }
        }

        /** Forwards completion downstream unless this registration has been disposed. */
        public void onComplete() {
            if (!get()) {
                if (actualEmitter != null) {
                    actualEmitter.onComplete();
                }
                if (actualObserver != null) {
                    actualObserver.onComplete();
                }
            }
        }

        /** Marks this registration disposed (once) and removes it from the parent. */
        @Override
        public void dispose() {
            if (compareAndSet(false, true)) {
                parent.remove(this);
            }
        }

        @Override
        public boolean isDisposed() {
            return get();
        }
    }
}