I use Flink to read data from Kafka with FlinkKafkaConsumer, then convert the DataStream to a table, and finally sink the data back to Kafka (a kafka-connector table) with Flink SQL. To get exactly-once delivery guarantees, I set the sink table property sink.semantic=exactly-once.
When testing, I got the error "transaction timeout is larger than the maximum value allowed by the broker".
Flink's Kafka producer defaults the transaction timeout to 1 hour,
while Kafka's broker default is transaction.max.timeout.ms=900000 (15 minutes).
So I need to set the "transaction.timeout.ms" property on the Kafka producer. My question is: where can I add this property when using Flink SQL?
My code:
tableEnv.executeSql("INSERT INTO sink_kafka_table SELECT * FROM source_table")
I know how to do it with the Table API:
tableEnv.connect(new Kafka()
        .version("")
        .topic("")
        .property("bootstrap.servers", "")
        .property("transaction.timeout.ms", "120000"))
    .withSchema()
    .withFormat()
    .createTemporaryTable("sink_table")
table.executeInsert("sink_table")
Modifying the Kafka broker config file is not a good option for me. Any advice will help, thanks in advance.
Using the connector declaration (https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#connector-tables) you can use the .option
method to set the properties.*
options, which are forwarded to the Kafka client with the properties.
prefix stripped. So you'll need to set properties.transaction.timeout.ms.
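A minimal sketch of that approach, assuming Flink 1.14+ where TableDescriptor replaced the deprecated connect() API; the topic, bootstrap servers, schema, and format here are placeholder values you'd replace with your own:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Build the sink table declaratively; options prefixed with "properties."
// are forwarded to the Kafka client with the prefix stripped.
TableDescriptor sinkDescriptor = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("f0", DataTypes.STRING())   // placeholder column
        .build())
    .option("topic", "my-sink-topic")                        // placeholder
    .option("properties.bootstrap.servers", "localhost:9092") // placeholder
    // Must be <= the broker's transaction.max.timeout.ms (default 900000 ms)
    .option("properties.transaction.timeout.ms", "120000")
    .option("sink.semantic", "exactly-once")
    .format("json")                                           // placeholder
    .build();

tableEnv.createTemporaryTable("sink_kafka_table", sinkDescriptor);
tableEnv.executeSql("INSERT INTO sink_kafka_table SELECT * FROM source_table");
```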
You can also create the sink_table
with a SQL DDL statement, passing any client configuration through the properties.*
options: https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#properties
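For example, a DDL sketch along those lines (the topic, bootstrap servers, columns, and format are placeholders to adapt):

```sql
CREATE TABLE sink_kafka_table (
  f0 STRING  -- placeholder column; match your source schema
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-sink-topic',                          -- placeholder
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
  -- must be <= the broker's transaction.max.timeout.ms (default 900000 ms)
  'properties.transaction.timeout.ms' = '120000',
  'sink.semantic' = 'exactly-once',
  'format' = 'json'                                   -- placeholder
);
```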
I'm not familiar with the way you are creating the table, but I think tableEnv.connect was deprecated and removed in 1.14: https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/TableEnvironment.html#connect-org.apache.flink.table.descriptors.ConnectorDescriptor- — the method's Javadoc recommends creating the table with a SQL DDL statement instead.