I need a little help with RxJava. I currently have two hash maps, each mapping a subscription key to a Vert.x message consumer. I want to return a Completable that completes only if I am able to unregister both Vert.x message consumers. How can I achieve this?
Here is the code I am working on:
    @Override
    public Completable deregisterKeyEvents(String subscriptionId) {
        MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
        MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
        if (subscriptionConsumer != null) {
            subscriptionConsumerMap.remove(subscriptionId);
            subscriptionConsumer.unregister(res -> {
                if (res.succeeded()) {
                    LOGGER.debug("Subscription channel consumer deregistered successfully!");
                } else {
                    LOGGER.error("Unable to de-register Subscription channel consumer");
                }
            });
        }
        if (messageConsumer != null) {
            consumerMap.remove(subscriptionId);
            return Completable.create(emitter -> {
                messageConsumer.unregister(res -> {
                    if (res.succeeded()) {
                        emitter.onComplete();
                    } else {
                        emitter.onError(res.cause());
                    }
                });
            });
        } else {
            LOGGER.warn("There was no consumer registered!");
            return Completable.create(emitter -> emitter.onError(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found")));
        }
    }
I want to rewrite the code above so that it returns a Completable that completes only when both subscriptionConsumer.unregister() and messageConsumer.unregister() have succeeded.
The MessageConsumer class is from the Vert.x library (io.vertx.core.eventbus.MessageConsumer). I'd appreciate any help. Thank you.
If you're willing to add Vert.x RxJava2 to your dependencies, you could do this with CompletableHelper.toCompletable:
    // Requires io.vertx.reactivex.CompletableHelper from the Vert.x RxJava2 module.
    @Override
    public Completable deregisterKeyEvents(String subscriptionId) {
        MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
        MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
        Completable c1;
        if (subscriptionConsumer != null) {
            subscriptionConsumerMap.remove(subscriptionId);
            // Completable has no doOnSuccess; doOnComplete is the completion callback.
            c1 = CompletableHelper.toCompletable(handler -> subscriptionConsumer.unregister(handler))
                .doOnComplete(() -> LOGGER.debug("Subscription channel consumer deregistered successfully!"))
                .doOnError(t -> LOGGER.error("Unable to de-register Subscription channel consumer", t));
        } else {
            c1 = Completable.complete();
        }
        Completable c2;
        if (messageConsumer != null) {
            consumerMap.remove(subscriptionId);
            c2 = CompletableHelper.toCompletable(handler -> messageConsumer.unregister(handler));
        } else {
            LOGGER.warn("There was no consumer registered!");
            c2 = Completable.error(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found"));
        }
        // c2 is subscribed only after c1 has completed successfully.
        return c1.concatWith(c2);
    }
Note that this is a bit different from your original code because:
- messageConsumer unregistration happens only after the unregistration of subscriptionConsumer,
- messageConsumer unregistration happens only if the unregistration of subscriptionConsumer was successful.
You can use a different method of Completable if that's not the behavior you want.
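For example, if you would rather attempt both unregistrations even when the first one fails, one option is Completable.mergeArrayDelayError from RxJava 2. The sketch below is only an illustration: the class UnregisterBoth and the helper unregister(...) are hypothetical stand-ins for the two CompletableHelper.toCompletable(...) calls, so that the difference can be seen without a running event bus.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class UnregisterBoth {

    // Hypothetical stand-in for CompletableHelper.toCompletable(h -> consumer.unregister(h)):
    // records that the unregistration was attempted, then succeeds or fails.
    static Completable unregister(String name, boolean succeed, List<String> log) {
        return Completable.fromAction(() -> {
            log.add(name);
            if (!succeed) {
                throw new IllegalStateException(name + " failed");
            }
        });
    }

    // Runs both unregistrations, with the first one failing, and returns what was attempted.
    static List<String> attempted(boolean delayError) {
        List<String> log = new ArrayList<>();
        Completable c1 = unregister("subscriptionConsumer", false, log);
        Completable c2 = unregister("messageConsumer", true, log);

        Completable both = delayError
            ? Completable.mergeArrayDelayError(c1, c2) // subscribes to both, reports the error at the end
            : c1.concatWith(c2);                       // skips c2 once c1 has failed

        Throwable error = both.blockingGet();          // null when the Completable completed normally
        log.add(error == null ? "completed" : "error");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(attempted(false)); // [subscriptionConsumer, error]
        System.out.println(attempted(true));  // [subscriptionConsumer, messageConsumer, error]
    }
}
```

In both variants the returned Completable still fails when either unregistration fails, so the caller only sees success when both consumers were unregistered. The difference is whether messageConsumer is still attempted after subscriptionConsumer has failed.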