
KafkaHandler for multiple JSON types in a single Kafka topic - processing in batch


TL;DR: is it possible to use separate @KafkaHandler methods for different JSON types in batch mode?

I am consuming a topic that contains several different types of JSON message. I insert the data into a database, so I process it in batches and commit the Kafka offsets manually once everything has been inserted.

My consumer factory configuration includes:

```java
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// Auto-commit is off; offsets are committed manually after the database insert
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
```

and the listener container factory enables batch mode:

```java
factory.setBatchListener(true);
```

For testing, I have two POJOs: TypeA, with String fields var1 and var2, and TypeB, with String fields var3 and var4. My consumer then has a method that is essentially:

```java
@KafkaListener(topics = "${kafka.topics}")
public void receive(List<Object> data, Acknowledgment ack) {
  for (Object d : data) {
    // Each element has already been deserialized by JsonDeserializer
    if (d instanceof TypeA) {
      TypeA a = (TypeA) d;
      LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
    } else if (d instanceof TypeB) {
      TypeB b = (TypeB) d;
      LOGGER.info("We have Type B - '{}' - '{}'", b.getVar3(), b.getVar4());
    }
  }
  // Commit offsets only after every record in the batch has been handled
  ack.acknowledge();
}
```

This works, but I have been trying to get it working with a separate @KafkaHandler for each type instead of using instanceof.

If I remove the line that enables batch processing and move the @KafkaListener annotation to the class level, I can then create separate handlers:

```java
// Both methods live in a class annotated with @KafkaListener(topics = "${kafka.topics}")
@KafkaHandler
public void receiveA(@Payload TypeA a) {
  LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}

@KafkaHandler
public void receiveB(@Payload TypeB b) {
  LOGGER.info("We have Type B - '{}' - '{}'", b.getVar3(), b.getVar4());
}
```

This works fine, but I lose the batch ability.

If I enable batch mode, it only seems to want a handler that accepts an ArrayList, so I can't have separate handlers for the different types.

Is there some middle ground here? Could I process the individual records with @KafkaHandler methods but have something that fires once all the records in a batch have been processed by their handlers (to handle the acknowledgment and the database commit)? Or is there a better way of handling this than the chain of instanceof checks in my first example?
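For comparison, one way to drop the instanceof chain while keeping a single acknowledgment per batch is a dispatch table keyed by payload class. This is a plain-Java sketch (Java 16+ for records) and does not use Spring Kafka at all; TypeDispatcher, on(...) and runDemo() are hypothetical names, and TypeA/TypeB are stand-in records for the question's POJOs. In a real batch @KafkaListener method, the loop would run over the List<Object> parameter and ack.acknowledge() would follow it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class TypeDispatchSketch {

    // Stand-ins for the question's POJOs (records expose var1() rather than getVar1())
    record TypeA(String var1, String var2) {}
    record TypeB(String var3, String var4) {}

    // Hypothetical helper: maps each payload class to the handler for that class
    static final class TypeDispatcher {
        private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

        <T> TypeDispatcher on(Class<T> type, Consumer<? super T> handler) {
            // Wrap the typed handler so it can be stored alongside handlers for other types
            handlers.put(type, payload -> handler.accept(type.cast(payload)));
            return this;
        }

        void dispatch(Object payload) {
            Consumer<Object> handler = handlers.get(payload.getClass());
            if (handler == null) {
                throw new IllegalArgumentException("No handler for " + payload.getClass().getName());
            }
            handler.accept(payload);
        }
    }

    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        TypeDispatcher dispatcher = new TypeDispatcher()
                .on(TypeA.class, a -> log.add("A:" + a.var1() + "," + a.var2()))
                .on(TypeB.class, b -> log.add("B:" + b.var3() + "," + b.var4()));

        // Stands in for the List<Object> a batch listener receives
        List<Object> batch = List.of(new TypeA("x", "y"), new TypeB("p", "q"), new TypeA("m", "n"));
        for (Object payload : batch) {
            dispatcher.dispatch(payload);
        }
        // A real batch listener would call ack.acknowledge() here, once per batch
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [A:x,y, B:p,q, A:m,n]
    }
}
```

Lookup is by exact runtime class, so a subclass of TypeA would not reach TypeA's handler. An unregistered type throws, which stops the loop before the batch is acknowledged.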


Solution

  • @KafkaHandler is currently not supported for batch listeners; please open a GitHub issue - we should be able to properly detect the generic list contents type from the generic parameter type.

    You might be able to use a custom BatchToRecordAdapter to call the record-level listener, and set a flag on the final message in the batch to signal that it is the last.

    See https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions-batch

    EDIT

    It doesn't make sense to support @KafkaHandler for batch listeners, because a batch could contain mixed types.
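The BatchToRecordAdapter suggestion can be illustrated in plain Java: split the batch into per-record calls and flag the final record so the record-level listener knows when to commit. This is only an analogy, assuming Java 16+; RecordListener, adapt(...) and runDemo(...) are hypothetical and do not match the signature of Spring Kafka's actual BatchToRecordAdapter.

```java
import java.util.ArrayList;
import java.util.List;

class BatchToRecordSketch {

    // Stand-ins for the question's POJOs
    record TypeA(String var1, String var2) {}
    record TypeB(String var3, String var4) {}

    // Hypothetical record-level listener: one payload plus a flag marking the batch's final record
    interface RecordListener {
        void onRecord(Object payload, boolean lastInBatch);
    }

    // Hypothetical adapter: turns one batch into record-level calls and flags the last one.
    // Mirrors the idea only; Spring Kafka's BatchToRecordAdapter has a different signature.
    static void adapt(List<?> batch, RecordListener listener) {
        for (int i = 0; i < batch.size(); i++) {
            listener.onRecord(batch.get(i), i == batch.size() - 1);
        }
    }

    static List<String> runDemo(List<?> batch) {
        List<String> events = new ArrayList<>();
        RecordListener listener = (payload, last) -> {
            if (payload instanceof TypeA a) {
                events.add("A:" + a.var1());
            } else if (payload instanceof TypeB b) {
                events.add("B:" + b.var3());
            }
            if (last) {
                // Runs once per batch: commit the DB work, then acknowledge the offsets
                events.add("commit+ack");
            }
        };
        adapt(batch, listener);
        return events;
    }

    public static void main(String[] args) {
        // prints [A:x, B:p, commit+ack]
        System.out.println(runDemo(List.of(new TypeA("x", "y"), new TypeB("p", "q"))));
    }
}
```

If a handler throws partway through the batch, the final flag is never delivered, so nothing is committed or acknowledged for that batch. An empty batch also never reaches the flag, which is harmless in this sketch because there is nothing to commit.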