Search code examples
javaapache-kafkaapache-camelconfluent-cloud

Filter Confluent Cloud "confluent-audit-log-events" topic


How can I filter Confluent Cloud cluster audit log based on the environment while consuming from the topic: confluent-audit-log-events in spring boot application?

I'm using the camel route to consume from the audit topic.

public void configure() throws Exception {

        from("kafka:confluent-audit-log-events")
                .process(exchange -> {
                    log.info(this.KafkaDetails(exchange));
                });
    }

    private String KafkaDetails(Exchange exchange) {
        return exchange.getIn().getBody(String.class);
    }


Solution

  • Finally, I have figured it out using the below filter before the process.

    .filter(body().convertToString().contains(kafkaClusterId))