Search code examples
apache-kafka-connectflink-sql

Where can transaction.timeout.ms be set in Flink SQL


I use Flink read data from Kafka using FlinkKafkaConsumer, then convert datastream to table, in the end sink data back to kafka(kafka-connector table) with FlinkSQL. In order to get exactly-once delivery guarantees, i set kafka table with property: sink.semantic=exactly-once. When do test, i got error "transaction timeout is large than the maximum value allowed by the broker". enter image description here Flink default Kafka producer max transaction timeout: 1h enter image description here kafka default setting is transaction.max.timeout.ms=900000.

So, i need to add "transaction.timeout.ms" property in kafka producer. My question is where can i add this property using FlinkSQL.

My code:

tableEnv.executeSql("INSERT INTO sink_kafka_table select * from source_table")

I have known use with table api

tableEnv.connect(new Kafka()
          .version("") 
          .topic("")
          .property("bootstrap.server","")
          .property("transaction.timeout.ms","120000"))
          .withSchema()
          .withFormat()
          .createTemporaryTable("sink_table")
 table.executeInsert("sink_table")

It's not good advice to modify kafka config file. Any advice will help, thanks advance.


Solution

  • Using the connector declaration https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#connector-tables you can use the .option method to set the properties.* option which will be forwarded to the kafka client with properties. stripped. So you'll need to set properties.transaction.max.timeout.ms

    You can also create the sink_table with an SQL DDL statement passing any configuration using the properties.* option as well: https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#properties

    I'm not familiar with how are you creating the table, but I think it was deprecated and removed in 1.14: https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/TableEnvironment.html#connect-org.apache.flink.table.descriptors.ConnectorDescriptor- the method comments recommends creating the table executing a SQL DDL statement.