Search code examples
spring-cloud-dataflow

Connection to rabbitmq failed when deploying spring cloud data flow in the local


I referenced the documentation: https://dataflow.spring.io/docs/installation/local/manual/ to run the spring cloud data flow in my local mac, and then deployed an http | log stream according to https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/.

But during the "verifying output (local)" step (https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/#local), I failed.

I can see there are a lot of connection error in the log:

enter image description here

Summary, the steps I took:

  • In terminal 1, start rabbitmq by docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7
  • Then in terminal 1, start skipper server by java -jar spring-cloud-skipper-server-2.9.2.jarenter image description here
  • Then in terminal 2, start dataflow server by java -jar spring-cloud-dataflow-server-2.10.2.jarenter image description here
  • Then in terminal 3, start dataflow shell by java -jar spring-cloud-dataflow-shell-2.10.2.jarenter image description here
  • Open http://localhost:9393/dashboard/index.html#/streams/list/http-ingest dashboard to create stream and deploy
  • Then in terminal 4, send curl http://localhost:20100 -H "Content-type: text/plain" -d "Happy streaming". The request was successfully sent, but no response body.

Additional information

I can get success response from localhost:20100 like: enter image description here

The docker containers are as follows: enter image description here

Expected (Help wanted!)

I can verify the output as suggested by the documentation.

enter image description here

Actual result:

But I didn't get the log. Instead, I get the following:

2023-08-09 03:25:47.057  INFO [http-source,2aa0ce95298cd6b1,db9808ed2bfa4443] 1 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-08-09 03:26:47.705 ERROR [http-source,2aa0ce95298cd6b1,f116c3c77334209a] 1 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@7a71c2e3]; nested exception is org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out, failedMessage=GenericMessage [payload=byte[15], headers={content-length=15, http_requestMethod=POST, b3=2aa0ce95298cd6b1-db9808ed2bfa4443-0, nativeHeaders={}, host=localhost:20100, http_requestUrl=http://localhost:20100/, id=1bd16f86-c76a-fb4c-4362-6df88325daf7, contentType=text/plain, user-agent=curl/7.82.0, accept=*/*, timestamp=1691551546926}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
    at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:249)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1072)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.doRabbitSend(AmqpOutboundEndpoint.java:250)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:231)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:180)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 43 more
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.base/java.net.SocksSocketImpl.connect(Unknown Source)
    at java.base/java.net.Socket.connect(Unknown Source)
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
    ... 53 more

Solution

  • Fixed by connecting to the rabbitmq using host IP as it's run by docker.

    But I followed the documentation to run it using docker, which lead to connection issues. I think the official document should be updated. enter image description here

    To get host IP, for mac osx you can use

    ipconfig getifaddr en0
    192.168.1.10
    

    And then configure it into the apps during deploying stream step.

    app.http.spring.rabbitmq.host=192.168.1.10
    app.log.spring.rabbitmq.host=192.168.1.10