
How to process Avro input from Kafka (with Apache Beam) when there are multiple subjects on one topic?


To process Avro-encoded messages from Kafka with Apache Beam's KafkaIO, one passes an instance of ConfluentSchemaRegistryDeserializerProvider as the value deserializer.

A typical example looks like this:

PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
  .apply(KafkaIO.<Long, GenericRecord>read()
     .withBootstrapServers("kafka-broker:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(
         ConfluentSchemaRegistryDeserializerProvider.of(
             "http://my-local-schema-registry:8081", "my_subject")));

However, some of the Kafka topics that I want to consume carry multiple different subjects (event types) on the same topic (for ordering reasons), so I can't provide one fixed subject name in advance. How can this be solved?

(My goal is to, in the end, use BigQueryIO to push these events to the cloud.)


Solution

  • You could do multiple reads, one per subject, and then Flatten the resulting PCollections into a single one.
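The suggestion above can be sketched as follows. This is a minimal, hedged sketch assuming you know the set of subjects up front; the subject names, broker address, and registry URL are placeholders, not values from the original question.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.common.serialization.LongDeserializer;

public class MultiSubjectRead {

  public static PCollection<KafkaRecord<Long, GenericRecord>> readAllSubjects(Pipeline pipeline) {
    // Placeholder subject names -- one read per subject registered on the topic.
    List<String> subjects = Arrays.asList("subject_a", "subject_b");

    PCollectionList<KafkaRecord<Long, GenericRecord>> perSubject =
        PCollectionList.empty(pipeline);

    for (String subject : subjects) {
      // Each read uses the same topic but a different schema-registry subject,
      // so each deserializer resolves the schema for its own event type.
      perSubject = perSubject.and(
          pipeline.apply("Read " + subject,
              KafkaIO.<Long, GenericRecord>read()
                  .withBootstrapServers("kafka-broker:9092")
                  .withTopic("my_topic")
                  .withKeyDeserializer(LongDeserializer.class)
                  .withValueDeserializer(
                      ConfluentSchemaRegistryDeserializerProvider.of(
                          "http://my-local-schema-registry:8081", subject))));
    }

    // Merge the per-subject streams back into one PCollection.
    return perSubject.apply("FlattenSubjects", Flatten.pCollections());
  }
}
```

One caveat with this approach: every read consumes the whole topic and each deserializer will encounter records belonging to the other subjects, so the per-record ordering guarantee that motivated putting all subjects on one topic is not preserved across the flattened streams.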