mqtt, spring-xd, paho

How to register the Spring XD mqtt source with an MQTT broker


I created a stream with Spring XD as follows:

stream create mqtttestfile --definition "mqtt --url='tcp://localhost:1883' --topics='helloTopic' | file" --deploy

The shell printed "Created and deployed new stream 'mqtttestfile'". I also checked localhost:9393/admin-ui, and the stream is shown as successfully created and deployed.

My MQTT broker is running on localhost:1883, but when I checked the /tmp/xd/output directory, the mqtttestfile.out file was missing.

I need clarification on the following points:

  1. I assume the MQTT client is already configured in the Spring XD mqtt source module, so that creating the stream automatically subscribes it to the given topic on the broker.

  2. I also ran two separate Python scripts, one subscriber and one publisher, in separate terminals, and they work fine, so the MQTT broker itself is not the problem.

This is the log I got from the Spring XD console:

istener - Scheduling deployments to new container(s) in 15000 ms

2017-03-04T12:07:51+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Path cache event: path=/deployments/modules/allocated/d634d310-12b4-4a83-baea-c1c98dfb7bba/mqtttestfile.sink.file.1, type=CHILD_ADDED

2017-03-04T12:07:51+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module 'file' for stream 'mqtttestfile'

2017-03-04T12:07:52+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module [ModuleDescriptor@7e658391 moduleName = 'file', moduleLabel = 'file', group = 'mqtttestfile', sourceChannelName = [null], sinkChannelName = [null], index = 1, type = sink, parameters = map[[empty]], children = list[[empty]]]

2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Path cache event: path=/deployments/modules/allocated/d634d310-12b4-4a83-baea-c1c98dfb7bba/mqtttestfile.source.mqtt.1, type=CHILD_ADDED

2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module 'mqtt' for stream 'mqtttestfile'

2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module [ModuleDescriptor@5a7d8e37 moduleName = 'mqtt', moduleLabel = 'mqtt', group = 'mqtttestfile', sourceChannelName = [null], sinkChannelName = [null], index = 0, type = source, parameters = map['topics' -> 'helloTopic', 'url' -> 'tcp://localhost:1883'], children = list[[empty]]]

2017-03-04T12:07:56+0530 1.3.0.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - Deployment status for stream 'mqtttestfile': DeploymentStatus{state=deployed}

With Spring XD 1.3.1 the problem remains unsolved. This is the error message I see in the log:

2017-03-05T01:15:06+0530 1.3.1.RELEASE INFO LeaderSelector-1 zk.DeploymentSupervisor - Leadership canceled due to thread interrupt

2017-03-05T01:15:06+0530 1.3.1.RELEASE ERROR MQTT Rec: xd.mqtt.client.id.src inbound.MqttPahoMessageDrivenChannelAdapter - Lost connection:Connection lost; retrying...

Thanks.


Solution

  • I just tested your stream and it worked fine for me...

    $ cat /tmp/xd/output/mqtttestfile.out 
    foo
    bar
    

    (after I published the messages foo and bar to helloTopic).

    With DEBUG logging enabled for org.springframework.integration (in the container's logback.groovy file), I see...

    2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 endpoint.EventDrivenConsumer - started outbound.mqtttestfile.0
    2017-03-04T08:02:59-0500 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - Connected and subscribed to [helloTopic]
    2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - started mqttInbound
    2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - Deployment status for stream 'mqtttestfile': DeploymentStatus{state=deployed}
    

    It is odd that you are not seeing the INFO-level "...started..." messages.
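
To compare a container log against the lines quoted above, a small script can report which topics the MQTT adapter subscribed to and how often it lost the connection. This is only a sketch using the Python standard library: the function name `summarize_mqtt_log` is hypothetical, the patterns are taken from the log lines shown in this thread, and the comma-separated format for several topics is an assumption.

```python
import re

# Matches the DEBUG line quoted in the answer; the topics appear in brackets.
SUBSCRIBED = re.compile(
    r"MqttPahoMessageDrivenChannelAdapter - Connected and subscribed to \[(?P<topics>[^\]]*)\]"
)
# Matches the ERROR line quoted in the question.
LOST = "MqttPahoMessageDrivenChannelAdapter - Lost connection"


def summarize_mqtt_log(lines):
    """Return (subscribed_topics, lost_connection_count) for an iterable of log lines."""
    topics = []
    lost = 0
    for line in lines:
        match = SUBSCRIBED.search(line)
        if match:
            # Assumption: multiple topics would be separated by commas.
            topics.extend(t.strip() for t in match.group("topics").split(",") if t.strip())
        elif LOST in line:
            lost += 1
    return topics, lost
```

It can be called with an open file object for whichever container log file you are inspecting. An empty topic list together with a non-zero lost-connection count would suggest the adapter never completed its subscription.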

    Can you try with version 1.3.1?
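
Because the 1.3.1 log in the question shows "Lost connection", it may also be worth confirming that the broker port accepts TCP connections from the machine running the XD container. The following is a minimal sketch using only the Python standard library. The function name `broker_reachable` is hypothetical, and it checks TCP reachability only; it does not perform an MQTT handshake.

```python
import socket


def broker_reachable(host="localhost", port=1883, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened within timeout seconds."""
    try:
        # create_connection resolves the host and connects; the socket is closed on exit.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

Calling `broker_reachable("localhost", 1883)` on the container host matches the `tcp://localhost:1883` URL in the stream definition. A `False` result would point to a network or broker problem rather than to the stream definition.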