Search code examples
apache-flink

Flink AggregateFunction: find sum by multiple keys (validation process and testing)


I am using Apache flink on Kinesis Data Analytics.

Flink version: 1.13.2, Java: 11

I am consuming json messages from kafka. Sample Input records look as below

null    {"plateNumber":"506b9910-74a7-4c3e-a885-b5e9717efe3a","vignetteStickerId":"9e69df3f-d728-4fc8-9b09-42104588f772","currentTimestamp":"2022/04/07 16:19:55","timestamp":1649362795.444459000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null    {"plateNumber":"5ffe0326-571e-4b97-8f7b-4f49aebb6993","vignetteStickerId":"6c2e1342-b096-4cc9-a92c-df61571c2c7d","currentTimestamp":"2022/04/07 16:20:00","timestamp":1649362800.638060000,"vehicleType":"CAR","vehicleModelType":"HONDA"}
null    {"plateNumber":"d15f49f9-5550-4780-b260-83f3116ba64a","vignetteStickerId":"1366fbfe-7d0a-475f-9249-261ef1dd6de2","currentTimestamp":"2022/04/07 16:20:05","timestamp":1649362805.643749000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null    {"plateNumber":"803508fb-9701-438e-9028-01bb8d96a804","vignetteStickerId":"b534369f-533e-4c15-ac3f-fc28cf0f3aba","currentTimestamp":"2022/04/07 16:20:10","timestamp":1649362810.648813000,"vehicleType":"CAR","vehicleModelType":"FORD"}

I want to aggregate (count) these records in 20-second windows, grouped by vehicleType (CAR or TRUCK) and vehicleModelType (TOYOTA, HONDA or FORD). SQL analogy: SELECT sum(...) ... GROUP BY vehicleType, vehicleModelType.

I am using Aggregate function to achieve this.

import static java.util.Objects.isNull;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.springframework.stereotype.Component;

import com.helecloud.streams.demo.model.Vehicle;
import com.helecloud.streams.demo.model.VehicleStatistics;

/**
 * Flink {@link AggregateFunction} that folds the {@link Vehicle} records of one window pane
 * into a single {@link VehicleStatistics}.
 *
 * <p>The accumulator and the result are the same object: it carries the vehicle type and model
 * type of the first record seen, the number of records added, and the timestamp of the most
 * recently added record as {@code end}.
 *
 * <p>The function does not group anything by itself. Which records end up in the same
 * accumulator is decided entirely by the {@code keyBy(...)} applied upstream, so the reported
 * type/model pair is only meaningful if the stream is keyed by both fields.
 */
@Component
public class VehicleStatisticsAggregator implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates an empty accumulator; all of its fields are left at their defaults until the
     * first record is added.
     */
    @Override
    public VehicleStatistics createAccumulator() {
        System.out.println("Creating Accumulator!!");
        return new VehicleStatistics();
    }

    /**
     * Adds one vehicle to the accumulator.
     *
     * @param vehicle           the incoming record
     * @param vehicleStatistics the accumulator for the current key and window
     * @return the same accumulator instance, updated in place
     */
    @Override
    public VehicleStatistics add(Vehicle vehicle, VehicleStatistics vehicleStatistics) {

        System.out.println("vehicle in add method : " + vehicle);

        // The type fields are taken from the first record only; later records never overwrite them.
        if (isNull(vehicleStatistics.getVehicleType())) {
            vehicleStatistics.setVehicleType(vehicle.getVehicleType());
        }

        if (isNull(vehicleStatistics.getVehicleModelType())) {
            vehicleStatistics.setVehicleModelType(vehicle.getVehicleModelType());
        }

        if (isNull(vehicleStatistics.getCount())) {
            vehicleStatistics.setCount(1);
        } else {
            System.out.println("incrementing count for : vehicleStatistics : " + vehicleStatistics);
            vehicleStatistics.setCount(vehicleStatistics.getCount() + 1);
        }

        // "end" always reflects the last record that was added.
        vehicleStatistics.setEnd(vehicle.getTimestamp());

        System.out.println("vehicleStatistics in add : " + vehicleStatistics);

        return vehicleStatistics;
    }

    /**
     * Returns the accumulator itself as the window result.
     */
    @Override
    public VehicleStatistics getResult(VehicleStatistics vehicleStatistics) {
        System.out.println("vehicleStatistics in getResult : " + vehicleStatistics);
        return vehicleStatistics;
    }

    /**
     * Combines two accumulators into a new one. Flink only calls this for merging windows
     * (for example session windows), so it is not expected to run with tumbling windows.
     *
     * <p>Either input may still be an empty accumulator, i.e. one whose count, type fields and
     * end are unset. Every field therefore falls back to the other accumulator, and a missing
     * count is treated as zero instead of being unboxed.
     *
     * @param vehicleStatistics the first accumulator; preferred source of the type fields
     * @param accumulator       the second accumulator; preferred source of {@code end}
     * @return a new accumulator holding the summed count
     */
    @Override
    public VehicleStatistics merge(VehicleStatistics vehicleStatistics, VehicleStatistics accumulator) {

        System.out.println("Coming to merge!!");

        VehicleStatistics vs = new VehicleStatistics(
                isNull(accumulator.getEnd()) ? vehicleStatistics.getEnd() : accumulator.getEnd(),
                isNull(vehicleStatistics.getVehicleType())
                        ? accumulator.getVehicleType()
                        : vehicleStatistics.getVehicleType(),
                isNull(vehicleStatistics.getVehicleModelType())
                        ? accumulator.getVehicleModelType()
                        : vehicleStatistics.getVehicleModelType(),
                (isNull(vehicleStatistics.getCount()) ? 0 : vehicleStatistics.getCount())
                        + (isNull(accumulator.getCount()) ? 0 : accumulator.getCount()));

        System.out.println("VehicleStatistics in Merge :" + vs);

        return vs;
    }
}

With the above code, I also never see the merge method being called. Below is the main processing code:

/**
 * Spring service that builds and runs the Flink job: it reads {@link Vehicle} records from
 * Kafka, counts them per (vehicle type, vehicle model type) in 20-second tumbling event-time
 * windows, and writes the resulting {@link VehicleStatistics} back to Kafka.
 */
@Service
public class ProcessingService {

  @Value("${kafka.bootstrap-servers}")
  private String kafkaAddress;

  @Value("${kafka.group-id}")
  private String kafkaGroupId;

  /** Kafka topic the vehicle records are consumed from. */
  public static final String TOPIC = "flink_input";

  /** Kafka topic the aggregated statistics are produced to. */
  public static final String VEHICLE_STATISTICS_TOPIC = "flink_output";

  @Autowired
  private  VehicleDeserializationSchema vehicleDeserializationSchema;

  @Autowired
  private  VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;

  /**
   * Starts the streaming job once the bean has been constructed. Failures are printed and
   * otherwise ignored so that they do not abort application start-up.
   */
  @PostConstruct
  public void startFlinkStreamProcessing() {
    try {
      processVehicleStatistic();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Builds the pipeline (Kafka source, keyed tumbling window, aggregation, Kafka sink) and
   * executes it. Any exception raised while building or running the job is printed, not
   * rethrown.
   */
  public void processVehicleStatistic()  {
    try {
      StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

      FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);

      consumer.setStartFromLatest();

      System.out.println("Starting to consume!!");

      consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

      FlinkKafkaProducer<VehicleStatistics> producer = createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);

      DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);

      inputMessagesStream
          // Key by BOTH grouping fields. Keying by the vehicle type alone puts every model of
          // that type into the same window accumulator, so the emitted model type and count
          // would describe a mix of models. A String is used as the composite key because its
          // hash is stable across JVMs, which Flink requires of keys; enum hash codes are not.
          .keyBy(vehicle -> vehicle.getVehicleType() + "|" + vehicle.getVehicleModelType())
          .window(TumblingEventTimeWindows.of(Time.seconds(20)))
          .aggregate(new VehicleStatisticsAggregator())
          .addSink(producer);

      System.out.println("Adding to Sink!!");

      environment.execute("Car Truck Counts By Model");

    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Creates the Kafka consumer that deserializes {@link Vehicle} records.
   *
   * @param topic        topic to read from
   * @param kafkaAddress value for {@code bootstrap.servers}
   * @param kafkaGroup   value for {@code group.id}
   */
  private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup ) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", kafkaGroup);

    return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);
  }

  /**
   * Creates the Kafka producer that serializes {@link VehicleStatistics} results.
   *
   * @param topic        topic to write to
   * @param kafkaAddress Kafka broker list
   */
  private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress){
    return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
  }

}

I am getting the result as below.

null    {"end":1649362835.665466000,"vehicleType":"TRUCK","vehicleModelType":"HONDA","count":3}
null    {"end":1649362825.656024000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":1}
null    {"end":1649362850.675786000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":3}
null    {"end":1649362855.677596000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA","count":1}

But is there a way to validate this ?

My other question: I am trying to aggregate the result based on multiple keys. Is AggregateFunction the correct way to do this?

I am asking because I saw the question "How can I sum multiple fields in Flink?".

So if I have to aggregate (sum) on multiple fields, can an AggregateFunction accomplish this (the way I wrote the code)?

Kindly let me know. Thanks in advance.


Solution

  • Merge will only be called if you are using windows that merge -- in other words, if you are using session windows, or a custom merging window.

    The correct way to aggregate based on multiple keys is to use keyBy with a composite type, such as Tuple2<VehicleType, VehicleModelType>. Each time you call keyBy the stream is repartitioned from scratch (and not in addition to any previous partitioning).