Search code examples
sqlcastingtimestampapache-flink

How to use ISO-8601 date in flink SQL?


Based on my research Flink SQL accepts "0000-01-01 00:00:00.000000000" as the timestamp format, but my timestamps in kafka are coming in "0000-01-01T00:00:00.000000000" format which causes by flink sql queries to fail.

Is there a way to convert, a somewhat common format (e.g. 2022-05-02T18:28:07.881414Z) for a date to the desired format in flink? In other SQL languages this would be a very simple operation, but from reading Flink documentation I cannot see a way to do it.

I tried doing a string replace and converting the string to timestamp but it did not work. I'm running Flink 1.13.

I tried using CONVERT_TZ() but it does not work with a string timestamp:

This works:

SELECT
  CONVERT_TZ(string_ts, 'UTC', 'America/Los_Angeles') as test_ts
FROM
  (VALUES ('2022-05-02 18:28:07.881414Z')) AS NameTable(string_ts);

This doesn't work:

SELECT
  CONVERT_TZ(string_ts, 'UTC', 'America/Los_Angeles') as test_ts
FROM
  (VALUES ('2022-05-02T18:28:07.881414Z')) AS NameTable(string_ts);

This gives me a timestamp but I'm not interested in changing the timezone. How can I convert the string with the replace to a timestamp that can be used with other timestamp functions? Such as timestamp diff:

SELECT
    TO_TIMESTAMP(replace(string_ts, 'T', ' ')) -- gives null
    , CONVERT_TZ(replace(string_ts, 'T', ' '), 'UTC', 'America/Los_Angeles') -- works
FROM
  (VALUES ('2022-05-02T18:28:07.881414Z')) AS NameTable(string_ts);

How can I get Flink to recognize '2022-05-02T18:28:07.881414Z' as a timestamp?


Solution

  • If you are using the JSON format to ingest this data, you can set this option

    json.timestamp-format.standard: ISO-8601

    to tell Flink how the strings are meant to be interpreted. This is described in the documentation at https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/json/#json-timestamp-format-standard.

    If you need to convert an ISO-8601 string to a timestamp using Flink SQL, you might find CONVERT_TZ(string1, string2, string3) helpful, or you might prefer to implement this conversion as a user-defined function.