Tags: android, rx-java, mqtt, hivemq

HiveMQ exception thrown after enabling airplane mode: "Session expired as connection was closed"


I've successfully implemented a HiveMQ client on a background thread, but there is a small issue after turning airplane mode on. The log shows the cause, but I can't see where the missing error handler should go.

The log:

com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException: Session expired as connection was closed.
System.err  W  io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call

The implementation code:

    client = Mqtt5Client.builder()
            .serverHost(host)
            .serverPort(port)
            .identifier(clientId)
            .addDisconnectedListener(new MqttClientDisconnectedListener() {
                @Override
                public void onDisconnected(MqttClientDisconnectedContext context) {
                    Log.d(TAG, "On disconnected... " + context.getCause());
                }
            })
            .automaticReconnectWithDefaultConfig()
            .buildRx();
    Mqtt5Connect connect = Mqtt5Connect.builder()
            .willPublish()
                .topic(willTopic)
            .applyWillPublish()
            .build();

    Completable connectScenario = client.connect(connect)
            .doOnSuccess(this::connectSuccess)
            .doOnError(this::connectFailed)
            .ignoreElement();

    Single<Mqtt5PublishResult> publishConnect
            = client.publish(Flowable.just(
                    Mqtt5Publish.builder()
                            .topic("d/" + this.clientId + START)
                            .payload(startData.toByteArray())
                            .build())).singleOrError();

    connectScenario
            .andThen(publishConnect)
            .doOnSuccess(this::onConnectSuccess)
            .doOnError(this::disconnectError)
            .subscribe();

Something is clearly missing, but where should I handle the disconnect events?


Solution

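  • RxJava does not treat an error as handled just because you add a doOnError callback. doOnError only runs a side effect; the error still propagates to the subscriber, and a subscribe() call without an error consumer reports it as OnErrorNotImplementedException.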

    You can add your error handler to the subscribe call:

    connectScenario
        .andThen(publishConnect)
        .subscribe(this::onConnectSuccess, this::disconnectError);
    

    Alternatively, you can swallow the error after handling it in the doOnError callback:

    connectScenario
        .andThen(publishConnect)
        .doOnSuccess(this::onConnectSuccess)
        .doOnError(this::disconnectError)
        .ignoreElement()
        .onErrorComplete()
        .subscribe();
    

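    To tolerate temporary network unavailability, use a sessionExpiryInterval greater than 0 together with automatic reconnect. With an interval of 0, the session ends as soon as the connection closes, which is what the MqttSessionExpiredException in the log reports.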
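
    As a rough configuration sketch of that last point, the connect message can carry a session expiry interval while the client keeps automatic reconnect enabled. The host, port, client identifier, and the 300-second interval below are placeholder assumptions, not values from the question, and the will message is left out for brevity:

    ```java
    import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
    import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
    import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;

    public final class ResilientConnectSketch {

        public static void main(String[] args) {
            // Placeholder connection details (assumptions for illustration only).
            Mqtt5RxClient client = Mqtt5Client.builder()
                    .serverHost("broker.example.com")
                    .serverPort(1883)
                    .identifier("example-client")
                    .automaticReconnectWithDefaultConfig()
                    .buildRx();

            // Ask the broker to keep the session for 300 seconds (assumed value)
            // after the network connection is lost, instead of the default 0.
            Mqtt5Connect connect = Mqtt5Connect.builder()
                    .sessionExpiryInterval(300)
                    .build();

            // Pass an error consumer to subscribe() so a failure is treated as handled.
            client.connect(connect)
                    .subscribe(
                            connAck -> System.out.println("Connected: " + connAck.getReasonCode()),
                            error -> System.err.println("Connect failed: " + error));
        }
    }
    ```

    How long the interval should be depends on how long the device is expected to stay offline; if it is shorter than the outage, the session can still expire before the client reconnects.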