I’m preparing data as input to a classifier in PySpark. I have been using aggregate functions in Spark SQL to extract features such as the average and variance, grouped by activity, name and window. The window is calculated by dividing a unix timestamp (in milliseconds) by 10000 to break the data into 10 second windows.
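For reference, a minimal sketch of how that window column could be derived, assuming the raw table has a millisecond unix timestamp in a column named timestamp (names here are illustrative):

from pyspark.sql.functions import col, floor

# assumption: "timestamp" is a unix timestamp in milliseconds,
# so dividing by 10000 and flooring gives a 10-second window id
data = data.withColumn("window", floor(col("timestamp") / 10000))
data.registerTempTable("data")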
sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window")
The result of this would look like
Activity Name Window AvgX VarX
Walk accelerometer 95875 2.0 1.0
What I want to do now is to calculate the average slope of X within each window.
To do this I need timestamp, window, and X. I have implemented the logic in Python using arrays; this is what it would look like: calculating the slope between each pair of consecutive points, and then taking the average of those slopes. Ideally, I would like to do this in a UDAF, which is not yet supported in PySpark. (If the function below were called slope, then in SQL you could write slope(timestamp, X) as avgSlopeX.)
EDIT - changed the input so it is clearer. What I am doing exactly is computing the slope between each pair of consecutive points, and then returning the average of those slopes for the window. So, in addition to the average and variance of each window, I also want the average slope.
# sample input
timestamp = [1464703425544, 1464703426534, 1464703427551, 1464703428587, 1464703429512, 1464703430493, 1464703431505, 1464703432543, 1464703433513, 1464703434529]
values = [1021.31, 1021.26, 1021.19, 1021.19, 1021.1, 1021.1, 1021.1, 1021.05, 1021.02]

totalSlope = 0.0
n = min(len(timestamp), len(values))  # guard in case the lists differ in length

i = 0
while i < n - 1:
    # slope between consecutive points
    y2 = values[i + 1]
    y1 = values[i]
    x2 = timestamp[i + 1]
    x1 = timestamp[i]
    slope = (y2 - y1) / (x2 - x1)
    totalSlope = totalSlope + slope
    i = i + 1

avgSlope = totalSlope / (n - 1)  # average of the n - 1 pairwise slopes
How can I implement this? Should I try converting to a pandas DataFrame and then a NumPy array? If so, how can I make sure the data will still be mapped properly, keeping in mind the GROUP BY activity, name, window in the SQL query?
In general this is not a job for a UDAF, because UDAFs don't provide any means to define an ordering. It looks like what you really need here is a combination of window functions and standard aggregations.
from pyspark.sql.functions import avg, col, lag
from pyspark.sql.window import Window

df = ...
## DataFrame[activity: string, name: string, window: bigint,
##           timestamp: bigint, value: float]

group = ["activity", "name", "window"]

# within each (activity, name, window) group, order rows by timestamp
w = Window.partitionBy(*group).orderBy("timestamp")

# differences between each row and the previous row in the group
v_diff = col("value") - lag("value", 1).over(w)
t_diff = col("timestamp") - lag("timestamp", 1).over(w)
slope = v_diff / t_diff

# the first row in each group has no predecessor, so its slope is null
# and is simply ignored by avg
df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope")).alias("avgSlopeX"))
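If you also want the other per-window features from your original query (avgX, varX), the slope column can be aggregated together with them in the same groupBy. A minimal sketch, assuming the schema above with value holding the acc_x readings:

from pyspark.sql.functions import avg, variance

result = (df
    .withColumn("slope", slope)
    .groupBy(*group)
    .agg(avg("value").alias("avgX"),
         variance("value").alias("varX"),
         avg("slope").alias("avgSlopeX")))

result.orderBy(*group).show()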