I have a stream of object coordinates (time, x, y), I want to transform it into a stream of distances and then to a stream of speeds. To do it I need to process two consecutive messages each time.
Can you give me an idea on how to create a stream of Euclidean distances between points?
DataStream<Distance> distances = yourCoordinateSource
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(2)))
.process(new DistanceFunction());
DataStream<Speed> speeds = distances.map(new SpeedFunction())...
You can chain process DataStream
s.
To access two messages you can create fixed size windows of 2 messages, then apply a processing function to the window.
Assuming your stream is not keyed use windowAll()
.
You have to implement both DistanceFunction
(extends ProcessWindowFunction
) and SpeedFunction
(Extends MapFunction
).