Search code examples
rx-java2vert.x

How to unregister two vertx consumers and returns an rxjava completable?


I need a small help with Rxjava . currently I have two hash maps . Each hash map contains vertex message consumers against a subscription key. I want to return a completable object only if I am able to unregister both vertex message consumers. How can I achieve it .

I can post the code i am working on.

@Override public Completable deregisterKeyEvents(String subscriptionId) {

MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);


if( subscriptionConsumer != null) {
    subscriptionConsumerMap.remove(subscriptionId);
    subscriptionConsumer.unregister( res-> {
        
        if(res.succeeded()) {
            LOGGER.debug("Subscription channel consumer deregistered successfully!");
        } else {
            LOGGER.error("Unable to de-register Subscription channel consumer");
        }
        
    });
}

if (messageConsumer != null) {
    
    consumerMap.remove(subscriptionId);
    
    return Completable.create(emitter -> {
        
        messageConsumer.unregister(res -> {
            if (res.succeeded()) {
                emitter.onComplete();
            } else {
                emitter.onError(res.cause());
            }
        });
    });
} else {
    LOGGER.warn("There was no consumer registered!");
    return Completable.create(emitter -> emitter.onError(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found")));
}

}

I want to rewrite the above code in such a way

subscriptionConsumer.unregister() & messageConsumer.unregister() is successful then return a completable

The MessageConsumer class is from vert.x libary io.vertx.core.eventbus.MessageConsumer. appreciate if you can help thank you


Solution

  • If you're willing to add Vert.x RxJava2 to your dependencies, you could do this with toCompletable:

    @Override
    public Completable deregisterKeyEvents(String subscriptionId) {
    
        MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
        MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
        
        Completable c1;
        if( subscriptionConsumer != null) {
            subscriptionConsumerMap.remove(subscriptionId);
            c1 = CompletableHelper.toCompletable(handler -> subscriptionConsumer.unregister(handler))
                .doOnSuccess(() -> LOGGER.debug("Subscription channel consumer deregistered successfully!"))
                .doOnError(t-> LOGGER.error("Unable to de-register Subscription channel consumer", t));
        } else {
            c1 = Completable.complete();
        }
        
        Completable c2;
        if (messageConsumer != null) {
            consumerMap.remove(subscriptionId);
            c2 = CompletableHelper.toCompletable(handler -> messageConsumer.unregister(handler));
        } else {
            LOGGER.warn("There was no consumer registered!");
            c2 = Completable.error(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found"));
    
        }
    
        return c1.concatWith(c2);
    }
    

    Note that this is a bit different than your original code because:

    • the messageConsumer unregistration happens only after the unregistration of subscriptionConsumer,
    • the messageConsumer unregistration happens only if unregistration of subscriptionConsumer was successful.

    You can use a different method of Completable if that's not the behavior you want.