How to unregister two Vert.x consumers and return an RxJava Completable?

I need some help with RxJava. I currently have two hash maps, each of which maps a subscription key to a Vert.x message consumer. I want to return a Completable that completes only if both Vert.x message consumers are unregistered successfully. How can I achieve this?

Here is the code I am working on:

@Override
public Completable deregisterKeyEvents(String subscriptionId) {

    MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
    MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);

    if (subscriptionConsumer != null) {
        subscriptionConsumer.unregister(res -> {
            if (res.succeeded()) {
                LOGGER.debug("Subscription channel consumer deregistered successfully!");
            } else {
                LOGGER.error("Unable to de-register Subscription channel consumer");
            }
        });
    }

    if (messageConsumer != null) {
        return Completable.create(emitter -> {
            messageConsumer.unregister(res -> {
                if (res.succeeded()) {
                    emitter.onComplete();
                } else {
                    emitter.onError(res.cause());
                }
            });
        });
    } else {
        LOGGER.warn("There was no consumer registered!");
        return Completable.create(emitter -> emitter.onError(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found")));
    }
}


I want to rewrite the code above so that it returns a Completable that completes only when both subscriptionConsumer.unregister() and messageConsumer.unregister() have succeeded.

The MessageConsumer class is from the Vert.x library (io.vertx.core.eventbus.MessageConsumer). I would appreciate any help. Thank you.


  • If you're willing to add Vert.x RxJava2 to your dependencies, you could do this with toCompletable:

    import io.reactivex.Completable;
    import io.vertx.reactivex.CompletableHelper;

    public Completable deregisterKeyEvents(String subscriptionId) {
        MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
        MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
        Completable c1;
        if (subscriptionConsumer != null) {
            // Completable has no doOnSuccess; doOnComplete is the success callback
            c1 = CompletableHelper.toCompletable(handler -> subscriptionConsumer.unregister(handler))
                .doOnComplete(() -> LOGGER.debug("Subscription channel consumer deregistered successfully!"))
                .doOnError(t -> LOGGER.error("Unable to de-register Subscription channel consumer", t));
        } else {
            c1 = Completable.complete();
        }
        Completable c2;
        if (messageConsumer != null) {
            c2 = CompletableHelper.toCompletable(handler -> messageConsumer.unregister(handler));
        } else {
            LOGGER.warn("There was no consumer registered!");
            c2 = Completable.error(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found"));
        }
        // c2 is subscribed only after c1 completes successfully
        return c1.concatWith(c2);
    }

    Note that this behaves a bit differently from your original code because:

    • the messageConsumer unregistration happens only after the unregistration of subscriptionConsumer,
    • the messageConsumer unregistration happens only if unregistration of subscriptionConsumer was successful.

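    If that's not the behavior you want, you can use a different Completable operator. For example, mergeWith subscribes to both unregistrations at once, and Completable.mergeArrayDelayError attempts both even if one of them fails.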
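To make the ordering difference concrete, here is a rough, JDK-only sketch. It is an analogy using `CompletableFuture`, not RxJava or Vert.x code: `FakeConsumer`, `toFuture`, `sequential`, `both` and `fake` are hypothetical names invented for the illustration. `sequential` mirrors the fail-fast, one-after-the-other behavior described above, while `both` attempts the two unregistrations regardless and fails if either one failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class UnregisterOrderingSketch {

    /** Hypothetical stand-in for a consumer whose unregister() reports its outcome through a callback. */
    interface FakeConsumer {
        // The callback receives null on success, or the failure cause.
        void unregister(Consumer<Throwable> callback);
    }

    /** Adapts the callback style to a future (similar in spirit to adapting a handler to a Completable). */
    static CompletableFuture<Void> toFuture(FakeConsumer consumer) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        consumer.unregister(cause -> {
            if (cause == null) {
                future.complete(null);
            } else {
                future.completeExceptionally(cause);
            }
        });
        return future;
    }

    /** Sequential and fail-fast: the second unregister starts only if the first one succeeded. */
    static CompletableFuture<Void> sequential(FakeConsumer first, FakeConsumer second) {
        return toFuture(first).thenCompose(ignored -> toFuture(second));
    }

    /** Attempts both unregistrations; the result fails if either of them failed. */
    static CompletableFuture<Void> both(FakeConsumer first, FakeConsumer second) {
        return CompletableFuture.allOf(toFuture(first), toFuture(second));
    }

    /** Builds a fake consumer that records its name in the log when unregister() is called. */
    static FakeConsumer fake(String name, boolean succeeds, List<String> log) {
        return callback -> {
            log.add(name);
            callback.accept(succeeds ? null : new IllegalStateException(name + " failed"));
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // The first unregistration fails, so the second is never attempted.
        CompletableFuture<Void> seq =
            sequential(fake("subscription", false, log), fake("message", true, log));
        System.out.println(seq.isCompletedExceptionally() + " " + log);

        log.clear();

        // Both are attempted; the combined result still reports the failure.
        CompletableFuture<Void> all =
            both(fake("subscription", false, log), fake("message", true, log));
        System.out.println(all.isCompletedExceptionally() + " " + log);
    }
}
```

In the sequential case only the first fake consumer is touched when it fails, which is the situation the second bullet point describes. Whether a failed first unregistration should prevent the second one is a design decision for your application.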