avro, confluent-schema-registry, debezium, flink-sql, pyflink

How to define timestamp in Flink SQL based on Kafka connector with Avro format


I have a Kafka topic whose message values are in Avro format and use Debezium types. The value schema contains fields defined as follows:

{
   "name": "updated",
   "type": {
     "type": "string",
     "connect.version": 1,
     "connect.name": "io.debezium.time.ZonedTimestamp"
   }
},
{
   "name": "etl_updated",
   "type": {
      "type": "long",
      "connect.version": 1,
      "connect.name": "io.debezium.time.MicroTimestamp"
    }
}

In my PyFlink application using the Flink SQL API, I currently define them in the table as STRING and BIGINT:

CREATE TABLE mytable (
  ...,
  updated STRING NOT NULL,
  etl_updated BIGINT NOT NULL
) WITH (
  'connector' = 'kafka',
  'value.format' = 'avro-confluent',
  ...
)

How can I get these fields as timestamps instead of just a string and a long?


Solution

  • You can add computed columns that use built-in temporal functions to convert these fields into TIMESTAMP(3) or TIMESTAMP_LTZ(3); each field needs its own conversion, shown below. io.debezium.time.MicroTimestamp is the number of microseconds since the epoch, so etl_updated/1000 yields milliseconds, which TO_TIMESTAMP_LTZ(..., 3) interprets as epoch milliseconds.

    https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/#temporal-functions

    For example, for the etl_updated field:

    CREATE TABLE mytable (
      ...,
      updated STRING NOT NULL,
      etl_updated BIGINT NOT NULL,
      etl_updated_ltz AS TO_TIMESTAMP_LTZ(etl_updated/1000, 3),
      WATERMARK FOR etl_updated_ltz AS etl_updated_ltz - INTERVAL '10' SECOND
    ) WITH (
      'connector' = 'kafka',
      'value.format' = 'avro-confluent',
      ...
    );
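
    The updated field needs different handling: io.debezium.time.ZonedTimestamp is serialized as an ISO-8601 string (GMT in Debezium's default configuration), e.g. '2024-05-01T12:34:56.123456Z'. As a sketch, assuming the values always end in a 'Z' suffix, you can normalize the string with REPLACE so it matches TO_TIMESTAMP's default 'yyyy-MM-dd HH:mm:ss' pattern:

    CREATE TABLE mytable (
      ...,
      updated STRING NOT NULL,
      -- assumes UTC values like '2024-05-01T12:34:56.123456Z';
      -- strip the 'T' separator and the 'Z' suffix before parsing
      updated_ts AS TO_TIMESTAMP(REPLACE(REPLACE(updated, 'T', ' '), 'Z', ''))
    ) WITH (
      'connector' = 'kafka',
      'value.format' = 'avro-confluent',
      ...
    );

    If your values include fractional seconds and the default pattern rejects them, pass an explicit format string as TO_TIMESTAMP's second argument. Also note that TO_TIMESTAMP returns a TIMESTAMP without time zone, so the result is the literal UTC wall-clock time; if you need TIMESTAMP_LTZ semantics, or if the strings can carry offsets other than 'Z', you will have to account for the session time zone yourself.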