How can I filter Confluent Cloud cluster audit log based on the environment while consuming from the topic: confluent-audit-log-events in spring boot application?
I'm using the camel route to consume from the audit topic.
public void configure() throws Exception {
from("kafka:confluent-audit-log-events")
.process(exchange -> {
log.info(this.KafkaDetails(exchange));
});
}
private String KafkaDetails(Exchange exchange) {
return exchange.getIn().getBody(String.class);
}
Finally, I have figured it out using the below filter before the process.
.filter(body().convertToString().contains(kafkaClusterId))