I've implemented the HiveMQ client in a background thread and it works, but there is a small issue after turning airplane mode on. The log shows the reason, but I think an error handler is missing somewhere and I can't see where.
The log:
com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException: Session expired as connection was closed.
System.err W io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call
The implementation code:
client = Mqtt5Client.builder()
.serverHost(host)
.serverPort(port)
.identifier(clientId)
.addDisconnectedListener(new MqttClientDisconnectedListener() {
@Override
public void onDisconnected(MqttClientDisconnectedContext context) {
Log.d(TAG, "On disconnected... " + context.getCause());
}
})
.automaticReconnectWithDefaultConfig()
.buildRx();
Mqtt5Connect connect = Mqtt5Connect.builder()
.willPublish()
.topic(willTopic)
.applyWillPublish()
.build();
Completable connectScenario = client.connect(connect)
.doOnSuccess(this::connectSuccess)
.doOnError(this::connectFailed)
.ignoreElement();
Single<Mqtt5PublishResult> publishConnect
= client.publish(Flowable.just(
Mqtt5Publish.builder()
.topic("d/" + this.clientId + START)
.payload(startData.toByteArray())
.build())).singleOrError();
connectScenario
.andThen(publishConnect)
.doOnSuccess(this::onConnectSuccess)
.doOnError(this::disconnectError)
.subscribe();
Something is clearly missing, but where should I handle the disconnect events?
RxJava does not treat an error as handled just because you add a doOnError callback. doOnError only observes the error, which still propagates to the subscriber.
You can pass your error handler to the subscribe call:
connectScenario
.andThen(publishConnect)
.subscribe(this::onConnectSuccess, this::disconnectError);
Alternatively, you can swallow the error after handling it in the doOnError callback:
connectScenario
.andThen(publishConnect)
.doOnSuccess(this::onConnectSuccess)
.doOnError(this::disconnectError)
.ignoreElement().onErrorComplete()
.subscribe();
If you want to tolerate temporary network unavailability, you should use a sessionExpiryInterval > 0 and automatic reconnect.
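A minimal sketch of that configuration, assuming the same builder calls as in the question. The class name, the method names and the 300-second expiry are illustrative placeholders, not recommended values:

```java
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;

// Hypothetical helper class, for illustration only.
public class ResilientConnectSketch {

    // host, port and clientId are supplied by the caller, as in the question.
    static Mqtt5RxClient buildClient(String host, int port, String clientId) {
        return Mqtt5Client.builder()
                .serverHost(host)
                .serverPort(port)
                .identifier(clientId)
                // Reconnect automatically after the connection is lost.
                .automaticReconnectWithDefaultConfig()
                .buildRx();
    }

    static Mqtt5Connect buildConnect() {
        return Mqtt5Connect.builder()
                // Session expiry in seconds; 300 is an assumed example value.
                // Any value > 0 keeps the session alive across a disconnect.
                .sessionExpiryInterval(300)
                .build();
    }
}
```

The will message from the question can be added to the connect builder in the same way as before.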