Update
In the source code of MQTTAdapterListener in the carbon-analytics-common project (as shown at this link), the run() method sleeps the thread before it calls startListener(), and I believe that is why the MQTT connection takes so long.
@Override
public void run() {
    while (!connectionSucceeded) {
        try {
            MQTTEventAdapterConstants.initialReconnectDuration = MQTTEventAdapterConstants.initialReconnectDuration
                    * MQTTEventAdapterConstants.reconnectionProgressionFactor;
            Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration);
            startListener();
            connectionSucceeded = true;
            log.info("MQTT Connection successful");
        } catch (InterruptedException e) {
            log.error("Interruption occurred while waiting for reconnection", e);
        } catch (MqttException e) {
            log.error("MQTT Exception occurred when starting listener", e);
        }
    }
}
initialReconnectDuration and reconnectionProgressionFactor are defined in MQTTEventAdapterConstants as follows:
public static int initialReconnectDuration = 10000;
public static final int reconnectionProgressionFactor = 2;
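Because initialReconnectDuration is a static field, every listener doubles the same shared value before it sleeps. So if I have 12 receivers using MQTT, the 12th one will sleep for 40960 seconds (about 11.4 hours). There seems to be no way to change these two values. Is there any way to solve this issue? And why does the MQTT adapter put the thread to sleep like this before connecting?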
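The arithmetic behind that figure can be checked with a small standalone sketch. It does not use any WSO2 classes; it only replays the multiplication from run() with the two values quoted above, assuming each listener multiplies the same shared static value exactly once before sleeping. The class and method names are made up for illustration.

```java
// Standalone sketch: replays the doubling from run() without any WSO2 classes.
// BackoffSchedule and sleepMillisForListener are hypothetical names.
class BackoffSchedule {
    // Values copied from MQTTEventAdapterConstants as quoted above.
    static final long INITIAL_RECONNECT_DURATION_MS = 10000L;
    static final int RECONNECTION_PROGRESSION_FACTOR = 2;

    // Sleep time of the n-th listener (1-based), assuming every listener
    // multiplies the shared static value once before it sleeps.
    static long sleepMillisForListener(int n) {
        long duration = INITIAL_RECONNECT_DURATION_MS;
        for (int i = 0; i < n; i++) {
            duration *= RECONNECTION_PROGRESSION_FACTOR;
        }
        return duration;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 12; n++) {
            long seconds = sleepMillisForListener(n) / 1000;
            System.out.println("listener " + n + " sleeps " + seconds + " s");
        }
    }
}
```

Under that assumption the first listener sleeps 20 s and the twelfth 40960 s, which is consistent with the timestamps in the log further down.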
We are using WSO2 CEP version 4.0.0 and ActiveMQ version 5.13.0. The deployed event receiver is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER"
    statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="mqtt">
        <property name="clientId">5718a6b1851cb3474c6f03c2</property>
        <property name="topic">PROBE_DATA_571844f5851cb3474c6f0391_57184581851cb3474c6f0394_Topic</property>
        <property name="cleanSession">false</property>
        <property name="url">tcp://127.0.0.1:1883</property>
    </from>
    <mapping customMapping="enable" type="json">
        <property>
            <from jsonPath="$.event.payloadData.dyna.Speed3"/>
            <to name="speed" type="double"/>
        </property>
    </mapping>
    <to streamName="5718a6b1851cb3474c6f03c2_PROBE_DATA_IS" version="1.0.0"/>
</eventReceiver>
The clientId is the object ID of this event execution plan in our internal database. The topic exists in ActiveMQ, and I am sure that messages have been enqueued to it. The cleanSession property is set to false to get a durable subscription.
However, the receiver always takes a long time to connect to the ActiveMQ topic, both after we restart WSO2 CEP and when we deploy a new receiver.
[2016-04-24 01:07:59,018] INFO {org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService} - Starting polling event receivers
[2016-04-24 01:07:59,019] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66bcc7a22442328cd229a_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,020] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,021] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 57144fa0851cb33590672418_X_DATA_RECEIVER
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714b5f4851cb3338c26a1cc_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571b9ef4851cb356e04ec90b_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714a305851cb3338c26a1c4_DELIVERY_ORDER_RECEIVER
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f6b03e7a22440c30855015_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f665f27a22442328cd2299_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a5d7851cb3474c6f03bf_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,025] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56ef98137a22442e28360c0b_DELIVERY_ORDER_RECEIVER
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571747f2851cb3338c26a1da_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66c657a22442328cd229c_PROBE_DATA_RECEIVER
[2016-04-24 01:08:19,044] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:08:39,041] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:09:19,034] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:10:39,037] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:13:19,046] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:18:39,035] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:29:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:50:39,036] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 02:33:19,039] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 03:58:39,043] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 06:49:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 12:30:39,040] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
There are 12 receivers, and WSO2 starts polling at 01:07:59. After about 10 minutes only 6 of the 12 receivers have connected; the rest take progressively longer, and the last one connects at 12:30:39, more than 11 hours later.
Does anybody know why the MQTT receivers in WSO2 CEP connect so slowly?
There's no out-of-the-box way to configure the reconnection duration and progression factor. But since you've already found the relevant code, you can patch that component (read those values from a config file, expose them through the adapter UI, or simply hard-code different values) and apply the patch as described here to overcome this limitation.
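As a rough idea of what such a patch could look like, here is a minimal sketch and not the actual WSO2 code. It keeps the delay local to each listener instead of in a shared static field, reads the values from system properties, caps the delay, and only sleeps after a failed attempt (the original sleeps before the first attempt). ReconnectLoop, Connector and the "mqtt.reconnect.*" property names are all invented for this example; Connector stands in for startListener().

```java
// Sketch only: this is not the WSO2 implementation. ReconnectLoop, Connector and
// the "mqtt.reconnect.*" system property names are hypothetical.
class ReconnectLoop implements Runnable {

    /** Stands in for startListener(); throws if the broker is unreachable. */
    interface Connector {
        void connect() throws Exception;
    }

    private final long initialDelayMs;
    private final int factor;
    private final long maxDelayMs;
    private final Connector connector;
    private volatile boolean connected;

    ReconnectLoop(long initialDelayMs, int factor, long maxDelayMs, Connector connector) {
        this.initialDelayMs = initialDelayMs;
        this.factor = factor;
        this.maxDelayMs = maxDelayMs;
        this.connector = connector;
    }

    // Reads the settings from JVM system properties, falling back to defaults.
    static ReconnectLoop fromSystemProperties(Connector connector) {
        long initial = Long.getLong("mqtt.reconnect.initialMs", 10000L);
        int factor = Integer.getInteger("mqtt.reconnect.factor", 2);
        long max = Long.getLong("mqtt.reconnect.maxMs", 60000L);
        return new ReconnectLoop(initial, factor, max, connector);
    }

    // Grows the delay by the factor but never beyond the cap.
    static long nextDelay(long current, int factor, long max) {
        return Math.min(current * factor, max);
    }

    boolean isConnected() {
        return connected;
    }

    @Override
    public void run() {
        long delay = initialDelayMs; // local to this listener, not shared
        while (!connected) {
            try {
                connector.connect(); // try immediately, sleep only on failure
                connected = true;
            } catch (Exception e) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt and stop retrying
                    return;
                }
                delay = nextDelay(delay, factor, maxDelayMs);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake connector that fails twice before succeeding.
        ReconnectLoop loop = new ReconnectLoop(10L, 2, 40L, () -> {
            if (++calls[0] < 3) {
                throw new Exception("broker not reachable yet");
            }
        });
        loop.run();
        System.out.println("connected after " + calls[0] + " attempts");
    }
}
```

With per-instance state, twelve receivers would each start from the same initial delay instead of inheriting the value already doubled by the others.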