I have a Kafka topic whose message values are in Avro format with Debezium semantic types. The value schema contains fields defined like this:
{
  "name": "updated",
  "type": {
    "type": "string",
    "connect.version": 1,
    "connect.name": "io.debezium.time.ZonedTimestamp"
  }
},
{
  "name": "etl_updated",
  "type": {
    "type": "long",
    "connect.version": 1,
    "connect.name": "io.debezium.time.MicroTimestamp"
  }
}
In my PyFlink application (using the Flink SQL API) I currently declare them in the table as STRING and BIGINT:
CREATE TABLE mytable (
  ...,
  updated STRING NOT NULL,
  etl_updated BIGINT NOT NULL
) WITH (
  'connector' = 'kafka',
  'value.format' = 'avro-confluent',
  ...
);
How can I get these fields as timestamps rather than just STRING and BIGINT?
You can add computed columns that use the built-in temporal functions to convert these values into TIMESTAMP(3) or TIMESTAMP_LTZ(3). For the epoch-based field: io.debezium.time.MicroTimestamp stores microseconds since the epoch, while TO_TIMESTAMP_LTZ with precision 3 expects milliseconds, so divide by 1000 first. For example:
CREATE TABLE mytable (
  ...,
  updated STRING NOT NULL,
  etl_updated BIGINT NOT NULL,
  -- microseconds -> milliseconds, then convert to a local-zone timestamp
  etl_updated_ltz AS TO_TIMESTAMP_LTZ(etl_updated / 1000, 3),
  -- the computed column can also serve as the event-time attribute
  WATERMARK FOR etl_updated_ltz AS etl_updated_ltz - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'value.format' = 'avro-confluent',
  ...
);
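The updated field needs a different conversion, since io.debezium.time.ZonedTimestamp is serialized as an ISO-8601 string (e.g. 2024-01-05T10:15:30Z). Flink's TO_TIMESTAMP parses 'yyyy-MM-dd HH:mm:ss' by default, so one workaround is to normalize the string first. A minimal sketch, assuming the values are always UTC in exactly that shape (the column name updated_ts is mine; adjust the string handling if your source emits offsets or fractional seconds):

CREATE TABLE mytable (
  ...,
  updated STRING NOT NULL,
  -- assumes 'yyyy-MM-ddTHH:mm:ssZ' input; strip the 'T' and 'Z' so the
  -- default TO_TIMESTAMP format matches, yielding a TIMESTAMP(3)
  updated_ts AS TO_TIMESTAMP(REPLACE(REPLACE(updated, 'T', ' '), 'Z', '')),
  ...
) WITH (
  'connector' = 'kafka',
  'value.format' = 'avro-confluent',
  ...
);

Note that this produces the UTC wall-clock time as a zone-less TIMESTAMP(3); if you need a TIMESTAMP_LTZ that respects the session time zone, you'd have to convert it explicitly.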