Tags: java, apache-flink, flink-sql, flink-table-api

How to map Java LocalDateTime to Flink TIMESTAMP when using the Table API


My code is something like:

DataStreamSource<Tuple2<String, LocalDateTime>> src = ...;
tableEnv.createTemporaryView("input_table", src, $("name"), $("dt"));

I then realized that the dt field is not a TIMESTAMP after trying to call date_format on it.
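
To see what type the field actually got, you can print the schema of the registered view; a minimal sketch (the exact output depends on how Flink's type extraction classified dt, e.g. it may show a RAW or legacy type rather than TIMESTAMP(3)):

// Print the inferred schema of the view; in this situation dt may
// show up as a RAW/legacy type instead of TIMESTAMP(3).
tableEnv.from("input_table").printSchema();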

Then I updated the code:

DataStreamSource<Tuple2<String, LocalDateTime>> src = ...;
RowTypeInfo rti = new RowTypeInfo(
    new TypeInformation[] {Types.STRING, Types.LOCAL_DATE_TIME},
    new String[] {"name", "dt"});
SingleOutputStreamOperator<Row> rows = src.map(val -> Row.of(val.f0, val.f1)).returns(rti);
tableEnv.createTemporaryView("input_table", rows);

The new code works fine, but it seems a bit convoluted to me, since I have to add a map that basically does nothing.

So my question is: what is the proper way to map a Java LocalDateTime to a Flink TIMESTAMP?

I'm using Flink 1.13.0.


Solution

  • When converting the DataStream into a Table, we can specify an org.apache.flink.table.api.Schema to tune the mapping between Java types and SQL types, as well as declare metadata such as watermarks.

    This snippet works in my case:

    import java.time.LocalDateTime;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    ...
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
        DataStream<Tuple2<String, LocalDateTime>> dataStream = env.fromElements(
            Tuple2.of("Alice", LocalDateTime.parse("2021-11-16T08:19:30.123")),
            Tuple2.of("Bob", LocalDateTime.parse("2021-11-16T08:19:31.123")),
            Tuple2.of("John", LocalDateTime.parse("2021-11-16T08:19:32.123")));
    
        // note that "f0" and "f1" here come from the field names in Tuple2
        Table inputTable = tableEnv.fromDataStream(dataStream,
            Schema.newBuilder()
                .column("f0", "STRING")
                .column("f1", "TIMESTAMP(3)")
                .watermark("f1", "SOURCE_WATERMARK()")
                .build()
        );
        tableEnv.createTemporaryView("input_table", inputTable);
    
        tableEnv.executeSql("DESCRIBE input_table").print();
    
        tableEnv.executeSql("" +
            "  SELECT                                      " +
            "    UPPER(f0) AS name,                        " +
            "    f1 AS datetime,                           " +
            "    date_format(f1, 'YYYY') AS event_year     " +
            "  FROM input_table                            "
        ).print();
    

    Which prints:

    +------+------------------------+------+-----+--------+--------------------+
    | name |                   type | null | key | extras |          watermark |
    +------+------------------------+------+-----+--------+--------------------+
    |   f0 |                 STRING | true |     |        |                    |
    |   f1 | TIMESTAMP(3) *ROWTIME* | true |     |        | SOURCE_WATERMARK() |
    +------+------------------------+------+-----+--------+--------------------+
    

    and

    +----+--------------------------------+-------------------------+--------------------------------+
    | op |                           name |                datetime |                     event_year |
    +----+--------------------------------+-------------------------+--------------------------------+
    | +I |                          ALICE | 2021-11-16 08:19:30.123 |                           2021 |
    | +I |                            BOB | 2021-11-16 08:19:31.123 |                           2021 |
    | +I |                           JOHN | 2021-11-16 08:19:32.123 |                           2021 |
    +----+--------------------------------+-------------------------+--------------------------------+
    

    I find DESCRIBE very convenient for debugging these cases.

    See also here for more details: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

    Note that the conversion between DataStream and Table was improved in 1.13 and the syntax changed a bit; that's what the "legacy" section of that doc refers to. You'll probably stumble upon examples of that legacy syntax in older SO posts.
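
    For reference, here is a minimal sketch contrasting the two styles (variable names are illustrative; the legacy overload is deprecated but still available in 1.13):

    // Legacy style (pre-1.13): rename fields positionally with Expressions.
    // Requires: import static org.apache.flink.table.api.Expressions.$;
    Table legacyTable = tableEnv.fromDataStream(dataStream,
        $("f0").as("name"), $("f1").as("dt"));

    // Current style (1.13+): declare an explicit Schema, as shown above.
    Table currentTable = tableEnv.fromDataStream(dataStream,
        Schema.newBuilder()
            .column("f0", "STRING")
            .column("f1", "TIMESTAMP(3)")
            .build());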

    It's probably helpful as well to check the correspondence between Java types and SQL types described here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type
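
    If you prefer not to spell the types out as SQL strings, the same Schema can be declared with DataTypes objects; a minimal sketch (the bridgedTo call is optional and only included to make the Java-class correspondence explicit):

    import org.apache.flink.table.api.DataTypes;

    Schema schema = Schema.newBuilder()
        // java.lang.String maps to STRING
        .column("f0", DataTypes.STRING())
        // java.time.LocalDateTime maps to TIMESTAMP(3)
        .column("f1", DataTypes.TIMESTAMP(3).bridgedTo(java.time.LocalDateTime.class))
        .build();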