
Apache Camel with Kafka Schema registry


I am building a Camel application that reads messages from Confluent Kafka. The messages are in Avro format, so I added the route configuration below to deserialize them using the Schema Registry. When I set valueDeserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer, I do not receive any messages from the Kafka topic. I tested the route without the Schema Registry settings and was able to consume messages.

Route definition:

from("kafka:topic1?sslTruststoreLocation=<jks file>"
    + "&valueDeserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer"
    + "&brokers=host1:9092,host2:9092,host3:9092"
    + "&sslKeystoreType=JKS"
    + "&groupId=grp1"
    + "&allowManualCommit=true"
    + "&consumersCount=10"
    + "&sslKeyPassword=<password>"
    + "&autoOffsetReset=earliest"
    + "&sslKeystorePassword=<password>"
    + "&securityProtocol=SSL"
    + "&sslTruststorePassword=<password>"
    + "&sslEndpointAlgorithm=HTTPS"
    + "&maxPollRecords=10"
    + "&sslTruststoreType=JKS"
    + "&sslKeystoreLocation=<keystore_path>"
    + "&autoCommitEnable=false"
    + "&additionalProperties.schema.registry.url=https://localhost:8081"
    + "&additionalProperties.basic.auth.user.info=abc:xyz"
    + "&additionalProperties.basic.auth.credentials.source=USER_INFO");

Can you please let me know what is wrong with the Schema Registry configuration above? I also tried EndpointRouteBuilder and hit the same issue. However, the producer application, which is also Camel based and uses the Schema Registry to publish Avro messages, works fine.


Solution

  • I figured out how to configure basic auth for the Confluent Schema Registry. The endpoint needs to be configured as below:

    from("kafka:topic1?sslTruststoreLocation=<jks file>"
        + "&valueDeserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer"
        + "&brokers=host1:9092,host2:9092,host3:9092"
        + "&sslKeystoreType=JKS"
        + "&groupId=grp1"
        + "&allowManualCommit=true"
        + "&consumersCount=10"
        + "&sslKeyPassword=<password>"
        + "&autoOffsetReset=earliest"
        + "&sslKeystorePassword=<password>"
        + "&securityProtocol=SSL"
        + "&sslTruststorePassword=<password>"
        + "&sslEndpointAlgorithm=HTTPS"
        + "&maxPollRecords=10"
        + "&sslTruststoreType=JKS"
        + "&sslKeystoreLocation=<keystore_path>"
        + "&autoCommitEnable=false"
        + "&additionalProperties.schema.registry.url=https://localhost:8081"
        + "&additional-properties[basic.auth.user.info]=abc:xyz"
        + "&additional-properties[basic.auth.credentials.source]=USER_INFO");
    

    Note that basic.auth.user.info and basic.auth.credentials.source must be passed using the additional-properties[...] syntax, as shown above, rather than with the additionalProperties. prefix used for them in the question.
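
A long endpoint string like the one above is easy to mistype, so one option is to assemble it from a map of options. The following is only a minimal sketch using the plain JDK: the class KafkaEndpointUriSketch and its methods buildUri and consumerUri are hypothetical names introduced for illustration and are not part of Camel. The option names and values are copied from the solution, with the SSL options left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class KafkaEndpointUriSketch {

    // Hypothetical helper (not a Camel API): joins the options into
    // "kafka:<topic>?key1=value1&key2=value2", keeping insertion order.
    public static String buildUri(String topic, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&", "kafka:" + topic + "?", "");
        options.forEach((key, value) -> query.add(key + "=" + value));
        return query.toString();
    }

    // Builds a shortened version of the consumer endpoint from the solution.
    public static String consumerUri() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "host1:9092,host2:9092,host3:9092");
        options.put("groupId", "grp1");
        options.put("valueDeserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        options.put("additionalProperties.schema.registry.url",
                "https://localhost:8081");
        // The two basic-auth settings use the bracket syntax from the solution.
        options.put("additional-properties[basic.auth.credentials.source]",
                "USER_INFO");
        options.put("additional-properties[basic.auth.user.info]", "abc:xyz");
        return buildUri("topic1", options);
    }

    public static void main(String[] args) {
        // Print the assembled endpoint so it can be compared with the route.
        System.out.println(consumerUri());
    }
}
```

The resulting string would be passed to from(...) in the route. The sketch does no escaping, so values containing characters such as & would need extra care (Camel's RAW(...) value syntax may be worth checking for passwords).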