Tags: scala, machine-learning, apache-flink, flink-streaming

Dynamically create flatmap function (keyed states) with values in a stream


I am writing a streaming Flink program to do feature extraction for our offline-trained model, and I am wondering about the design of the program. I want each feature extraction logic to maintain its own state within its own class, so that adding a new feature extraction is equivalent to adding a new class.

The rough high-level design is as follows:

// data is the stream of fully qualified class names of the feature
// extraction logic in our code, e.g. com.xxx.FeatureExtraction1
val data: DataStream[String] = ...

// based on the class name, use reflection to instantiate the class
val featureExtraction1 = Class.forName("com.xxx.FeatureExtraction1")
  .getDeclaredConstructor().newInstance()
data.keyBy(x => x).flatMap(featureExtraction1)

where each feature extraction logic has its own internal state tracking:

class FeatureExtraction1 extends RichFlatMapFunction[String, Double] {

  // keyed state owned by this feature extraction logic
  private var myState: MapState[String, Double] = _

  override def flatMap(input: String, out: Collector[Double]): Unit = {
    // access the state value
  }

  override def open(parameters: Configuration): Unit = {
    myState = getRuntimeContext.getMapState(new MapStateDescriptor[String, Double](
      "my-state", classOf[String], classOf[Double]))
  }
}
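
For the reflection part, here is a minimal stdlib-only sketch of what I have in mind; the trait and the `LengthFeature` class are placeholders I made up for illustration:

```scala
// Assumed common interface for all feature extraction classes.
trait FeatureExtractor {
  def extract(input: String): Double
}

// Example extractor; stands in for something like com.xxx.FeatureExtraction1.
class LengthFeature extends FeatureExtractor {
  override def extract(input: String): Double = input.length.toDouble
}

object ReflectDemo {
  // Instantiate an extractor from its fully qualified class name.
  def load(className: String): FeatureExtractor =
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[FeatureExtractor]

  def main(args: Array[String]): Unit = {
    val extractor = load(classOf[LengthFeature].getName)
    println(extractor.extract("hello")) // 5.0
  }
}
```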

I could make this work: as soon as I add a new feature extraction class, e.g. com.xxx.FeatureExtraction2, I append it to the data stream like

data.keyBy(x => x).flatMap(featureExtraction1).flatMap(featureExtraction2)...flatMap(featureExtractionN)

However, I don't know Flink well enough to be sure whether featureExtraction1 through featureExtractionN will be executed concurrently (I believe they should be) when they are chained like this. Secondly, I want to write the code so that it automatically creates new feature extraction logic without me appending each one to the stream. In my head, it might look like this:

data.keyBy(x => x).forEachValueInTheStream.flatMap(new FeatureExtractionX based on the value)

If I can do this, adding a new feature would just mean adding a new feature extraction class with its own state tracking.

Please advise me on my naive thinking. I am grateful for any guidance.


Solution

  • Flink can't dynamically add functions to a running job. But you could do something close, I think.

    I'd use a broadcast stream for the feature paths, and a regular stream for the actual data to be processed. Connect them to create a connected stream, then run that into a CoFlatMapFunction. Inside this function you'd maintain a list of (dynamically generated) feature extraction functions that you apply to the incoming data. For state, use a Map<feature extraction function id, value>, so that each feature extraction function records its state in the same map.

    You do have the typical issue of wanting to fully process the broadcast stream before the first of the data elements arrives - see the Flink user mailing list for discussions on how to do that.
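
    A minimal sketch of this approach, using Flink's broadcast state API (a `KeyedBroadcastProcessFunction`, which in recent Flink versions plays the role of the connect + CoFlatMapFunction pattern described above). The `FeatureExtractor` trait, the socket sources, and all names are assumptions for illustration, not part of the original question:

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable

// Assumed interface for dynamically loaded extractors: pure logic, no Flink state.
trait FeatureExtractor extends Serializable {
  def extract(input: String, previous: Option[Double]): Double
}

object DynamicFeatures {
  // Broadcast state: extractor id -> fully qualified class name.
  val extractorsDesc = new MapStateDescriptor[String, String](
    "extractors", classOf[String], classOf[String])

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Placeholder sources: the data to process and the extractor class names.
    val data: DataStream[String] = env.socketTextStream("localhost", 9000)
    val featurePaths: DataStream[String] = env.socketTextStream("localhost", 9001)

    data
      .keyBy(x => x)
      .connect(featurePaths.broadcast(extractorsDesc))
      .process(new KeyedBroadcastProcessFunction[String, String, String, Double] {
        // Keyed state shared by all extractors: one entry per extractor id.
        @transient private var values: MapState[String, Double] = _
        // Per-task cache of instantiated extractors.
        @transient private lazy val extractors =
          mutable.Map.empty[String, FeatureExtractor]

        override def open(parameters: Configuration): Unit = {
          values = getRuntimeContext.getMapState(new MapStateDescriptor[String, Double](
            "values", classOf[String], classOf[Double]))
        }

        override def processBroadcastElement(
            path: String, ctx: Context, out: Collector[Double]): Unit = {
          // Register the new extractor class; instances are created lazily below.
          ctx.getBroadcastState(extractorsDesc).put(path, path)
        }

        override def processElement(
            input: String, ctx: ReadOnlyContext, out: Collector[Double]): Unit = {
          ctx.getBroadcastState(extractorsDesc).immutableEntries().forEach { entry =>
            val id = entry.getKey
            val extractor = extractors.getOrElseUpdate(id, Class.forName(id)
              .getDeclaredConstructor().newInstance().asInstanceOf[FeatureExtractor])
            val previous = if (values.contains(id)) Some(values.get(id)) else None
            val next = extractor.extract(input, previous)
            values.put(id, next)
            out.collect(next)
          }
        }
      })

    env.execute("dynamic feature extraction")
  }
}
```

    Adding a new feature then means deploying a class implementing `FeatureExtractor` on the task managers' classpath and sending its class name on the broadcast stream; each extractor's state lives under its own id in the shared keyed map.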