Tags: java, apache-spark, spark-streaming, apache-spark-ml

Spark ML streaming predictOnValues: how to save the results?


I have the following code:

StreamingLinearRegressionWithSGD regressionWithSGD =
        new StreamingLinearRegressionWithSGD()
                .setInitialWeights(Vectors.zeros(featuresNumber));

JavaDStream<LabeledPoint> trainingData = streamingContext.textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache();
JavaDStream<LabeledPoint> testData = streamingContext.textFileStream(model.getPredictPath()).map(LabeledPoint::parse);
regressionWithSGD.trainOn(trainingData);
regressionWithSGD.predictOnValues(testData.mapToPair(lp -> new Tuple2<>(lp.label(), lp.features()))).print();

Instead of calling print(), I would like to write the results to a file, a database, a queue, and so on. Is that possible?


Solution

  • I figured it out:

    // Streaming linear regression model, starting from all-zero weights.
    StreamingLinearRegressionWithSGD regressionWithSGD =
            new StreamingLinearRegressionWithSGD()
                    .setInitialWeights(Vectors.zeros(featuresNumber));

    JavaDStream<LabeledPoint> trainingData = streamingContext
            .textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache();
    JavaDStream<LabeledPoint> testData = streamingContext
            .textFileStream(model.getPredictPath()).map(LabeledPoint::parse);
    regressionWithSGD.trainOn(trainingData);

    // predictOn returns the predictions as a stream instead of printing them.
    JavaDStream<Double> doubleJavaDStream =
            regressionWithSGD.predictOn(testData.map(labeledPoint -> labeledPoint.features()));
    // Write every batch to its own "result-<timestamp>.out" directory.
    doubleJavaDStream.dstream().saveAsTextFiles("result", "out");

    As a result, each batch of predictions is written to its own result-{timestamp}.out directory.
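
For the database or queue case from the question, one common approach is to handle each partition of each batch yourself rather than calling saveAsTextFiles. The following is only a minimal sketch of the per-partition part, written in plain Java so it can be tried without a cluster. PredictionSink and writePartition are hypothetical names, not part of Spark, and the PrintWriter is just a stand-in for whatever file, database, or queue client you actually use.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical helper (not a Spark class): drains one partition of predictions into a sink.
final class PredictionSink {

    // Writes one prediction per line and returns how many were written.
    // The PrintWriter is a placeholder for a real file, DB, or queue client.
    static int writePartition(Iterator<Double> predictions, PrintWriter out) {
        int count = 0;
        while (predictions.hasNext()) {
            out.print(Double.toString(predictions.next()));
            out.print('\n');
            count++;
        }
        out.flush();
        return count;
    }

    public static void main(String[] args) {
        // Simulate a single partition holding two predictions.
        StringWriter buffer = new StringWriter();
        int written = writePartition(Arrays.asList(1.5, 2.0).iterator(), new PrintWriter(buffer));
        System.out.println(written + " predictions written");
        System.out.print(buffer);
    }
}
```

Assuming the JavaDStream<Double> returned by predictOn in the answer, this could be wired in with something like doubleJavaDStream.foreachRDD(rdd -> rdd.foreachPartition(it -> PredictionSink.writePartition(it, writer))), where writer is opened inside foreachPartition. Creating the connection there means it is created on the executor and reused for the whole partition, instead of being serialized from the driver or reopened for every record.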