Search code examples
apache-flinkflink-streamingflink-sql

How to sort a stream by event time using Flink SQL


I have an out-of-order DataStream<Event> that I want to sort so that the events are ordered by their event time timestamps. I've simplified my use case down to where my Event class has just a single field -- the timestamp field:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

    sortedEventStream.print();

    env.execute();
}

I'm getting this error:

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "timestamp FROM" at line 1, column 8.

Seems like I'm not specifying the event time attribute in the correct way, but it's not clear what's wrong.


Solution

  • The problem turned out to be using timestamp as a field name in my Event class. Changing it to eventTime was enough to get everything working:

    public class Sort {
        public static final int OUT_OF_ORDERNESS = 1000;
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                    .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
    
            Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
            tableEnv.registerTable("events", events);
            Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
            DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
    
            sortedEventStream.print();
    
            env.execute();
        }
    
        public static class Event {
            public Long eventTime;
    
            Event() {
                this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
            }
        }
    
        private static class OutOfOrderEventSource implements SourceFunction<Event> {
            private volatile boolean running = true;
    
            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                while(running) {
                    ctx.collect(new Event());
                    Thread.sleep(1);
                }
            }
    
            @Override
            public void cancel() {
                running = false;
            }
        }
    
        private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
            public TimestampsAndWatermarks() {
                super(Time.milliseconds(OUT_OF_ORDERNESS));
            }
    
            @Override
            public long extractTimestamp(Event event) {
                return event.eventTime;
            }
        }
    }