I have a simple stream execution configured as:
    val config: Configuration = new Configuration()
    config.setString("taskmanager.memory.managed.size", "4g")
    config.setString("parallelism.default", "4")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    env
      .fromSource(
        KafkaSource.builder[String]
          .setBootstrapServers("node1:9093,node2:9093,node3:9093")
          .setTopics("example-topic")
          //.setProperties(kafkaProps) // didn't work
          .setProperty("security.protocol", "SASL_SSL")
          .setProperty("sasl.mechanism", "GSSAPI")
          .setProperty("sasl.kerberos.service.name", "kafka")
          .setProperty("group.id", "groupid-test")
          //.setGroupId("groupid-test") // didn't work
          .setStartingOffsets(OffsetsInitializer.earliest)
          .setProperty("partition.discovery.interval.ms", "60000") // discover part
          .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
          .build,
        WatermarkStrategy.noWatermarks[String],
        "example-input-topic"
      )
      .print
    env.execute("asdasd")
My Flink version is 1.14.2.
Kafka is running on Cloudera; the Kafka version is 2.2.1-cdh6.3.2.
I am able to consume records from Kafka, but the group ID does not get set for the topic. Does anyone have any ideas?
Since Flink 1.14.0, group.id is an optional property; see https://issues.apache.org/jira/browse/FLINK-24051. You can still set your own value if you want to specify one. The accompanying PR shows how the value was previously set: https://github.com/apache/flink/pull/17052/files#diff-34b4ff8d43271eeac91ba17f29b13322f6e0ff3d15f71003a839aeb780fe30fbL56
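For illustration, here is a minimal sketch of a source with an explicit group ID. It rests on an assumption that is not confirmed in this thread: as I understand the Kafka connector documentation, KafkaSource commits offsets under the group ID only when a checkpoint completes, so a group may not show up on the Kafka side while checkpointing is disabled. The object name, job name, and the 10-second checkpoint interval are arbitrary, and the SASL/Kerberos properties from the question are left out for brevity.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical object name, used only for this sketch.
object GroupIdSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: offsets are committed under the group ID when a checkpoint
    // completes, so checkpointing is switched on. The interval is arbitrary.
    env.enableCheckpointing(10000L)

    val source = KafkaSource.builder[String]()
      .setBootstrapServers("node1:9093,node2:9093,node3:9093")
      .setTopics("example-topic")
      // Explicit group ID; add the security properties from the question as needed.
      .setGroupId("groupid-test")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
      .build()

    env
      .fromSource(source, WatermarkStrategy.noWatermarks[String](), "example-input-topic")
      .print()

    env.execute("group-id-sketch")
  }
}
```

If the assumption holds, the group should appear in the output of the Kafka consumer-groups tooling once the first checkpoint has completed. If it still does not, that points to a different cause than the optional group.id.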