Tags: apache-kafka, avro, confluent-schema-registry

Kafka schema with external HTTPS connection


I'm beginning to despair of Kafka. For a private project, a company sends me a Kafka stream. After a long time of trying, I finally managed to connect to the bootstrap servers and receive the first messages, but not deserialized. At the moment the data looks like this: 4868fa8▒▒▒▒_9601FLHBK053A5T▒z+B▒▒▒▒▒▒

The company sends both the key and the value in Avro format, and I was also given several schema URLs. But I just can't manage to use them correctly so that I get readable data. No matter how I enter them, it always throws an error. Only when I consume without any schema do I get messages back like the one above.

The schema URLs are HTTPS and external, so I already tried creating a truststore for them. Can someone give me a hint about what else I could try?

bin/kafka-avro-console-consumer \
  --bootstrap-server kafka1.some.url:9093,kafka2.some.url:9093,kafka3.some.url:9093 \
  --topic myTopic \
  --consumer-property security.protocol=SSL \
  --consumer-property ssl.protocol=TLSv1.2 \
  --consumer-property ssl.truststore.location=ssl/myTruststore.jks \
  --consumer-property ssl.truststore.password=xxx \
  --consumer-property ssl.keystore.location=ssl/keystore.jks \
  --consumer-property ssl.keystore.password=xxx \
  --consumer-property ssl.key.password=xxx \
  --consumer-property schema.registry=https://schema-reg.some.url \
  --from-beginning

The first question that comes up here: I was given three URLs for the schemas. One is a base URL, which I specified in the program call, and the other two are one for the key and one for the value. Both have the following format:

https://schema-reg.some.url/subjects/Key/versions/latest/schema

https://schema-reg.some.url/subjects/Value/versions/latest/schema

The key schema looks like this:

{"type":"record","name":"WoKey","namespace":"somenameSpace","fields":[{"name":"someId","type":"string","aliases":["userId"]},{"name":"nextId","type":["null","string"],"default":null}]}
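At least the schema itself parses as valid JSON; for example, piping it through jq (assuming jq is installed) lists the record's fields:

```shell
# The key schema exactly as returned by the registry above:
schema='{"type":"record","name":"WoKey","namespace":"somenameSpace","fields":[{"name":"someId","type":"string","aliases":["userId"]},{"name":"nextId","type":["null","string"],"default":null}]}'

# Print the names of the record's fields:
echo "$schema" | jq -r '.fields[].name'
# someId
# nextId
```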

Unfortunately I am totally stumped. I was also given a TypeScript example, but I want to do the whole thing directly in Kafka and later write it to a MySQL DB via a Kafka Connect sink.

If I start the consumer with the settings above, I always get this error:

ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 423
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:333)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:114)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
    at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133)
    at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92)
    at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
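The schema id in that message is not arbitrary: Confluent serializers prepend a 5-byte header to every record (one 0x00 magic byte followed by the schema ID as a 4-byte big-endian integer), which is also why the raw output above looks like binary garbage. A minimal sketch of reading that header, simulating a message locally since the real topic isn't reachable here (GNU od assumed for the --endian flag):

```shell
# Fake a Confluent-Avro message whose schema ID is 423 (hex 0x000001A7):
printf '\x00\x00\x00\x01\xa7avro-payload' > /tmp/msg.bin

# Skip the magic byte (-j1) and read the next 4 bytes (-N4)
# as an unsigned big-endian 32-bit integer:
schema_id=$(od -An -j1 -N4 -tu4 --endian=big /tmp/msg.bin | tr -d ' ')
echo "$schema_id"   # prints 423
```

So the deserializer reads ID 423 from the message and then tries to fetch that schema from the registry; the Connection refused comes from that lookup, not from the brokers.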


Solution

  • Since the error says "Connection refused", the problem is most likely related to --consumer-property schema.registry=, which is not a valid setting.

    The correct option is --property schema.registry.url=https://...

    And regarding the URLs mentioned: with the default naming strategy, the subjects are the topic name followed by -key or -value (e.g. myTopic-key and myTopic-value), so check that the subjects you were given match what the deserializer will look up.

    Otherwise, one suggestion would be to use --consumer.config with a properties file rather than typing out long command options.
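    Putting both suggestions together, a sketch of the corrected call; the hostnames, passwords, and registry URL are the placeholders from the question, so adjust them to your environment (this obviously needs the live cluster to run):

```shell
# Collect the SSL settings in a consumer properties file
# (values are the placeholders from the question, not real ones):
cat > consumer.properties <<'EOF'
security.protocol=SSL
ssl.protocol=TLSv1.2
ssl.truststore.location=ssl/myTruststore.jks
ssl.truststore.password=xxx
ssl.keystore.location=ssl/keystore.jks
ssl.keystore.password=xxx
ssl.key.password=xxx
EOF

# schema.registry.url goes through --property, because it configures
# the Avro deserializer/formatter, not the Kafka consumer itself:
bin/kafka-avro-console-consumer \
  --bootstrap-server kafka1.some.url:9093,kafka2.some.url:9093,kafka3.some.url:9093 \
  --topic myTopic \
  --consumer.config consumer.properties \
  --property schema.registry.url=https://schema-reg.some.url \
  --from-beginning
```

    If the registry's HTTPS certificate is not publicly trusted, the deserializer may additionally need its own truststore settings (schema.registry.ssl.truststore.location and schema.registry.ssl.truststore.password, passed via --property in recent Confluent versions); the broker-side ssl.truststore.* settings are not used for the registry connection.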