scala apache-flink flink-streaming stream-processing

How do I handle out-of-order events with Apache Flink?


To try out stream processing and Flink, I have given myself a seemingly simple problem. My data stream consists of x and y coordinates for a particle, along with the time t at which the position was recorded. My objective is to annotate this data with the velocity of each particle. The stream might look something like this:

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

Now, there is no guarantee that the events will arrive in order, i.e. 1612103771213 p2 -0.1 -0.1 might arrive, say, 10ms before 1612103771212 p2 0.0 0.0.

For simplicity, it can be assumed that any late data will arrive within 100ms of the early data.

I will admit that I am new to stream processing and Flink, so this might be a stupid question to ask with an obvious answer, but I am currently stumped as to how to go about achieving my objective here.

EDIT

Following David's answer, I tried using the Flink Table API to sort the DataStream, using nc -lk 9999 as the text socket source. The issue is that nothing gets printed to the console until I close the text socket stream. Here is the Scala code I wrote:


package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration


object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val tableEnv = StreamTableEnvironment.create(env, bSettings)

    env.setParallelism(1)

    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost", 9999)
    val objStream = text
      .filter( _.nonEmpty )
      .map(new ParticleMapFunction)

    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )

    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")

    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)

    sortedPosStream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
  case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
                            var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      // Split on whitespace; "\\W+" would also split on '.' and '-' and corrupt the coordinates
      val parts = t.trim.split("\\s+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }

}


Solution

  • In general, watermarks in combination with event-time timers are the solution to the problems posed by out-of-order event streams. The section of the official Flink training that covers Event Time and Watermarks explains how this works.

    At a higher level, it is sometimes easier to use something like Flink's CEP library, or Flink SQL, because they make it very easy to sort a stream by time, thus removing all of the out-of-orderness. See How to sort a stream by event time using Flink SQL for a Flink DataStream program that uses Flink SQL to sort a stream by event time.
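To make the idea of sorting by event time concrete, here is a minimal plain-Scala sketch of a watermark-driven buffer. It is not Flink code and not how Flink implements sorting; `Event`, `WatermarkSorter`, `onEvent`, and `flush` are hypothetical names, and the watermark is modeled simply as the highest timestamp seen so far minus the allowed out-of-orderness.

```scala
import scala.collection.mutable

// Hypothetical event type for this sketch (mirrors the fields in the question).
final case class Event(ts: Long, id: String, x: Double, y: Double)

// Buffers events and releases them in timestamp order once the watermark passes them.
// Simplified model (an assumption): watermark = highest timestamp seen - maxOutOfOrderness.
// Events that arrive later than the bound are not treated specially here.
final class WatermarkSorter(maxOutOfOrderness: Long) {
  // Min-heap on timestamp: the earliest buffered event is dequeued first.
  private val buffer =
    mutable.PriorityQueue.empty[Event](Ordering.by[Event, Long](_.ts).reverse)
  private var maxTs = Long.MinValue

  // Accepts one event and returns the events that are now safe to emit, in order.
  def onEvent(e: Event): List[Event] = {
    buffer.enqueue(e)
    maxTs = math.max(maxTs, e.ts)
    drain(maxTs - maxOutOfOrderness)
  }

  // End of input: nothing later can arrive, so release everything that is left.
  def flush(): List[Event] = drain(Long.MaxValue)

  private def drain(watermark: Long): List[Event] = {
    val out = mutable.ListBuffer.empty[Event]
    while (buffer.nonEmpty && buffer.head.ts <= watermark) out += buffer.dequeue()
    out.toList
  }
}
```

Under this model, with a 100 ms bound, events whose timestamps all lie within a few milliseconds of each other (as in the sample data in the question) stay buffered until a much later event arrives or the input ends. That is consistent with output appearing only after the socket is closed.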

    In your case, a fairly simple MATCH_RECOGNIZE query would do what you're looking for. That might look something like this:

    SELECT *
    FROM event
    MATCH_RECOGNIZE (
        PARTITION BY particleId
        ORDER BY ts
        MEASURES
            b.ts,
            b.particleId,
            velocity(a, b)
        AFTER MATCH SKIP TO NEXT ROW
        PATTERN (a b)
        DEFINE
            a AS TRUE,
            b AS TRUE
    )
    

    where velocity(a, b) is a user-defined function that computes the velocity, given two sequential events (a and b) for the same particle.
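
The answer leaves the body of velocity undefined. As a rough illustration, the computation could be sketched in plain Scala as a finite difference between two consecutive positions. The `Position` type, the `Velocity` object, and the choice of units per second are assumptions made for this sketch, and the timestamps are assumed to be in milliseconds as in the question. Registering it as a Flink user-defined function is not shown.

```scala
// Hypothetical position record; the field names are assumptions for this sketch.
final case class Position(ts: Long, particleId: String, x: Double, y: Double)

object Velocity {
  // Velocity components (vx, vy) in units per second, computed from two
  // consecutive positions a (earlier) and b (later) of the same particle.
  // Returns None when the timestamps do not advance, to avoid dividing by zero.
  def velocity(a: Position, b: Position): Option[(Double, Double)] = {
    val dtSeconds = (b.ts - a.ts) / 1000.0 // timestamps assumed to be milliseconds
    if (dtSeconds <= 0.0) None
    else Some(((b.x - a.x) / dtSeconds, (b.y - a.y) / dtSeconds))
  }
}
```

Returning an Option keeps the zero-interval case explicit; a real job would need to decide whether to skip such pairs or emit a default value.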