# Perform Linear Regression on list from collect in PySpark

I have a PySpark DataFrame that contains, in particular, 2 columns of type Array[Double], previously extracted using `F.collect`. The 2 lists have the same size within each row of my DataFrame, and for each row I want to calculate the slope of a linear regression between the values of these two arrays. The first array (`array_1` in my example) is my "X" and represents a normalized timestamp; the second one (`array_2`) is my "Y" and represents the values of my feature. At the end, I want a new column "Slope" containing the slope of the linear regression for each row and, if possible, the dataset in the same format (same number of rows, plus one extra column with the slope).
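For reference, with $n$ points per row, $x_i$ taken from `array_1` and $y_i$ from `array_2`, the slope of the ordinary least-squares line has a closed form. Worked through for row `id=1` ($\bar{x} = 2.5$, $\bar{y} = 6.75$), the numerator is $-8.5$ and the denominator is $5$:

$$
m = \frac{\sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})}{\sum_{i=1}^{n} (x_i - \bar{x})^2},
\qquad
m_{\text{id}=1} = \frac{-8.5}{5} = -1.7
$$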

I've tried many things, but the last code I used looks like this:

```
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

data = [(1, [1, 2, 3, 4], [8, 9, 7, 3]), (2, [5, 6, 7, 8], [1, 2, 3, 4]), (3, [9, 10, 11, 12], [1, 2, 3, 4])]
df = spark.createDataFrame(data, ["id", "array_1", "array_2"])

def calculate_slope(arr1, arr2):
    vector_assembler = VectorAssembler(inputCols=["arr1"], outputCol="features")
    lr = LinearRegression(featuresCol="features", labelCol="arr2")

    def calculate_slope_internal(row):
        temp_data = [(row.id, row.array1, row.array2)]
        temp_df = spark.createDataFrame(temp_data, ["id", "arr1", "arr2"])
        temp_df = vector_assembler.transform(temp_df)
        model = lr.fit(temp_df)
        return float(model.coefficients[1])

    return udf(calculate_slope_internal, DoubleType())

df = df.withColumn("slope", calculate_slope("array_1", "array_2")(df))
```

With that, I get this error: `Could not serialize object: TypeError: can't pickle _thread.RLock objects`

The data in this example is fake, but I created it to give an idea of the format of my real data.
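For context, the same kind of failure can be reproduced without Spark. PySpark pickles a Python UDF, together with everything its closure captures, to ship it to the executors; an object that holds a lock cannot be pickled. This is a minimal illustration, assuming that something reachable from the captured `spark` session or ML objects holds such a lock (which objects exactly is a Spark implementation detail); `try_pickle` is a hypothetical helper name.

```python
import pickle
import threading


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# Plain numeric data, like the contents of array_1/array_2, pickles fine.
ok = try_pickle([1.0, 2.0, 3.0])

# A container holding an RLock (standing in for captured driver-side state) does not.
err = try_pickle({"state": threading.RLock()})

print(ok)   # None
print(err)  # the message mentions _thread.RLock
```

The practical consequence is that a UDF body should only use plain Python or NumPy values, never `spark`, DataFrames, or Spark ML estimators.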

Solution

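I think in this case it's best to use a UDF and pass each row's two arrays to it to calculate the slope. The attempt above fails because `calculate_slope_internal` references `spark` and the Spark ML `VectorAssembler`/`LinearRegression` objects, which only work on the driver and cannot be serialized and sent to the executors, so the UDF body should contain only plain Python/NumPy code. For the slope calculation you can simply use `np.polyfit` to fit a polynomial of degree 1 to the features (X) and labels (Y).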
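To see what `np.polyfit(..., 1)` returns, here is a small check outside Spark on row `id=1` of the example data. It compares the first coefficient with the closed-form least-squares slope; `ols_slope` is a hypothetical helper written only for this comparison.

```python
import numpy as np


def ols_slope(x, y):
    """Closed-form least-squares slope:
    sum((x - mean(x)) * (y - mean(y))) / sum((x - mean(x)) ** 2)
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    dx = x - x.mean()
    return float(np.sum(dx * (y - y.mean())) / np.sum(dx * dx))


# Row id=1 from the question: X = array_1, Y = array_2
x, y = [1, 2, 3, 4], [8, 9, 7, 3]

# For deg=1, np.polyfit returns [slope, intercept] (highest degree first)
m, b = np.polyfit(x, y, 1)

print(ols_slope(x, y))  # approximately -1.7
print(m, b)             # slope approximately -1.7, intercept approximately 11.0
```

Because the coefficients come highest degree first, unpacking as `m, a = np.polyfit(...)` in the UDF puts the slope in `m`.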

```
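import numpy as np
from pyspark.sql import functions as F, types as T

@F.udf(T.DoubleType())
def slope(features, labels):
    # Fit y = m*x + a; np.polyfit returns [m, a] (highest degree first)
    m, a = np.polyfit(features, labels, 1)
    # Return a plain Python float for the DoubleType column
    return float(m)

df = df.withColumn('slope', slope('array_1', 'array_2'))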
```

```
+---+---------------+------------+------------------+
| id|        array_1|     array_2|             slope|
+---+---------------+------------+------------------+
|  1|   [1, 2, 3, 4]|[8, 9, 7, 3]|-1.699999999999999|
|  2|   [5, 6, 7, 8]|[1, 2, 3, 4]|1.0000000000000004|
|  3|[9, 10, 11, 12]|[1, 2, 3, 4]|0.9999999999999977|
+---+---------------+------------+------------------+
```
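If the real data can contain nulls, arrays of different lengths, fewer than two points, or a constant X in some rows, a bare `np.polyfit` call inside the `slope` UDF may raise or produce a meaningless value on the executor. Below is a minimal hardening sketch, assuming that returning null for such rows is acceptable; `safe_slope` is a hypothetical name, and the null-handling policy (drop pairs with a null side, return `None` otherwise) is an assumption, not part of the answer above. It computes the closed-form least-squares slope, which matches `np.polyfit(..., 1)` for well-posed input.

```python
import math

import numpy as np


def safe_slope(features, labels):
    """Least-squares slope of labels vs. features, or None when undefined.

    Meant as the body of a Spark UDF, so it guards against inputs that
    would otherwise raise inside an executor.
    """
    # Null arrays or mismatched lengths: no meaningful fit.
    if features is None or labels is None or len(features) != len(labels):
        return None
    # Assumption: drop (x, y) pairs where either side is null.
    pairs = [(x, y) for x, y in zip(features, labels) if x is not None and y is not None]
    if len(pairs) < 2:
        return None
    x = np.array([p[0] for p in pairs], dtype=float)
    y = np.array([p[1] for p in pairs], dtype=float)
    dx = x - x.mean()
    denom = float(np.dot(dx, dx))
    # All x values identical: the slope is undefined.
    if denom == 0.0:
        return None
    slope = float(np.dot(dx, y - y.mean()) / denom)
    # Guard against inf/nan coming from non-finite inputs.
    return slope if math.isfinite(slope) else None
```

To use it, register it like the answer's UDF, e.g. `slope_udf = F.udf(safe_slope, T.DoubleType())` followed by `df.withColumn('slope', slope_udf('array_1', 'array_2'))`; a `None` return shows up as null in the `slope` column.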

