java, apache-kafka, apache-flink

How to support multiple keyBy in Flink


In the code sample below, I am trying to consume a stream of employee records { Country, Employer, Name, Salary, Age } and output the highest-paid employee in every country, for each employer. Unfortunately, chaining multiple keyBy calls doesn't work.

Only the keyBy on Employer takes effect, so I don't get the correct result. What am I missing?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Employee> streamEmployee = env.addSource(
        new FlinkKafkaConsumer010<ObjectNode>("flink-demo", new JSONDeserializationSchema(), properties))
        .map(new MapFunction<ObjectNode, Employee>() {

            private static final long serialVersionUID = 6111226274068863916L;

            @Override
            public Employee map(ObjectNode value) throws Exception {
                final Gson gson = new GsonBuilder().create();
                Employee uMsg = gson.fromJson(value.toString(), Employee.class);
                return uMsg;
            }
        });

KeyedStream<Employee, String> employeesKeyedByCountryndEmployer = streamEmployee
        .keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String getKey(Employee value) throws Exception {
                // key by country
                return value.getCountry();
            }
        }).keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String getKey(Employee value) throws Exception {
                // key by employer
                return value.getEmployer();
            }
        });
// This should emit the highest-paid employee in each country,
// for each employer
DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer.timeWindow(Time.seconds(5))
        .maxBy("salary");

// Assume Employee overrides toString(), so print() output is readable.
uHighlyPaidEmployee.print();

env.execute("Employee-employer log processor");

Solution

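  • Calling keyBy() a second time does not combine the keys: it re-partitions the stream using only the new KeySelector, so the Employer key replaces the Country key. Instead, define a single KeySelector that returns a composite key: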

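    // Tuple2 is org.apache.flink.api.java.tuple.Tuple2
    KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer =
      streamEmployee.keyBy(
        new KeySelector<Employee, Tuple2<String, String>>() {
    
          @Override
          public Tuple2<String, String> getKey(Employee value) throws Exception {
            // Records with the same (country, employer) pair share one key
            return Tuple2.of(value.getCountry(), value.getEmployer());
          }
        }
      );

    // Each (country, employer) pair is now windowed and maxed independently
    DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer
      .timeWindow(Time.seconds(5))
      .maxBy("salary");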
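The difference between the two keyings can be checked without a Flink cluster. The plain-Java sketch below is only an illustration under stated assumptions: the class CompositeKeyDemo, the sample records, and the record CountryEmployer (standing in for Flink's Tuple2) are all hypothetical. It groups one window's worth of records by a key and keeps the highest salary per key, roughly what keyBy(...).timeWindow(...).maxBy("salary") emits when a window fires. It is not Flink's implementation, and it needs Java 16+ for records.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CompositeKeyDemo {

    // Hypothetical stand-in for the question's Employee POJO.
    record Employee(String country, String employer, String name, int salary, int age) {}

    // Hypothetical stand-in for Tuple2<String, String>: same country AND employer => same key.
    record CountryEmployer(String country, String employer) {}

    // Hypothetical records that fall into a single 5-second window.
    static List<Employee> sampleWindow() {
        return List.of(
                new Employee("US", "Acme", "Alice", 100, 30),
                new Employee("US", "Acme", "Bob", 120, 41),
                new Employee("DE", "Acme", "Carl", 150, 35),
                new Employee("US", "Globex", "Dana", 90, 28));
    }

    // Groups one window's records by the given key and keeps the highest-paid record per key.
    static <K> Map<K, Employee> maxSalaryByKey(List<Employee> window, Function<Employee, K> key) {
        return window.stream().collect(Collectors.toMap(
                key,
                Function.identity(),
                // On a tie keep the earlier record, similar to maxBy's default of returning the first
                (earlier, later) -> later.salary() > earlier.salary() ? later : earlier));
    }

    public static void main(String[] args) {
        List<Employee> window = sampleWindow();
        // Chained keyBy(country).keyBy(employer) leaves the stream keyed by employer only,
        // so US/Acme and DE/Acme collapse into one group and Bob is never emitted.
        System.out.println(maxSalaryByKey(window, Employee::employer));
        // Composite key, as with the KeySelector returning Tuple2.of(country, employer).
        System.out.println(maxSalaryByKey(window,
                e -> new CountryEmployer(e.country(), e.employer())));
    }
}
```

Keyed by employer alone, the US and DE Acme records share one group and only Carl (150) is reported for Acme; with the composite key, Bob (120) is reported for US/Acme and Carl for DE/Acme. In Flink, Tuple2 plays the same role as CountryEmployer here because it implements equals() and hashCode() over both fields.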