Tags: java, apache-beam, value-provider, apache-beam-kafkaio

KafkaIO withBootStrapServers


I am trying to pass a server ID as a parameter when executing the run command, using a ValueProvider.

The ValueProvider is declared in the options interface:

    ValueProvider<String> getKafkaServer();
    void setKafkaServer(ValueProvider<String> value);

The call to withBootstrapServers fails to compile with the error "incompatible types: org.apache.beam.sdk.options.ValueProvider cannot be converted to java.lang.String":

PCollection<String> kafkaMessages = p
                .apply("Read from Kafka", KafkaIO.<Long, String>read()
                        .withBootstrapServers(options.getBootstrapServers())

Another answer suggests using options.getBootstrapServers().get(), but that produces the following errors:

[ERROR] - Expected getter for property [getbootstrapServers] of type [org.apache.beam.sdk.options.ValueProvider]
[ERROR] - Expected setter for property [bootstrapServers] of type [org.apache.beam.sdk.options.ValueProvider]

Any help resolving this is much appreciated.


Solution

  • A ValueProvider only works with transforms (including sources) that have explicitly plumbed support for it, and ValueProviders are only needed for legacy Dataflow templates. KafkaIO's withBootstrapServers expects a plain String, which is why the compiler rejects a ValueProvider<String>. In general, you can declare pipeline options with concrete types (e.g. String getKafkaServer()) and pass the values at pipeline startup. If you want to defer the choice to template instantiation time, use Flex Templates.
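
A minimal sketch of the concrete-value approach described above, assuming the Beam Java SDK with the beam-sdks-java-io-kafka module and the Kafka client library on the classpath. The class name KafkaReadSketch, the topic option, and the readMessages helper are hypothetical names introduced for illustration; only getKafkaServer comes from the question. Note that the getter and setter names must match (getKafkaServer / setKafkaServer), which is what the "Expected getter/setter for property" errors in the question are complaining about.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {

  /** Options declared with plain String types, so values are known at construction time. */
  public interface Options extends PipelineOptions {
    @Description("Kafka bootstrap servers, e.g. host1:9092,host2:9092")
    @Validation.Required
    String getKafkaServer();
    void setKafkaServer(String value);

    // Hypothetical option, not in the original question.
    @Description("Kafka topic to read from")
    @Validation.Required
    String getTopic();
    void setTopic(String value);
  }

  /** Builds the read step; the String option is passed straight to KafkaIO. */
  static PCollection<String> readMessages(Pipeline p, Options options) {
    return p
        .apply("Read from Kafka", KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer()) // a String, no ValueProvider
            .withTopic(options.getTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata()) // yields KV<Long, String> instead of KafkaRecord
        .apply("Extract values", Values.<String>create());
  }

  public static void main(String[] args) {
    // Parses flags such as --kafkaServer=... and --topic=... from the command line.
    Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);
    readMessages(p, options);
    p.run().waitUntilFinish();
  }
}
```

With this shape, the server is supplied when the pipeline is launched, for example with --kafkaServer=localhost:9092 --topic=events (both values are placeholders). The same options interface can back a Flex Template, since a Flex Template constructs the pipeline at launch time with the supplied parameters.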