apache-flink pyflink amazon-kinesis

PyFlink unix epoch timestamp conversion issue


My events arrive with a Unix epoch timestamp, and the source table uses the Kinesis connector. I need to use that same timestamp field for the watermark. How do I do this in Python? I am on Flink 1.11, since that is the latest version AWS supports.

Event format: {'event_time': 1633098843692, 'ticker': 'AMZN'}
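The sample value has 13 digits, which points to epoch milliseconds rather than seconds. A quick standard-library check in Python makes the unit explicit; `epoch_millis_to_utc` is a hypothetical helper for illustration, not part of PyFlink.

```python
from datetime import datetime, timezone


def epoch_millis_to_utc(event_time_ms: int) -> datetime:
    """Convert an epoch timestamp in milliseconds to an aware UTC datetime."""
    # Split into whole seconds and leftover milliseconds to avoid float rounding.
    seconds, millis = divmod(event_time_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=millis * 1000)


event = {'event_time': 1633098843692, 'ticker': 'AMZN'}
print(epoch_millis_to_utc(event['event_time']))

# Interpreting the same number as *seconds* lands tens of thousands of years
# in the future, which Python's datetime cannot represent at all.
try:
    datetime.fromtimestamp(event['event_time'], tz=timezone.utc)
except (OverflowError, ValueError, OSError) as exc:
    print("not a seconds value:", type(exc).__name__)
```

Read as milliseconds, the event falls on 2021-10-01 UTC, which is plausible; read as seconds, it is not.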

Table definition (Flink SQL DDL, run from PyFlink):

CREATE TABLE event_input_table (
    event_time TIMESTAMP,
    ticker VARCHAR,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'inputstream1',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'aws.credentials.provider' = 'ENV_VAR'
)
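The WATERMARK clause declares bounded out-of-orderness: the watermark trails the largest event time seen so far by 5 seconds, and rows whose event time is at or behind the current watermark are considered late. The plain-Python model below only illustrates that rule and is not Flink code. `track_watermarks` is a hypothetical name, and the model simplifies by updating the watermark after every row, whereas Flink emits watermarks periodically.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional, Tuple

# Mirrors: WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
MAX_DELAY = timedelta(seconds=5)


def track_watermarks(event_times: Iterable[datetime]) -> List[Tuple[datetime, datetime, bool]]:
    """Return (event_time, watermark_after_row, was_late) for each row in arrival order."""
    results = []
    watermark: Optional[datetime] = None
    for ts in event_times:
        # A row is late if it is not newer than the watermark emitted so far.
        late = watermark is not None and ts <= watermark
        candidate = ts - MAX_DELAY
        # Watermarks only move forward; an out-of-order row cannot pull them back.
        if watermark is None or candidate > watermark:
            watermark = candidate
        results.append((ts, watermark, late))
    return results


base = datetime(2021, 10, 1, 14, 34, 0, tzinfo=timezone.utc)
arrivals = [base + timedelta(seconds=s) for s in (10, 12, 8, 3)]
for ts, wm, late in track_watermarks(arrivals):
    print(ts.time(), wm.time(), late)
```

The row at +8 s arrives out of order but is still within the 5-second bound, while the row at +3 s falls behind the watermark and is flagged as late.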

Solution

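Declare the raw field as BIGINT, derive a TIMESTAMP from it with a computed column, and define the watermark on the computed column. The sample events carry epoch milliseconds (13 digits), while FROM_UNIXTIME expects seconds, so divide by 1000 before converting. This conversion drops the millisecond part.

CREATE TABLE event_input_table (
    event_time BIGINT,
    ticker VARCHAR,
    new_time AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(event_time / 1000 AS BIGINT))),
    WATERMARK FOR new_time AS new_time - INTERVAL '5' SECOND
)
WITH (
    -- same Kinesis options as in the question: 'connector', 'stream', 'aws.region', ...
)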
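To see what the computed column yields for the sample event, the sketch below mimics `TO_TIMESTAMP(FROM_UNIXTIME(...))` with the Python standard library: FROM_UNIXTIME renders whole seconds as a 'yyyy-MM-dd HH:mm:ss' string, and TO_TIMESTAMP parses it back. It assumes the configured session time zone is UTC, since the formatted string depends on that setting, and `flink_new_time` is a hypothetical helper, not a PyFlink API.

```python
from datetime import datetime, timedelta, timezone


def flink_new_time(event_time_ms: int) -> datetime:
    """Approximate TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)) under a UTC session time zone."""
    # FROM_UNIXTIME expects whole seconds, so the milliseconds are dropped first.
    seconds = event_time_ms // 1000
    # FROM_UNIXTIME's default pattern has no fractional seconds...
    as_string = datetime.fromtimestamp(seconds, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    # ...and TO_TIMESTAMP parses that string into a zone-less TIMESTAMP.
    return datetime.strptime(as_string, '%Y-%m-%d %H:%M:%S')


new_time = flink_new_time(1633098843692)
watermark = new_time - timedelta(seconds=5)  # new_time - INTERVAL '5' SECOND
print(new_time, watermark)
```

The resulting `new_time` is whole-second precision; the .692 s from the original event is gone.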