I am learning Flink and I started with a simple word count using the DataStream API. To refine the processing I filtered the output to show only the words that were found 3 or more times.
DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 9000)
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .apply(new MyWindowFunction())
        .sum(1)
        .filter(word -> word.f1 >= 3);
I would like to create a WindowFunction to sort the output by the count value of the words found. The WindowFunction that I am trying to implement does not compile at all. I am struggling to define the apply method and the type parameters of the WindowFunction interface.
public static class MyWindowFunction implements WindowFunction<
        Tuple2<String, Integer>, // input type
        Tuple2<String, Integer>, // output type
        Tuple2<String, Integer>, // key type
        TimeWindow> {

    void apply(Tuple2<String, Integer> key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
        String word = ((Tuple2<String, Integer>) key).f0;
        Integer count = ((Tuple2<String, Integer>) key).f1;
        .........
        out.collect(new Tuple2<>(word, count));
    }
}
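For reference, the interface I am trying to implement is declared in Flink roughly like this (simplified here), so the four type parameters are the input type, the output type, the key type, and the window type, and apply also declares throws Exception:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    // called once per key and window, with all elements of that key in the window
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}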
I am updating this answer to use Flink 1.12.0. In order to sort the elements of a stream I had to use a KeyedProcessFunction after counting the stream with a ReduceFunction. Then I had to set the parallelism of the very last transformation to 1 so that the order of the elements sorted by the KeyedProcessFunction is not changed. The sequence that I am using is socketTextStream -> flatMap -> keyBy -> reduce -> keyBy -> process -> print().setParallelism(1). Below is the example:
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9000)
                .flatMap(new SplitterFlatMap())                  // split lines into (word, 1) tuples
                .keyBy(new WordKeySelector())                    // key by word
                .reduce(new SumReducer())                        // sum the counts per word
                .keyBy(new WordKeySelector())                    // key again for the sorting function
                .process(new SortKeyedProcessFunction(3 * 1000)) // buffer, sort, and emit after a 3 second timeout
                .print().setParallelism(1);                      // parallelism 1 preserves the sorted order

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        env.execute("Window WordCount sorted");
    }
}
The UDF that I used to sort the stream is SortKeyedProcessFunction, which extends KeyedProcessFunction. I use a ValueState<List<Event>> listState, where Event implements Comparable<Event>, to keep a sortable list as state. In the processElement method I register a processing-time timer for the time at which the event was added to the state plus the timeout (context.timerService().registerProcessingTimeTimer(timeoutTime);), and I collect the events in the onTimer method. I am using a timeout of 3 seconds here.
public class SortKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Event> {

    private static final long serialVersionUID = 7289761960983988878L;

    // timeout after which the buffered events are sorted and emitted
    private final long timeOut;

    // state holding the events collected so far for the current key
    private ValueState<List<Event>> listState = null;
    // state to remember the last timer that was registered
    private ValueState<Long> lastTime = null;

    public SortKeyedProcessFunction(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void open(Configuration conf) {
        // set up the list state and the timer state
        ValueStateDescriptor<List<Event>> descriptor = new ValueStateDescriptor<>(
                // state name
                "sorted-events",
                // type information of the state
                TypeInformation.of(new TypeHint<List<Event>>() {
                }));
        listState = getRuntimeContext().getState(descriptor);

        ValueStateDescriptor<Long> descriptorLastTime = new ValueStateDescriptor<Long>(
                "lastTime",
                TypeInformation.of(new TypeHint<Long>() {
                }));
        lastTime = getRuntimeContext().getState(descriptorLastTime);
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context context, Collector<Event> collector) throws Exception {
        // get the current processing time and compute the timeout time
        long currentTime = context.timerService().currentProcessingTime();
        long timeoutTime = currentTime + timeOut;

        // register a timer for the timeout time
        context.timerService().registerProcessingTimeTimer(timeoutTime);

        // buffer the event and remember the last timer that was registered
        List<Event> queue = listState.value();
        if (queue == null) {
            queue = new ArrayList<Event>();
        }
        queue.add(new Event(value.f0, value.f1));
        lastTime.update(timeoutTime);
        listState.update(queue);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        System.out.println("timestamp: " + timestamp);

        // only the last timer that was registered emits the sorted events
        List<Event> queue = listState.value();
        Long current = lastTime.value();
        if (timestamp == current.longValue()) {
            Collections.sort(queue);
            queue.forEach(out::collect);
            queue.clear();
            listState.clear();
        }
    }
}
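As a side note, instead of keeping the whole list in a ValueState<List<Event>>, the same buffering could probably also be done with Flink's ListState, which appends elements without rewriting the whole list as one value (relevant, for example, with the RocksDB state backend). A minimal sketch of that variant, with the hypothetical class name SortWithListState and the same timer logic as above:

// Hypothetical variant of SortKeyedProcessFunction using ListState<Event>
// instead of ValueState<List<Event>>; the timer logic is unchanged.
public class SortWithListState extends KeyedProcessFunction<String, Tuple2<String, Integer>, Event> {

    private final long timeOut;
    private transient ListState<Event> events;
    private transient ValueState<Long> lastTime;

    public SortWithListState(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void open(Configuration conf) {
        events = getRuntimeContext().getListState(
                new ListStateDescriptor<>("sorted-events", Event.class));
        lastTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastTime", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Event> out) throws Exception {
        long timeoutTime = ctx.timerService().currentProcessingTime() + timeOut;
        ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        events.add(new Event(value.f0, value.f1)); // append without reading the whole list
        lastTime.update(timeoutTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Long current = lastTime.value();
        if (current != null && timestamp == current) {
            List<Event> sorted = new ArrayList<>();
            events.get().forEach(sorted::add);
            Collections.sort(sorted);
            sorted.forEach(out::collect);
            events.clear();
            lastTime.clear();
        }
    }
}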
class Event implements Comparable<Event> {

    String value;
    Integer qtd;

    public Event(String value, Integer qtd) {
        this.value = value;
        this.qtd = qtd;
    }

    public String getValue() { return value; }

    public Integer getQtd() { return qtd; }

    @Override
    public String toString() {
        return "Event{" + "value='" + value + '\'' + ", qtd=" + qtd + '}';
    }

    // events are ordered alphabetically by the word
    @Override
    public int compareTo(Event event) {
        return this.getValue().compareTo(event.getValue());
    }
}
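Note that compareTo above orders the events alphabetically by the word. If the output should instead be ordered by the count (highest first), a minimal sketch of an alternative compareTo for the same Event class could be:

// hypothetical alternative ordering: highest count first, ties broken alphabetically by word
@Override
public int compareTo(Event event) {
    int byCount = event.getQtd().compareTo(this.getQtd());
    return byCount != 0 ? byCount : this.getValue().compareTo(event.getValue());
}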
So when I use $ nc -lk 9000 and type words on the console, I see them sorted in the output:
...
Event{value='soccer', qtd=7}
Event{value='swim', qtd=5}
...
Event{value='basketball', qtd=9}
Event{value='soccer', qtd=8}
Event{value='swim', qtd=6}
The other UDFs used in the remaining transformations of the stream program are included here for completeness.
public class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

    private static final long serialVersionUID = 3121588720675797629L;

    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word : sentence.split(" ")) {
            out.collect(Tuple2.of(word, 1));
        }
    }
}

public class WordKeySelector implements KeySelector<Tuple2<String, Integer>, String> {

    @Override
    public String getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0;
    }
}

public class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {

    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> event1, Tuple2<String, Integer> event2) throws Exception {
        return Tuple2.of(event1.f0, event1.f1 + event2.f1);
    }
}