Search code examples
javaspringapache-kafkaspring-kafkakafka-producer-api

Execute a method Kafka Callback


I have a RestController which calls KafkaSendMethod() to send message to Kafka MQ

 @RestController
@RequestMapping("string-rest")
public class SimpleBookRestController {

   @Autowire
    private KafkaSendClass kafkaSendClass;
    
    @GetMapping( produces = "application/json")
    public void postMethod() {
    
    ArrayList<String> gfg = new ArrayList<String>(); 

    gfg.add("a"); 
    gfg.add("b"); 
    gfg.add("c");
    
    kafkaSendClass.KafkaSendMethod(gfg);
    
    }

Here is the class which sends message to Kafka

@Service
    class KafkaSendClass
    {
    
    @Autowired
private KafkaTemplate<String, String> kafkaTemplate;


    void KafkaSendMethod(List<Strings> strList)
    {
    
    for ( String str : strList )
    (
    ListenableFuture<SendResult<String,String>>   future = kafkaTemplate.send(str , str );
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
        @Override
        public void onSuccess(SendResult<String, String> result) {
            syso("sent success");
            m1(); //call only once for one call to KafkaSendMethod() ;
        }
    
        @Override
        public void onFailure(Throwable ex) {
            System.out.println(" sending failed");
        }
    });

); So my question is how do i call m1() once for every call to KafkaSendMethod(List strList) call.


Solution

  • See if you can use a CompletableFuture.allOf(...).

    Gather those Futures into a list and use their completable() adaptation. Then call List.toArray() for that CompletableFuture. The CompletableToListenableFutureAdapter to add your final callback.