I have simple @Bean definitions (Java 8 functions) which are mapped to a destination topic (via the -out and -in bindings).
@Bean
public Function<String, String> transform() {
    return payload -> payload.toUpperCase();
}

@Bean
public Consumer<String> receive() {
    return payload -> logger.info("Data received: " + payload);
}
.yml config:

spring:
  cloud:
    stream:
      function:
        definition: transform;receive
      bindings:
        transform-out-0:
          destination: myTopic
        receive-in-0:
          destination: myTopic
Now, I want to invoke the transform function via a REST call so that its output goes to the destination topic (i.e. transform-out-0 mapped to myTopic) and is picked up by the consumer from this destination (receive-in-0 mapped to myTopic). Basically, every REST call should spawn a new Kafka producer and close it.

How can I achieve this using spring-cloud-stream, please?
Thanks
Angshuman
You should use StreamBridge instead of having that transform function. This is the new recommended approach for dynamic destinations in Spring Cloud Stream. Here is the basic idea:
@Autowired
private StreamBridge streamBridge;

@RequestMapping
public void delegateToSupplier(@RequestBody String body) {
    streamBridge.send("transform-out-0", body);
}
Then provide this property through configuration: spring.cloud.stream.source: transform. Spring Cloud Stream will create an output binding called transform-out-0 for you. Each time the REST endpoint is called, you will send the data through StreamBridge to the destination topic.
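Putting this together with the question's setup, the configuration could look roughly like this (a sketch: the transform function is removed from function.definition and replaced by the source property, while receive stays as the consumer):

```
spring:
  cloud:
    stream:
      function:
        definition: receive
      source: transform
      bindings:
        transform-out-0:
          destination: myTopic
        receive-in-0:
          destination: myTopic
```

With this in place, a POST to the endpoint sends the body to myTopic via transform-out-0, and receive picks it up from receive-in-0. Note that StreamBridge reuses a cached producer under the covers rather than creating and closing a new Kafka producer per call.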
For more info see this.