I am using Spring Integration 5.0.6. I have gone through its documentation and created the following code, which listens on an HTTP endpoint and publishes to a Kafka topic.
Everything works and the message arrives on the topic, but no reply is sent to the HTTP client; it fails with "No reply received within timeout".
How can I send a reply back to the HTTP caller in the code below?
@Bean
public DirectChannel replyChannel() {
    return new DirectChannel();
}

@Bean(name = "restInputFlow")
public IntegrationFlow send() {
    return IntegrationFlows
            .from(Http.inboundGateway("/push")
                    .requestMapping(m -> m.methods(HttpMethod.POST))
                    .requestPayloadType(String.class)
                    .replyChannel(replyChannel()))
            .transform(new Transformer())
            .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
            .enrichHeaders(
                    c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
            .get();
}

private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
        ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {
    return Kafka.outboundChannelAdapter(producerFactory)
            .messageKey("key")
            .headerMapper(mapper())
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
Thanks for any help.
Your problem is that you use a one-way Kafka.outboundChannelAdapter(producerFactory). This is just for "send-and-forget": it produces no reply.
If you need some subsequent processing, or simply need to reply to the HTTP request, consider using:
/**
 * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
 * method specific implementation to allow the use of the 'subflow' subscriber capability.
 * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
 * {@link PublishSubscribeSpec} options including 'subflow' definition.
 * @return the current {@link IntegrationFlowDefinition}.
 */
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {
Use it in the flow definition, where the first subscriber is that Kafka.outboundChannelAdapter(producerFactory) and the second one can be the .enrichHeaders() you already have. If you do nothing more, this last one sends its result to the channel in the replyChannel header, and that result therefore becomes the HTTP response.
In this publish-subscribe scenario, keep in mind that the payload for the second subscriber is the same one you send to Kafka.
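A minimal sketch of how the flow from the question might look with publishSubscribeChannel() follows. It is a fragment, not a complete class: it assumes it replaces send() in the same configuration class and reuses the question's own kafkaMessageHandler(), producerFactory(), getKafkaSourceTopic() and Transformer. The explicit replyChannel() bean is left out on the assumption that the replyChannel header populated by the inbound gateway is enough; treat that as something to verify in your setup.

```java
// Fragment: intended to replace send() in the question's @Configuration class.
// kafkaMessageHandler(...), producerFactory(), getKafkaSourceTopic() and
// Transformer are the question's own members, not defined here.
@Bean(name = "restInputFlow")
public IntegrationFlow send() {
    return IntegrationFlows
            .from(Http.inboundGateway("/push")
                    .requestMapping(m -> m.methods(HttpMethod.POST))
                    .requestPayloadType(String.class))
            .transform(new Transformer())
            .publishSubscribeChannel(pubSub -> pubSub
                    // First subscriber: one-way send to Kafka, produces no reply.
                    .subscribe(sub -> sub
                            .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))))
            // Second subscriber: the remainder of the main flow. It receives the
            // same message that was sent to Kafka; with no further output channel,
            // its result goes to the replyChannel header, i.e. back to the HTTP caller.
            .enrichHeaders(c -> c.header(
                    org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
            .get();
}
```

Since the second subscriber sees the transformed payload, that payload is also what ends up as the response body. If the caller should get something else, a further .transform() after .enrichHeaders() would be one place to change it.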