
Sorting DataStream using Apache Flink


I am learning Flink and started with a simple word count using DataStream. To refine the output, I filter it to show only the words that were found 3 or more times.

    DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9000)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .apply(new MyWindowFunction())
            .sum(1)
            .filter(word -> word.f1 >= 3);

I would like to create a WindowFunction that sorts the output by the count of each word. The WindowFunction I am trying to implement does not compile. I am struggling to define the apply method and the type parameters of the WindowFunction interface.

    public static class MyWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            Tuple2<String, Integer>, // key type
            TimeWindow> {

        void apply(Tuple2<String, Integer> key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {

            String word = ((Tuple2<String, Integer>)key).f0;
            Integer count = ((Tuple2<String, Integer>)key).f1;

            .........
            out.collect(new Tuple2<>(word, count));
        }
    }

Solution

  • I am updating this answer to use Flink 1.12.0. To sort the elements of a stream I had to use a KeyedProcessFunction after counting the words with a ReduceFunction. I then had to set the parallelism of the very last transformation to 1 so that the order produced by the KeyedProcessFunction is not changed. The sequence I am using is socketTextStream -> flatMap -> keyBy -> reduce -> keyBy -> process -> print().setParallelism(1). Below is the example:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SocketWindowWordCountJava {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.socketTextStream("localhost", 9000)
                    .flatMap(new SplitterFlatMap())
                    .keyBy(new WordKeySelector())
                    .reduce(new SumReducer())
                    .keyBy(new WordKeySelector())
                    .process(new SortKeyedProcessFunction(3 * 1000))
                    .print().setParallelism(1);
            String executionPlan = env.getExecutionPlan();
            System.out.println("ExecutionPlan ........................ ");
            System.out.println(executionPlan);
            System.out.println("........................ ");
            env.execute("Window WordCount sorted");
        }
    }
    

    The UDF that sorts the stream is SortKeyedProcessFunction, which extends KeyedProcessFunction. It keeps a ValueState<List<Event>> named listState, where Event implements Comparable<Event>, so that the buffered list can be sorted. In processElement I add the event to the state and register a processing-time timer with context.timerService().registerProcessingTimeTimer(timeoutTime); in onTimer I sort the list and collect the events. The timeout used here is 3 seconds.

    public class SortKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Event> {
            private static final long serialVersionUID = 7289761960983988878L;
            // delay (ms) after the last element before the buffered events are emitted
            private final long timeOut;
            // buffered events waiting to be sorted and emitted
            private ValueState<List<Event>> listState = null;
            // state to remember the last timer set
            private ValueState<Long> lastTime = null;
    
            public SortKeyedProcessFunction(long timeOut) {
                this.timeOut = timeOut;
            }
    
            @Override
            public void open(Configuration conf) {
                // set up the event-list state and the last-timer state
                ValueStateDescriptor<List<Event>> descriptor = new ValueStateDescriptor<>(
                        // state name
                        "sorted-events",
                        // type information of state
                        TypeInformation.of(new TypeHint<List<Event>>() {
                        }));
                listState = getRuntimeContext().getState(descriptor);
    
                ValueStateDescriptor<Long> descriptorLastTime = new ValueStateDescriptor<Long>(
                        "lastTime",
                        TypeInformation.of(new TypeHint<Long>() {
                        }));
    
                lastTime = getRuntimeContext().getState(descriptorLastTime);
            }
    
            @Override
            public void processElement(Tuple2<String, Integer> value, Context context, Collector<Event> collector) throws Exception {
                // get current time and compute timeout time
                long currentTime = context.timerService().currentProcessingTime();
                long timeoutTime = currentTime + timeOut;
                // register timer for timeout time
                context.timerService().registerProcessingTimeTimer(timeoutTime);
    
                List<Event> queue = listState.value();
                if (queue == null) {
                    queue = new ArrayList<Event>();
                }
                queue.add(new Event(value.f0, value.f1));
                lastTime.update(timeoutTime);
                listState.update(queue);
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
                System.out.println("timestamp: " + timestamp);
                List<Event> queue = listState.value();
                Long current = lastTime.value();

                // only the last timer we registered flushes the buffer; guard against empty state
                if (queue != null && current != null && timestamp == current.longValue()) {
                    Collections.sort(queue);
    
                    queue.forEach( e -> {
                        out.collect(e);
                    });
                    queue.clear();
                    listState.clear();
                }
            }
        }
    
    class Event implements Comparable<Event> {
        String value;
        Integer qtd;
        public Event(String value, Integer qtd) {
            this.value = value;
            this.qtd = qtd;
        }
        public String getValue() { return value; }
        public Integer getQtd() { return qtd; }
        @Override
        public String toString() {
            return "Event{" +"value='" + value + '\'' +", qtd=" + qtd +'}';
        }
        @Override
        public int compareTo(@NotNull Event event) {
            return this.getValue().compareTo(event.getValue());
        }
    }
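
The compareTo above orders events by the word, while the question asked for ordering by the number of times each word was found. A minimal sketch of the latter, using a Comparator on the count, could look like the following. WordCount and CountSortSketch are hypothetical stand-ins for illustration only, not part of the answer's code, and the tie-breaking rule is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the Event class above (names are illustrative).
class WordCount {
    final String word;
    final int count;

    WordCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + "=" + count;
    }
}

class CountSortSketch {
    // Highest count first; ties are broken alphabetically by word (an assumption).
    static final Comparator<WordCount> BY_COUNT_DESC =
            Comparator.comparingInt((WordCount wc) -> wc.count)
                    .reversed()
                    .thenComparing(wc -> wc.word);

    // Returns a sorted copy and leaves the input list untouched.
    static List<WordCount> sortedByCount(List<WordCount> input) {
        List<WordCount> copy = new ArrayList<>(input);
        copy.sort(BY_COUNT_DESC);
        return copy;
    }

    public static void main(String[] args) {
        List<WordCount> counts = Arrays.asList(
                new WordCount("swim", 5),
                new WordCount("soccer", 7),
                new WordCount("basketball", 7));
        System.out.println(sortedByCount(counts)); // [basketball=7, soccer=7, swim=5]
    }
}
```

In the answer's code, the same effect would presumably be achieved by comparing getQtd() instead of getValue() inside Event.compareTo.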
    

    When I run $ nc -lk 9000 and type words on the console, I see them in order in the output:

    ...
    Event{value='soccer', qtd=7}
    Event{value='swim', qtd=5}
    
    ...
    Event{value='basketball', qtd=9}
    Event{value='soccer', qtd=8}
    Event{value='swim', qtd=6}
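
To make the timer logic of SortKeyedProcessFunction easier to follow, here is a minimal plain-Java sketch of the same buffer-and-flush idea, without Flink. SortBufferSketch and its methods are hypothetical names, and the sketch assumes a single shared buffer of strings. In Flink, keyed state and timers are scoped to the current key, which this simplified model ignores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the buffer-and-flush logic; no Flink involved.
class SortBufferSketch {
    private final long timeOut;
    private final List<String> buffer = new ArrayList<>();
    private long lastTimer = -1L; // timestamp of the most recently registered timer

    SortBufferSketch(long timeOut) {
        this.timeOut = timeOut;
    }

    // Mirrors processElement: buffer the element and push the deadline forward.
    long add(long now, String element) {
        buffer.add(element);
        lastTimer = now + timeOut;
        return lastTimer;
    }

    // Mirrors onTimer: only the most recently registered timer flushes the buffer.
    List<String> onTimer(long timestamp) {
        if (timestamp != lastTimer) {
            return Collections.emptyList(); // superseded by a later element
        }
        List<String> sorted = new ArrayList<>(buffer);
        Collections.sort(sorted);
        buffer.clear();
        return sorted;
    }

    public static void main(String[] args) {
        SortBufferSketch sketch = new SortBufferSketch(3000);
        long t1 = sketch.add(0, "swim");      // timer at 3000
        long t2 = sketch.add(1000, "soccer"); // timer at 4000 supersedes the first
        System.out.println(sketch.onTimer(t1)); // []
        System.out.println(sketch.onTimer(t2)); // [soccer, swim]
    }
}
```

Each new element moves the deadline forward, so the buffer is only emitted once no element has arrived for the full timeout.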
    

    The other UDFs are for the other transformations of the stream program and they are here for completeness.

    public class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 3121588720675797629L;
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    }
    public class WordKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    }
    public class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> event1, Tuple2<String, Integer> event2) throws Exception {
            return Tuple2.of(event1.f0, event1.f1 + event2.f1);
        }
    }
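
A reduce on a keyed stream is a rolling reduce: it emits an updated total for every incoming element, which is why the counts in the output above keep growing. The following plain-Java sketch imitates what the split, key-by-word and sum steps would emit for one line of input. RunningCountSketch and runningCounts are hypothetical names, and an ordinary HashMap stands in for Flink's per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningCountSketch {
    // Returns what split -> key by word -> rolling sum would emit, in arrival order.
    static List<String> runningCounts(String sentence) {
        Map<String, Integer> totals = new HashMap<>(); // stands in for per-key state
        List<String> emitted = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            // Same combination as SumReducer: previous total plus the new 1.
            int total = totals.merge(word, 1, Integer::sum);
            emitted.add(word + "=" + total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runningCounts("swim soccer swim")); // [swim=1, soccer=1, swim=2]
    }
}
```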