arrays pyspark linear-regression user-defined-functions

Perform Linear Regression on lists from collect in PySpark


I have a PySpark dataframe that contains, among others, two columns of type Array[Double], previously built using F.collect. The two arrays have the same length in each row of the dataframe, and for each row I want to calculate the slope of a linear regression between the values of the two arrays. The first array (named array_1 in my example) is my "X" and represents a normalized timestamp; the second one (array_2) is my "Y" and represents the values of my feature. In the end, I want a new column "Slope" that contains the slope of the linear regression for each row and, if possible, the dataset in the same format (same number of rows, plus one extra column holding the slope).

I have tried many things; the last code I used looks like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

data = [(1, [1, 2, 3, 4], [8, 9, 7, 3]), (2, [5, 6, 7, 8], [1, 2, 3, 4]), (3, [9, 10, 11, 12], [1, 2, 3, 4])]
df = spark.createDataFrame(data, ["id", "array_1", "array_2"])

def calculate_slope(arr1, arr2):
    vector_assembler = VectorAssembler(inputCols=["arr1"], outputCol="features")
    lr = LinearRegression(featuresCol="features", labelCol="arr2")

    def calculate_slope_internal(row):
        temp_data = [(row.id, row.array1, row.array2)]
        temp_df = spark.createDataFrame(temp_data, ["id", "arr1", "arr2"])

        temp_df = vector_assembler.transform(temp_df)
        model = lr.fit(temp_df)

        return float(model.coefficients[1]) 

    return udf(calculate_slope_internal, DoubleType())


df = df.withColumn("slope", calculate_slope("array_1", "array_2")(df))

With that, I get this error: Could not serialize object: TypeError: can't pickle _thread.RLock objects

The data in this example are fake, but I created them to give an idea of the format of my real data.


Solution

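  • I think in this case it's best to use a UDF and pass each row's arrays to it to calculate the slope. The serialization error most likely occurs because the original UDF closes over the Spark session and Spark ML estimators, which cannot be pickled or used on the executors. For the slope calculation, you can simply use np.polyfit to fit a polynomial of degree 1 to the features and labels: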

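    import numpy as np
    from pyspark.sql import functions as F, types as T
    
    @F.udf(T.DoubleType())
    def slope(features, labels):
        # Fit labels = m * features + b; polyfit returns the highest-degree coefficient first
        m, b = np.polyfit(features, labels, 1)
        # Cast the NumPy float to a plain Python float so Spark can store it as a DoubleType
        return float(m)
    
    df = df.withColumn('slope', slope('array_1', 'array_2'))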
    

    +---+---------------+------------+------------------+
    | id|        array_1|     array_2|             slope|
    +---+---------------+------------+------------------+
    |  1|   [1, 2, 3, 4]|[8, 9, 7, 3]|-1.699999999999999|
    |  2|   [5, 6, 7, 8]|[1, 2, 3, 4]|1.0000000000000004|
    |  3|[9, 10, 11, 12]|[1, 2, 3, 4]|0.9999999999999977|
    +---+---------------+------------+------------------+
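As a cross-check on the np.polyfit values above, the slope of a degree-1 least-squares fit also has a closed form: the sum of the products of the centered x and y values divided by the sum of the squared centered x values. Below is a minimal sketch in plain Python; the name ols_slope and the choice to return None for degenerate input are my own assumptions, not part of the original answer.

```python
def ols_slope(xs, ys):
    """Least-squares slope of ys against xs (hypothetical helper, for illustration)."""
    n = len(xs)
    # Assumption: both arrays have the same length, as stated in the question.
    if n < 2 or n != len(ys):
        return None
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of centered values (numerator) and of squared centered x (denominator).
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    # A vertical line (all x equal) has no defined slope.
    if sxx == 0:
        return None
    return sxy / sxx


# First row of the example data: the result is approximately -1.7.
print(ols_slope([1, 2, 3, 4], [8, 9, 7, 3]))
```

If NumPy is not available on the executors, a function like this could presumably be wrapped with F.udf(T.DoubleType()) in the same way as slope; returning None would then produce a null in the new column for rows that cannot be fitted.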