Tags: apache-spark, pyspark, apache-spark-ml

How to create a Spark DataFrame inside a custom PySpark ML Pipeline _transform() method?


In Spark's ML Pipelines, a transformer's transform() method takes a Spark DataFrame and returns a DataFrame. My custom _transform() method uses the DataFrame that's passed in to create an RDD before processing it. This means the results of my algorithm have to be converted back into a DataFrame before being returned from _transform().

So how should I create the DataFrame from the RDD inside _transform()?

Normally I would use SparkSession.createDataFrame(). But this means passing a SparkSession instance, spark, into my custom Transformer somehow (or a SQLContext object). And this in turn can create other problems, such as when trying to use the transformer as a stage in an ML Pipeline.


Solution

  • It turns out it's as simple as doing this inside _transform():

    yourRdd.toDF(yourSchema)
    

    The schema is optional. I wish I could give you a link to toDF(), but it isn't listed under https://spark.apache.org/docs/2.2.0/api/python/pyspark.html#pyspark.RDD because it isn't defined on the RDD class directly — pyspark.sql monkey-patches toDF() onto RDD at runtime once a SparkSession (or SQLContext) has been created, which is also why it only works after a session exists.

    I also previously tested passing a SparkSession object into my Transformer and calling createDataFrame() on it. It works, but it's unnecessary.