apache-flink flink-streaming

Process consecutive messages


I have a stream of object coordinates (time, x, y). I want to transform it into a stream of distances and then into a stream of speeds. To do this, I need to process two consecutive messages at a time.

Can you give me an idea on how to create a stream of Euclidean distances between points?


Solution

  • DataStream<Distance> distances = yourCoordinateSource
                .windowAll(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(2)))
                .process(new DistanceFunction());
    
    DataStream<Speed> speeds = distances.map(new SpeedFunction())...
    

    You can chain DataStream transformations, as above.

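    To access two messages at a time, create fixed-size windows of 2 messages and apply a processing function to each window. These windows do not overlap, so the pairs are (1, 2), (3, 4), and so on. If you need every consecutive pair, a sliding count window such as countWindowAll(2, 1) is one option to consider; its first firing contains only one element, which the function has to handle.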

    Assuming your stream is not keyed, use windowAll(). Note that windowAll() is a non-parallel operation.

    You have to implement both DistanceFunction (extends ProcessAllWindowFunction, because the window is created with windowAll()) and SpeedFunction (implements MapFunction).
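
The arithmetic those two functions would perform can be sketched independently of Flink. This is a minimal sketch, not the answer's actual implementation: the names Kinematics, Coordinate, Distance, between and speed are hypothetical, and it assumes that a Distance carries the elapsed time between the two messages so that the later map step can compute a speed from it.

```java
// Hypothetical helper types for illustration; none of these names come from Flink.
final class Kinematics {

    /** One input message: a timestamp plus a 2D position. */
    static final class Coordinate {
        final double time, x, y;

        Coordinate(double time, double x, double y) {
            this.time = time;
            this.x = x;
            this.y = y;
        }
    }

    /** Distance covered between two messages, plus the time that elapsed (assumed layout). */
    static final class Distance {
        final double length, elapsed;

        Distance(double length, double elapsed) {
            this.length = length;
            this.elapsed = elapsed;
        }
    }

    /** Euclidean distance between two points; the body of a DistanceFunction could call this. */
    static Distance between(Coordinate a, Coordinate b) {
        return new Distance(Math.hypot(b.x - a.x, b.y - a.y), b.time - a.time);
    }

    /** Speed as distance over elapsed time; the body of a SpeedFunction could call this. */
    static double speed(Distance d) {
        if (d.elapsed <= 0) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        return d.length / d.elapsed;
    }
}
```

In a DistanceFunction, the two elements of each window would be passed to between() in arrival order, and SpeedFunction would map each Distance through speed(). The units of the result follow whatever units the coordinates and timestamps use.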