apache-spark, pyspark, spark-streaming, apache-spark-mllib

Spark Streaming: How to load a Pipeline on a Stream?


I am implementing a lambda architecture system for stream processing.

I have no issue creating a Pipeline with GridSearch in Spark Batch:

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])

paramGrid = (
    ParamGridBuilder()
    .addGrid(logistic_regressor.regParam, (0.01, 0.1))
    .addGrid(logistic_regressor.tol, (1e-5, 1e-6))
    # ...etcetera
).build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=4)

pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")

However, I can't seem to find how to plug the pipeline into the Spark Streaming process. I am using Kafka as the DStream source, and my code as of now is as follows:

import json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"kafka_topic": 1})

model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))

# CODE MISSING GOES HERE

ssc.start()
ssc.awaitTermination()

and now I need to find some way of applying the loaded pipeline to the parsed stream.

Based on the documentation here (even though it looks very, very outdated), it seems like your model needs to implement the method predict to be able to use it on an RDD object (and hopefully on a KafkaStream?).
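
If I understand it correctly, that pattern looks roughly like the sketch below; the spark.mllib LogisticRegressionModel, its load path and the "features" field are just illustrative placeholders, not my actual pipeline:

# Roughly the spark.mllib-era pattern the old docs describe: a plain mllib model
# exposes predict(), which can be applied to each micro-batch RDD via transform().
from pyspark.mllib.classification import LogisticRegressionModel
from pyspark.mllib.linalg import Vectors

mllib_model = LogisticRegressionModel.load(sc, "mllib_model/")  # placeholder path

feature_stream = parsed_stream.map(lambda record: Vectors.dense(record["features"]))
predictions = feature_stream.transform(lambda rdd: mllib_model.predict(rdd))
predictions.pprint()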

How could I use the pipeline in the Streaming context? The reloaded PipelineModel only seems to implement transform.

Does that mean the only way to use batch models in a Streaming context is to use pure models, and no pipelines?


Solution

  • I found a way to load a Spark Pipeline into Spark Streaming.

    This solution works for Spark v2.0; later versions will probably implement a better solution.

    The solution I found transforms the streaming RDDs into DataFrames using the toDF() method, on which you can then apply the pipeline.transform method.

    This way of doing things is horribly inefficient, though.

    # We load the required libraries
    from pyspark.ml import PipelineModel
    from pyspark.sql.types import (
        StructType, StringType, StructField, LongType
    )
    from pyspark.sql import Row
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # We specify the dataframe's schema, so Spark does not have to infer it by reflection on the data.
    pipeline_schema = StructType(
        [
            StructField("field1", StringType(), True),
            StructField("field2", StringType(), True),
            StructField("field3", LongType(), True)
        ]
    )

    # We load the pipeline saved with Spark batch
    pipeline = PipelineModel.load('/pipeline')

    # Set up the usual Spark context and Spark Streaming context
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 1)

    # In my case I use a Kafka direct stream as the DStream source
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [QUEUE_NAME], {"metadata.broker.list": "localhost:9092"})
    
    def handler(req_rdd):
        def process_point(p):
            # here goes the logic to run after applying the pipeline
            print(p)
        if req_rdd.count() > 0:
            # Here is the gist of it: we turn the RDD into Rows, then into a df with the specified schema
            req_df = req_rdd.map(lambda r: Row(**r)).toDF(schema=pipeline_schema)
            # Now we can apply the transform, yaaay
            pred = pipeline.transform(req_df)
            records = pred.rdd.map(lambda p: process_point(p)).collect()
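
    The handler above still needs to be registered on the DStream, and the streaming context started; a minimal sketch reusing the names from the snippet above:

    # Run the handler on every micro-batch RDD produced by the direct Kafka stream
    directKafkaStream.foreachRDD(handler)

    # Start the streaming computation and block until it is stopped
    ssc.start()
    ssc.awaitTermination()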
    

    Hope this helps.