spring-cloud-stream, spring-kafka

Asynchronously write to a Spring Cloud Stream Kafka stream


I want to write asynchronously to a Spring Cloud Stream Kafka stream. For example:

class SomeClass {

    @StreamListener(Processor.INPUT)
    public void receiveEvents(String e) {

        class ThreadExecutor implements Runnable {
            private String message;

            public ThreadExecutor(String message) {
                this.message = message;
            }

            public void run() {
                // after processing the string I will publish it
                message = message + "done";
                writeToStream(message);
            }
        }

        Executors.newCachedThreadPool().execute(new ThreadExecutor(e));
    }

    @SendTo // not sure how to write it back
    public Message<String> writeToStream(String message) {
        // this is what I want to know
    }

}

In the example above, I want to know how to implement and call the writeToStream method so that it writes back to Kafka. Basically, I want to write to the stream when the task completes, rather than polling. Please help.


Solution

  • @Autowired
    private MessageChannel output;
    
    ...
    
        output.send(MessageBuilder.withPayload(data).build());
    

    However, it's not clear why you need this; Kafka sends are already asynchronous by default.
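To make the pattern concrete, here is a minimal plain-Java sketch of "publish when the task completes". It is an illustration under assumptions, not the answerer's code: the class name `AsyncPublisher` and the `Consumer<String>` sink are hypothetical stand-ins so that the example runs without Spring. In the real application the sink would presumably wrap the injected channel, for instance `data -> output.send(MessageBuilder.withPayload(data).build())`. The sketch also reuses a single thread pool rather than creating a new one for every incoming message, as the question's code does.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical class for illustration; not part of Spring Cloud Stream.
class AsyncPublisher {

    // One shared pool for all messages, instead of a new pool per message.
    private final ExecutorService pool = Executors.newCachedThreadPool();

    // Stand-in for the injected output channel (assumption). In the Spring app:
    // data -> output.send(MessageBuilder.withPayload(data).build())
    private final Consumer<String> sink;

    AsyncPublisher(Consumer<String> sink) {
        this.sink = sink;
    }

    // Plays the role of receiveEvents(String e) from the question:
    // process on a worker thread, then publish as soon as processing is done.
    void receiveEvents(String e) {
        pool.execute(() -> {
            String processed = e + "done";
            sink.accept(processed);
        });
    }

    // Stops accepting new work and waits for in-flight tasks.
    // Returns true if every task finished within the timeout.
    boolean shutdown(long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // Thread-safe list that collects whatever would have gone to Kafka.
        List<String> published = new CopyOnWriteArrayList<>();
        AsyncPublisher publisher = new AsyncPublisher(published::add);

        publisher.receiveEvents("a");
        publisher.receiveEvents("b");

        boolean finished = publisher.shutdown(5);
        System.out.println("finished=" + finished + ", published=" + published.size());
    }
}
```

Because the tasks run on separate threads, the order in which the processed messages reach the sink is not guaranteed. If ordering matters, a single-threaded executor would be one option.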