apache-flink, flink-streaming

Flink KeyedCoProcessFunction working with state


I use a KeyedCoProcessFunction to enrich the main datastream with data that comes from another stream.

Code:

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  case class AssetStateDoc(assetId: Option[String])
  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
                                packet: PacketData,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There is state for ${packet.tag.externalId} = $assetId")
        out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
    }
  }

  override def processElement2(
                                value: AssetCommandState,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}

processElement2() works fine: it accepts data and updates the state.
But in processElement1() I always hit case None => logger.debug(s"No state for a packet ${packet.tag.externalId}"), although I expect to see the value that was set in processElement2().

As a reference I used this guide: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/


Solution

  • processElement1 and processElement2 do share state, but keep in mind that this is key-partitioned state. This means that a value set in processElement2 when processing a given value v2 will only be seen in processElement1 when it is called later with a value v1 having the same key as v2. Both input streams must therefore be keyed by the same identifier before they are connected; see the sketch after these notes.

    Also keep in mind that you have no control over the race condition between the two streams coming into processElement1 and processElement2.

    The RidesAndFares exercise from the official Apache Flink training is all about learning to work with this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home for the corresponding tutorial.
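
    As a minimal sketch (not taken from the question), the job wiring would look roughly like this. It assumes that packet.tag.externalId and AssetCommandState.id carry the same identifier; swap in whatever field really is the shared key.

      import org.apache.flink.streaming.api.scala._

      val packets: DataStream[PacketData] = ???          // main stream
      val commands: DataStream[AssetCommandState] = ???  // enrichment stream

      val enriched: DataStream[AssetData] = packets
        .keyBy(_.tag.externalId.getOrElse(""))   // key of the packet stream
        .connect(commands.keyBy(_.id))            // must yield the same key space
        .process(new AssetDataEnrichment)

    If the two key extractors do not produce matching keys, each processElementN call lands in its own state partition, and the value written in processElement2 will never be visible to processElement1. To cope with the ordering race, the usual pattern (as in RidesAndFares) is to buffer the packet in keyed state when the enrichment data has not arrived yet and emit it from processElement2 once it does.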