I have this Java code in Flink:
env.setParallelism(6);
//Read from Kafka topic with 12 partitions
DataStream<String> line = env.addSource(myConsumer);
//Filter half of the records
DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
DataStream<Tuple3<String, String, Integer>> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder());
//Filter the other half
DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
DataStream<Tuple3<String, String, Integer>> line_Num_Even_2 = line_Num_Even.map(new EvenAdder());
//Join all the data again
DataStream<Tuple3<String, String, Integer>> line_Num_U = line_Num_Odd_2.union(line_Num_Even_2);
//Window
DataStream<Tuple3<String, String, Integer>> windowedLine_Num_U_K = line_Num_U
    .keyBy(1)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce(new Reducer());
The problem is that the window should be able to run with parallelism = 2, since there are two different groups of data, keyed by "odd" and "even" in the second field (the first String after f0, at position 1) of the Tuple3. Everything else runs with parallelism 6, but the window effectively runs with parallelism = 1. Because of my requirements, I need it to reach parallelism = 2.
The functions used in the code are the following:
// Keeps records whose leading number (first token of f0) is odd.
public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
    @Override
    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        boolean isOdd = (Long.parseLong(line.f0.split(" ")[0]) % 2) != 0;
        return isOdd;
    }
}

// Keeps records whose leading number (first token of f0) is even.
public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
    @Override
    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        boolean isEven = (Long.parseLong(line.f0.split(" ")[0]) % 2) == 0;
        return isEven;
    }
}

// Tags each record with the key "odd" in the second field.
public static class OddAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
    @Override
    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        return new Tuple3<String, String, Integer>(line.f0, "odd", line.f1);
    }
}

// Tags each record with the key "even" in the second field.
public static class EvenAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
    @Override
    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        return new Tuple3<String, String, Integer>(line.f0, "even", line.f1);
    }
}

// Sums the two numbers encoded in f0 ("<value> <timestamp>") and the counts in f2; keeps the key of the first record.
public static class Reducer implements ReduceFunction<Tuple3<String, String, Integer>> {
    @Override
    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> line1,
            Tuple3<String, String, Integer> line2) throws Exception {
        long sum = Long.parseLong(line1.f0.split(" ")[0]) + Long.parseLong(line2.f0.split(" ")[0]);
        long sumTS = Long.parseLong(line1.f0.split(" ")[1]) + Long.parseLong(line2.f0.split(" ")[1]);
        return new Tuple3<String, String, Integer>(sum + " " + sumTS, line1.f1, line1.f2 + line2.f2);
    }
}
Thanks for your help!
SOLUTION: I have changed the content of the keys from "odd" and "even" to "odd0000" and "even1111" and it is working properly now.
Keys are distributed to worker threads by hash partitioning: each key value is hashed, and the target thread is determined by that hash modulo the number of workers. With only two keys and two threads, there is a good chance that both keys are assigned to the same thread, which leaves the other thread idle.
You can try to use different key values whose hash values distribute across both threads.
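To make the collision concrete, here is a minimal sketch of the simplified "hash modulo number of workers" scheme described above. The class `KeyPartitionSketch` and its methods are hypothetical helpers, not Flink APIs, and the sketch uses plain `String.hashCode()`. Flink's actual assignment involves additional hashing and key groups, so the worker index it picks for a given key may differ from what this sketch prints; the point is only that distinct keys can end up on the same worker.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyPartitionSketch {

    // Simplified model (an assumption, not Flink's real code):
    // hash the key, then take the hash modulo the worker count.
    static int workerFor(String key, int workers) {
        // floorMod keeps the result in [0, workers) even for negative hashes.
        return Math.floorMod(key.hashCode(), workers);
    }

    // Maps each candidate key to its worker index so collisions are easy to spot.
    static Map<String, Integer> assign(List<String> keys, int workers) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String key : keys) {
            result.put(key, workerFor(key, workers));
        }
        return result;
    }

    public static void main(String[] args) {
        // "a" and "c" are different keys but share worker 1; "b" goes to worker 0.
        System.out.println(assign(Arrays.asList("a", "b", "c"), 2));
        // prints {a=1, b=0, c=1}
    }
}
```

Running a check like this against candidate key names (with whatever hash the real system uses) is one way to pick keys that spread over the available workers, which is effectively what renaming the keys to "odd0000" and "even1111" achieved.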