Search code examples

What's a simple way to initiate messages in spring-cloud-stream

I just joined a project that is using Spring Cloud Stream as a wrapper around Kafka. The idea is that we'd abstract the messaging API so we're free to switch messaging platforms. I find the API baffling, especially with the latest round of deprecations that redirect me from the annotation-based to a functional model based on Spring Cloud Function. I feel like I'm missing something, because the prescribed programming model seems to make the simple act of producing a message quite a pain. Instead of something like kafkaTemplate.sendDefault("Hello"), we have prescribed monstrosities like:

    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            public String get() {
                try {
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore

How am I supposed to do simple message-driven code with this kind of API?


  • I would say it is not correct to compare an infinite source of data to produce with a single kafkaTemplate.send(). If that is only a functionality you need in your logic, consider to use a StreamBridge:

    The Supplier<Flux> bean is a special signal for the framework to start a source data producing logic. If you would do it yourself, it would not be so straightforward. I'm not talking yet that your Flux.fromStream(Stream.generate()) could be replaced with a single Flux.generate(). It is also not clear why would I use special scheduler and share...

    There is also a @PollableBean way if you don't want to do a complex Flux logic: