How do I handle out-of-order events with Apache Flink?

To try out stream processing and Flink, I have given myself a seemingly simple problem. My data stream consists of x and y coordinates for a particle, along with the time t at which the position was recorded. My objective is to annotate this data with the velocity of each particle. The stream might look something like this:

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

Now there is no guarantee that the events will arrive in order, i.e. 1612103771213 p2 -0.1 -0.1 might arrive, say, 10ms before 1612103771212 p2 0.0 0.0.

For simplicity, it can be assumed that any late data will arrive within 100ms of the early data.

I will admit that I am new to stream processing and Flink, so this might be a stupid question to ask with an obvious answer, but I am currently stumped as to how to go about achieving my objective here.


Following David's answer, I tried using the Flink Table API to sort the DataStream, with nc -lk 9999 feeding the text socket stream. The issue is that nothing gets printed to the console until I close the text socket stream. Here is the Scala code I wrote:

package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration

object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val tableEnv = StreamTableEnvironment.create(env, bSettings)


    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost", 9999)
    val objStream = text
      .filter( _.nonEmpty )
      .map(new ParticleMapFunction)

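    // Assign event-time timestamps and watermarks that tolerate up to
    // OUT_OF_ORDER_NESS milliseconds of out-of-orderness.
    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )
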
    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")

    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => ValAnnotator)


    sortedPosStream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
  case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
                            var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      // Split on whitespace; "\\W+" would also split on '.' and '-' inside the numbers.
      val parts = t.split("\\s+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }
}



  • In general, watermarks in combination with event-time timers are the solution to the problems posed by out-of-order event streams. The section of the official Flink training that covers Event Time and Watermarks explains how this works.
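
To make the idea concrete, here is a minimal plain-Scala model of that mechanism. It is a sketch, not Flink API: `Pos` and `EventTimeSorter` are hypothetical names, and the watermark is recomputed on every event, whereas Flink emits watermarks periodically. The watermark rule (largest timestamp seen, minus the allowed out-of-orderness, minus 1) follows Flink's bounded-out-of-orderness strategy.

```scala
import scala.collection.mutable

// Hypothetical event type mirroring the <timestamp> <particle_id> <x> <y> records.
final case class Pos(t: Long, name: String, x: Double, y: Double)

// Plain-Scala model (not Flink API): buffers events for one particle and
// releases them in timestamp order once the watermark has passed them.
final class EventTimeSorter(maxOutOfOrdernessMs: Long) {
  // Min-heap on timestamp: the earliest buffered event is dequeued first.
  private val buffer =
    mutable.PriorityQueue.empty[Pos](Ordering.by[Pos, Long](_.t).reverse)
  private var maxSeen = Long.MinValue

  // No event with a timestamp <= watermark is expected to arrive any more.
  def watermark: Long =
    if (maxSeen == Long.MinValue) Long.MinValue
    else maxSeen - maxOutOfOrdernessMs - 1

  // Accepts one event and returns every buffered event that is now safe to emit.
  // Events at or behind the current watermark are treated as late and dropped.
  def offer(p: Pos): List[Pos] =
    if (p.t <= watermark) Nil
    else {
      buffer.enqueue(p)
      maxSeen = math.max(maxSeen, p.t)
      drain(watermark)
    }

  // End of input: modelled as a final watermark of Long.MaxValue.
  def close(): List[Pos] = drain(Long.MaxValue)

  private def drain(upTo: Long): List[Pos] = {
    val out = List.newBuilder[Pos]
    while (buffer.nonEmpty && buffer.head.t <= upTo) out += buffer.dequeue()
    out.result()
  }
}
```

One consequence worth noticing in this model: an event is only released once a later event pushes the watermark past it, or once the input ends. With a 100 ms bound and test data whose timestamps span only a few milliseconds, nothing comes out until the stream is closed.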

    At a higher level it is sometimes easier to use something like Flink's CEP library, or Flink SQL, because they make it very easy to sort a stream by time, thus removing all of the out-of-orderness. For instance, How to sort a stream by event time using Flink SQL shows a Flink DataStream program that uses Flink SQL to sort a stream by event time.

    In your case, a fairly simple MATCH_RECOGNIZE query would do what you're looking for. That might look something like this:

    SELECT *
        FROM event
        MATCH_RECOGNIZE (
            PARTITION BY particleId
            ORDER BY ts
            MEASURES
                velocity(a, b)
            AFTER MATCH SKIP TO NEXT ROW
            PATTERN (a b)
            DEFINE
                a AS TRUE,
                b AS TRUE
        )

    where velocity(a, b) is a user-defined function that computes the velocity, given two sequential events (a and b) for the same particle.
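
As a rough illustration of what such a velocity function could compute, here is a plain-Scala sketch. It is a batch model under assumptions, not a Flink UDF: `Pos`, `Velocity`, `ParticleVelocity`, and `annotate` are hypothetical names, timestamps are taken to be milliseconds, and each velocity is attributed to the later event of a pair. `annotate` mimics PARTITION BY, ORDER BY, and PATTERN (a b) by grouping per particle, sorting by timestamp, and pairing consecutive events.

```scala
// Hypothetical types mirroring the <timestamp> <particle_id> <x> <y> records.
final case class Pos(t: Long, name: String, x: Double, y: Double)
final case class Velocity(t: Long, name: String, vx: Double, vy: Double)

object ParticleVelocity {
  // Velocity between two consecutive events a and b of the same particle,
  // in coordinate units per millisecond. None if no time elapsed between them.
  def velocity(a: Pos, b: Pos): Option[Velocity] = {
    val dt = (b.t - a.t).toDouble
    if (dt <= 0.0) None
    else Some(Velocity(b.t, b.name, (b.x - a.x) / dt, (b.y - a.y) / dt))
  }

  // Group by particle, sort each group by timestamp, then compute one
  // velocity per pair of consecutive events.
  def annotate(events: Seq[Pos]): Map[String, List[Velocity]] =
    events.groupBy(_.name).map { case (name, ps) =>
      val pairs = ps.sortBy(_.t).sliding(2).toList
      name -> pairs.collect { case Seq(a, b) => velocity(a, b) }.flatten
    }
}
```

A particle with a single event produces no velocity at all, since there is no earlier position to compare against.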