Tags: scala, apache-kafka, apache-beam, avro

How to pass parameters to an avro deserializer in apache beam (KafkaIO)?


I need to download the Avro schema at runtime, and to resolve the correct schema I need to pass the bootstrap servers and the Kafka topic to the deserializer, but I cannot find a way to pass those parameters to it (except by hardcoding them). Do you have any idea how to do this?

    val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
    ops.setKafkaTopic(pars.kafkaTopic)
    ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
    ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
    val p = Pipeline.create(ops)

    p.apply( KafkaIO.read<String, Measurement>()
            .withTopic(pars.kafkaTopic)
            .withBootstrapServers(pars.kafkaBootstrapServers)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializer(RemoteAvroDeserializer::class.java)
            .withoutMetadata()
    )
            .apply(Values.create())
            .apply(ParDo.of(TransformToMeasurementFN()))
            .apply(
                    Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
            .apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
            .apply(Count.perElement())
            .apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))

    p.run()

This is my deserializer:

class RemoteAvroDeserializer : Deserializer<Measurement> {
 val decoder: BinaryMessageDecoder<Measurement>

 constructor() {
     // the topic and bootstrap servers are hardcoded here -- this is
     // exactly what I want to avoid
     val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic", "tst_bootstrap_servers")
     decoder = Measurement.createDecoder(schemaStore)
 }

 override fun deserialize(s: String, bytes: ByteArray): Measurement {
     return decoder.decode(bytes)
 }

 override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
 }

 override fun close() {
 }
}
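For reference, the hook this class currently leaves empty is `configure()`: Kafka calls it with the full consumer configuration map before any records are deserialized, so values placed in that map are visible there. Below is a self-contained sketch of the pattern, with the Kafka interface stubbed out so the snippet compiles on its own; the real class would implement `org.apache.kafka.common.serialization.Deserializer<Measurement>` instead, and the `beam.kafka.*` property keys are invented for illustration.

```kotlin
// Stand-in for org.apache.kafka.common.serialization.Deserializer,
// stubbed so this sketch is self-contained.
interface ConfigurableDeserializer<T> {
    fun configure(configs: Map<String, Any?>, isKey: Boolean)
    fun deserialize(topic: String, bytes: ByteArray): T
}

class RemoteAvroDeserializerSketch : ConfigurableDeserializer<String> {
    lateinit var kafkaTopic: String
    lateinit var bootstrapServers: String

    override fun configure(configs: Map<String, Any?>, isKey: Boolean) {
        // Kafka passes the consumer config map here before any records
        // are deserialized, so custom entries added to that config
        // (e.g. via KafkaIO's withConsumerConfigUpdates) show up here.
        kafkaTopic = configs["beam.kafka.topic"] as String
        bootstrapServers = configs["beam.kafka.bootstrap.servers"] as String
        // The real class would now build the decoder instead of hardcoding:
        // decoder = Measurement.createDecoder(
        //     RemoteKafkaSchemaRegistry(kafkaTopic, bootstrapServers))
    }

    override fun deserialize(topic: String, bytes: ByteArray): String =
        String(bytes) // placeholder for decoder.decode(bytes)
}
```

The key point is that the decoder is built lazily in `configure()` rather than in the no-arg constructor, because Kafka instantiates deserializers reflectively and only hands them their configuration afterwards.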

Solution

  • According to the Beam documentation, you can set consumer configuration like so:

    KafkaIO...
        .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
    

    I assume you can just add `schema.registry.url`, or whatever other properties your deserializer needs, here. Kafka hands the full consumer config map to the deserializer's `configure()` method, so any custom entries you add this way become visible there.
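Putting the two halves together, a hedged sketch of the read transform with the runtime parameters piggybacked on the consumer config (the `beam.kafka.*` keys are invented for illustration; KafkaIO forwards this map to the Kafka consumer, which in turn passes it to the value deserializer's `configure()` method):

```kotlin
// Sketch only: assumes the same pars object and classes as the question.
p.apply(
    KafkaIO.read<String, Measurement>()
        .withTopic(pars.kafkaTopic)
        .withBootstrapServers(pars.kafkaBootstrapServers)
        .withKeyDeserializer(StringDeserializer::class.java)
        .withValueDeserializer(RemoteAvroDeserializer::class.java)
        // Custom entries ride along with the standard consumer config
        // and arrive in RemoteAvroDeserializer.configure().
        .withConsumerConfigUpdates(
            mapOf(
                "beam.kafka.topic" to pars.kafkaTopic,
                "beam.kafka.bootstrap.servers" to pars.kafkaBootstrapServers,
                "schema.registry.url" to pars.kafkaSchemaRegistry
            )
        )
        .withoutMetadata()
)
```

The deserializer then reads those keys in `configure()` instead of hardcoding the topic and servers in its constructor.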