python, apache-spark, pyspark, apache-spark-ml

How to convert column types to match joining dataframes in pyspark?


I have an empty dataframe in pyspark that I want to use to append machine learning results coming from model.transform(test_data), but when I try a union to join the dataframes I get a "column types must match" error.

This is my code:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.ml.classification import LogisticRegression

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

schema = StructType([
    StructField("row_num", IntegerType(), True),
    StructField("label", IntegerType(), True),
    StructField("probability", DoubleType(), True),
])
empty = spark.createDataFrame(sc.emptyRDD(), schema)

model = LogisticRegression().fit(train_data)

preds = model.transform(test_data)

all_preds = empty.unionAll(preds)

AnalysisException: Union can only be performed on tables with the compatible column types. 
struct<type:tinyint,size:int,indices:array<int>,values:array<double>> <> double at the third column of the second table;

I've tried casting the column types of my empty dataframe to match, but I haven't been able to get the same types. Is there any way around this? My aim is to run the machine learning iteratively in a for loop, appending each prediction output to a pyspark dataframe.

For reference, preds looks like:

preds.printSchema()
root
 |-- row_num: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- probability: vector (nullable = true)
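Checking the field type directly confirms that the probability column is Spark ML's vector type (VectorUDT), not a double; the snippet below is just a quick check against the same preds dataframe (prob_type is a throwaway local name):

from pyspark.ml.linalg import VectorUDT

# The StructField behind the "vector" entry shown by printSchema()
prob_type = preds.schema["probability"].dataType
print(prob_type)                         # e.g. VectorUDT()
print(isinstance(prob_type, VectorUDT))  # True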

Solution

  • You can create the empty dataframe from the schema of the preds dataframe itself, so every column type (including the vector-typed probability column) matches:

    model = LogisticRegression().fit(train_data)
    preds = model.transform(test_data)

    # Reuse the schema Spark ML actually produced instead of building one by hand,
    # so the vector-typed probability column lines up for the union.
    empty = spark.createDataFrame(sc.emptyRDD(), preds.schema)
    all_preds = empty.unionAll(preds)
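
Since the goal is to build up predictions across iterations of a loop, another option is to skip the hand-built empty dataframe entirely and accumulate the results as they are produced. The sketch below assumes a source dataframe named data that is repeatedly split into train/test pairs; that name and the split logic are placeholders, not part of the original question:

    from pyspark.ml.classification import LogisticRegression

    all_preds = None
    for i in range(5):
        # Placeholder: any per-iteration train/test pair works here.
        train_df, test_df = data.randomSplit([0.8, 0.2], seed=i)
        preds = LogisticRegression().fit(train_df).transform(test_df)
        # Start from the first batch of predictions, then union each later batch on.
        all_preds = preds if all_preds is None else all_preds.unionAll(preds)

Collecting the per-iteration dataframes in a list and combining them once at the end with functools.reduce(DataFrame.unionAll, preds_list) is an equivalent alternative. Note that in Spark 2.0+ unionAll is a deprecated alias for union; both behave the same here.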