Tags: dataset, apache-flink, data-stream, flinkml

Apache Flink - SVM predictions on streaming data


I am using Apache Flink to make predictions on a stream of tweets from Twitter.

The code is implemented in Scala.

My problem is that my SVM model, trained with the DataSet API, requires a DataSet as input to its predict() method.

I already saw a question here where a user said that you need to write your own MapFunction that reads the model when the job starts (ref: Real-Time streaming prediction in Flink using scala).

But I am not able to write or understand this code.

Even if I get the model inside the StreamingMapFunction, I still need a DataSet as a parameter to predict the result.
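The DataSet requirement comes from FlinkML's `predict()` API rather than from the model itself: for a linear SVM, classifying one record only needs the learned weight vector and that record's feature vector. A minimal plain-Scala sketch, assuming the weights are already in memory as an `Array[Double]` and a tweet's features are a sparse `Map[Int, Double]` (feature index to TF-IDF value); `LinearSvm`, `decision`, and `predict` are hypothetical names, and the 0.0 threshold with +1.0/-1.0 labels is an assumption meant to mirror FlinkML's default `ThresholdValue`:

```scala
// Hypothetical helper: per-record prediction for a linear SVM, no DataSet involved.
object LinearSvm {
  // Raw decision value w . x; feature indices outside the weight vector contribute 0.
  def decision(weights: Array[Double], features: Map[Int, Double]): Double =
    features.iterator.map { case (i, v) =>
      if (i >= 0 && i < weights.length) weights(i) * v else 0.0
    }.sum

  // Class label: +1.0 if the decision value is above the threshold, -1.0 otherwise.
  // The 0.0 default is an assumption chosen to mirror FlinkML's default threshold.
  def predict(weights: Array[Double], features: Map[Int, Double], threshold: Double = 0.0): Double =
    if (decision(weights, features) > threshold) 1.0 else -1.0
}
```

Because both functions work on a single record, a DataStream `map` can call `LinearSvm.predict` directly once the weights are available on the worker.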

I really hope someone can show or explain to me how this is done.

Flink version: 1.9, Scala version: 2.11, Flink-ML: 2.11

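```scala
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.twitter.TwitterSource

val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

// this is my model, including all the terms needed to calculate the TF-IDF values and to create a libsvm file
// (FeatureVectorService and labeledData are defined elsewhere in my code)
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)

// reads the created libsvm file
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
  .setBlocks(env.getParallelism)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)
  .setSeed(42)
// learning
svm.fit(trainingData)

// this is my Twitter stream; its text should be predicted later
// (params holds the Twitter credentials and is defined elsewhere in my code)
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))

// the texts I want to transform to TF-IDF vectors using the service above and pass to the SVM for prediction
val tweets: DataStream[(String, String)] = streamSource
  .flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
```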
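On the streaming side, each tweet has to be turned, one record at a time, into the same kind of sparse TF-IDF vector that was written to the training libsvm file. A hedged sketch of that per-record step, assuming the vocabulary (term to feature index) and the IDF weights learned by `FeatureVectorService` can be exported as plain maps; `TfIdfFeaturizer`, its tokenizer, and the term-frequency normalization are illustrative assumptions, and the real indices and weighting must match whatever the service used when it wrote the libsvm file:

```scala
// Hypothetical per-tweet TF-IDF featurizer. vocabulary and idf are assumed to be
// exported from the training step (what FeatureVectorService learned).
final case class TfIdfFeaturizer(vocabulary: Map[String, Int], idf: Map[String, Double]) {

  // Lower-case and split on runs of characters that are neither letters nor digits.
  def tokenize(text: String): Seq[String] =
    text.toLowerCase.split("[^\\p{L}\\p{N}]+").toSeq.filter(_.nonEmpty)

  // Sparse vector (feature index -> tf * idf) containing only terms known from training.
  // tf is the term count divided by the total number of tokens in the tweet.
  def featurize(text: String): Map[Int, Double] = {
    val tokens = tokenize(text)
    val counts = tokens.groupBy(identity).map { case (term, occ) => term -> occ.size }
    counts.collect {
      case (term, n) if vocabulary.contains(term) && idf.contains(term) =>
        vocabulary(term) -> (n.toDouble / tokens.size) * idf(term)
    }
  }
}
```

Since case classes are serializable, an instance can be captured in a DataStream `map` closure that pairs each tweet's text with `featurizer.featurize(text)`; which field of the `(String, String)` tuple holds the text depends on `SelectEnglishTweetWithCreatedAtFlatMapper`.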


Solution

  • Currently FlinkML, which SVM is part of, does not support the streaming (DataStream) API. That is why SVM accepts only a DataSet. The idea is not to use FlinkML for prediction, but rather some SVM library available in Scala or Java. You could then read the model, for example from a file, when the job starts. The drawback is that you have to implement most of the logic yourself.

    The comment in the post you mentioned says essentially the same thing.
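A minimal sketch of the "own MapFunction that reads the model on start" approach, assuming the linear SVM weights have already been written (by whichever library trained them) to a text file of comma-separated doubles that every TaskManager can read, and that tweets have already been turned into sparse feature maps. `LinearSvmPredictor` and the file format are hypothetical; the model is loaded once per parallel instance in `open()`, so `map()` scores each record with a dot product and a threshold, with no DataSet involved:

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

import scala.io.Source

// Hypothetical streaming predictor for a linear SVM (names and file format are assumptions).
// Input: (tweet text, sparse feature vector as index -> value).
// Output: (tweet text, predicted label +1.0 or -1.0).
class LinearSvmPredictor(weightsPath: String, threshold: Double = 0.0)
    extends RichMapFunction[(String, Map[Int, Double]), (String, Double)] {

  // Filled in by open() on each parallel instance; not shipped with the serialized function.
  @transient private var weights: Array[Double] = _

  // Runs once per parallel instance before the first record: load the model file.
  override def open(parameters: Configuration): Unit = {
    val source = Source.fromFile(weightsPath, "UTF-8")
    try {
      weights = source.getLines()
        .flatMap(_.split(",").iterator)
        .map(_.trim)
        .filter(_.nonEmpty)
        .map(_.toDouble)
        .toArray
    } finally {
      source.close()
    }
  }

  // Per-record prediction: dot product of weights and features, then threshold.
  override def map(in: (String, Map[Int, Double])): (String, Double) = {
    val (text, features) = in
    val score = features.iterator.map { case (i, v) =>
      if (i >= 0 && i < weights.length) weights(i) * v else 0.0
    }.sum
    (text, if (score > threshold) 1.0 else -1.0)
  }
}
```

With `import org.apache.flink.streaming.api.scala._` in scope, it would be applied to a (hypothetical) featurized stream as `featurized.map(new LinearSvmPredictor("/path/to/weights.csv"))`. Loading in `open()` reads the file on the worker once per parallel instance, rather than re-reading it for every record.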