Kafka Streams with Avro in Java: Missing required configuration "schema.registry.url" which has no default value


I have the following configuration for my Kafka Streams application:

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, this.applicaionId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, svrConfig.getBootstrapServers());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());

    // Exactly-once processing
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

And I got the following error:

    Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
        at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
        at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
        at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)

I tried replacing the line

    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

with

    config.put("schema.registry.url", "http://localhost:8081");

but I get the same error.

I followed the instructions from this URL when preparing my Streams application.

Any suggestions?


Solution

  • If both your keys and values are in Avro format, the following lines should do the trick:

    config.put("key.converter.schema.registry.url", "http://localhost:8081");  
    config.put("value.converter.schema.registry.url", "http://localhost:8081");
    

    If that doesn't work, you can configure the Serdes explicitly and pass them in when you build the stream. For example, with Avro keys and values:

    final Map<String, String> serdeConfig =
        Collections.singletonMap("schema.registry.url", "http://localhost:8081");

    final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
    keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false); // false for record values

    // StreamsBuilder takes the topic name first and the serdes via Consumed.with(...)
    StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> records =
        builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));
    // Do whatever you like
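
A Serde that you instantiate yourself and pass in explicitly is not configured by Kafka Streams for you, so whatever map you hand to `configure(...)` has to contain `schema.registry.url`. The following is a minimal sketch in plain Java (no Kafka dependencies) of one way to derive that map from the same `Properties` object used for the Streams config and to fail early if the URL is missing. The class name `SerdeConfigSketch` and the helper `serdeConfigFrom` are hypothetical names chosen for illustration; they are not part of Kafka or the Confluent libraries.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper class, for illustration only (not a Kafka/Confluent API).
class SerdeConfigSketch {

    // The property key named in the error message.
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    // Builds the map to pass to serde.configure(map, isKey) from the Streams properties.
    // Throws early, with a readable message, if the URL is absent or blank.
    static Map<String, String> serdeConfigFrom(Properties streamsProps) {
        String url = streamsProps.getProperty(SCHEMA_REGISTRY_URL);
        if (url == null || url.trim().isEmpty()) {
            throw new IllegalStateException(
                "Missing \"" + SCHEMA_REGISTRY_URL + "\" in the Streams properties");
        }
        return Collections.singletonMap(SCHEMA_REGISTRY_URL, url.trim());
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(SCHEMA_REGISTRY_URL, "http://localhost:8081");

        Map<String, String> serdeConfig = serdeConfigFrom(config);
        // The resulting map would then be passed to each explicitly created serde,
        // e.g. keySerde.configure(serdeConfig, true) and valueSerde.configure(serdeConfig, false).
        System.out.println(serdeConfig);
    }
}
```

The same map should work for `SpecificAvroSerde` instances as well as the `GenericAvroSerde` ones shown above, since both are configured through `configure(Map, boolean)`.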