Tags: apache-kafka, streaming, apache-flink, flink-streaming

Apache Flink KafkaSource doesn't set group.id


I have a simple streaming job configured as follows:

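import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.common.serialization.StringDeserializer

val config: Configuration = new Configuration()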
config.setString("taskmanager.memory.managed.size", "4g")
config.setString("parallelism.default", "4")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)

env
  .fromSource(KafkaSource.builder[String]
    .setBootstrapServers("node1:9093,node2:9093,node3:9093")
    .setTopics("example-topic")
    //.setProperties(kafkaProps) // didn't work
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("sasl.mechanism", "GSSAPI")
    .setProperty("sasl.kerberos.service.name", "kafka")
    .setProperty("group.id","groupid-test")
    //.setGroupId("groupid-test") // didn't work
    .setStartingOffsets(OffsetsInitializer.earliest)
    .setProperty("partition.discovery.interval.ms", "60000") // discover new partitions every 60 s
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
    .build,
    WatermarkStrategy.noWatermarks[String],
    "example-input-topic"
  )
  .print

env.execute("asdasd")

My Flink version is 1.14.2.

Kafka runs on Cloudera; the Kafka version is 2.2.1-cdh6.3.2.

I am able to consume records from Kafka, but the group.id doesn't get set for the topic. Does anyone have any ideas?


Solution

  • Since Flink 1.14.0, group.id is an optional setting for KafkaSource; see https://issues.apache.org/jira/browse/FLINK-24051. You can still set your own value, via setGroupId or the group.id property, if you want to specify one. The accompanying PR shows how a group id was set before this change: https://github.com/apache/flink/pull/17052/files#diff-34b4ff8d43271eeac91ba17f29b13322f6e0ff3d15f71003a839aeb780fe30fbL56
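A minimal Scala sketch of setting the group id explicitly, assuming the Flink 1.14 KafkaSource API and reusing the brokers, topic and SASL settings from the question. One point not covered in the answer above: by default, KafkaSource commits consumed offsets back to Kafka (under the configured group.id) when a checkpoint completes, and without checkpointing it falls back to the Kafka consumer's own enable.auto.commit setting. A job that never checkpoints may therefore never show its group on the broker side, which is why this sketch enables checkpointing. The object name `KafkaGroupIdSketch`, the `buildSource` helper and the 30-second interval are hypothetical choices for illustration only.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper object, for illustration only.
object KafkaGroupIdSketch {

  // Builds a KafkaSource with an explicit consumer group id.
  // Calling build() only validates the configuration; it does not contact the brokers.
  def buildSource(bootstrap: String, topic: String, groupId: String): KafkaSource[String] =
    KafkaSource.builder[String]()
      .setBootstrapServers(bootstrap)
      .setTopics(topic)
      .setGroupId(groupId) // sets the "group.id" consumer property
      .setProperty("security.protocol", "SASL_SSL")
      .setProperty("sasl.mechanism", "GSSAPI")
      .setProperty("sasl.kerberos.service.name", "kafka")
      // Commit offsets under groupId whenever a checkpoint completes
      // (true is already the default; set explicitly here for clarity).
      .setProperty("commit.offsets.on.checkpoint", "true")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumed interval: offsets are committed to Kafka on each completed checkpoint.
    env.enableCheckpointing(30000L)

    env
      .fromSource(
        buildSource("node1:9093,node2:9093,node3:9093", "example-topic", "groupid-test"),
        WatermarkStrategy.noWatermarks[String](),
        "example-input-topic")
      .print()

    env.execute("kafka-group-id-sketch")
  }
}
```

Once a few checkpoints have completed, the group should be visible with Kafka's consumer-group tooling; how quickly depends on the checkpoint interval you choose.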