java, spring-integration, spring-integration-dsl, spring-integration-http

Spring Integration Java DSL HTTP no reply received within timeout error


I am using Spring Integration 5.0.6. I have gone through its documentation and created the following code, which listens on an HTTP endpoint and publishes to a Kafka topic.

Everything is working fine and I am receiving the message on the topic. But no reply is sent back to the HTTP client; it fails with "No reply received within timeout".

How can I send a reply back to the HTTP caller in the code below?

@Bean
public DirectChannel replyChannel() {
    return new DirectChannel();
}

@Bean(name = "restInputFlow")
public IntegrationFlow send() {
    return IntegrationFlows
            .from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
                    .requestPayloadType(String.class).replyChannel(replyChannel()))
            .transform(new Transformer())
            .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
            .enrichHeaders(
                    c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
            .get();
}

private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
        ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {

    return Kafka.outboundChannelAdapter(producerFactory)
            .messageKey("key").headerMapper(mapper())
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}

Thanks for any help.


Solution

  • Your problem is that you use a one-way Kafka.outboundChannelAdapter(producerFactory). This is just for "send-and-forget".

    If you are interested in some subsequent processing, or, as in your case, you need to reply to the HTTP request, you should consider using a:

    /**
     * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
     * method specific implementation to allow the use of the 'subflow' subscriber capability.
     * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
     * {@link PublishSubscribeSpec} options including 'subflow' definition.
     * @return the current {@link IntegrationFlowDefinition}.
     */
    public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {
    

    In the flow definition, the first subscriber is going to be that Kafka.outboundChannelAdapter(producerFactory), and the second one can be the .enrichHeaders() you already have. If you do nothing more, that last step sends its result to the replyChannel header and therefore reaches the HTTP inbound gateway as the response.

    In this publish-subscribe scenario, keep in mind that the payload seen by the second subscriber is the same one you tried to send to Kafka.
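
    Putting this together, here is a minimal sketch of the revised flow, reusing the Transformer, replyChannel(), kafkaMessageHandler(), producerFactory() and getKafkaSourceTopic() pieces from your question. It is untested, so treat it as a starting point rather than a drop-in replacement:

    @Bean(name = "restInputFlow")
    public IntegrationFlow send() {
        return IntegrationFlows
                .from(Http.inboundGateway("/push")
                        .requestMapping(m -> m.methods(HttpMethod.POST))
                        .requestPayloadType(String.class)
                        .replyChannel(replyChannel()))
                .transform(new Transformer())
                // first subscriber: the one-way Kafka outbound adapter (send-and-forget)
                .publishSubscribeChannel(c -> c
                        .subscribe(f -> f
                                .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))))
                // the rest of the main flow is the second subscriber: it only sets the
                // status code header; its result goes to the replyChannel header and
                // becomes the HTTP response
                .enrichHeaders(h -> h.header(
                        org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
                .get();
    }

    Since the payload is still the transformed message, the HTTP client should get it back in the body of the 201 response; if you want a different body, you could add another .transform() or .handle() step after the publishSubscribeChannel() and before the .enrichHeaders().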