Most efficient way to create features from the leading DCT coefficients of aggregated data - PySpark

Tags: python, pyspark, signal-processing, feature-engineering


I am currently creating features for a dataset consisting of time series data from various sensor readings on pieces of equipment, which are presumably related to fault events of that equipment. The basic structure of the data is a table combining equipment ID, cycle ID, timestamps, and sensor readings.

| ID | Cycle_ID | Timestamp  | sensor_1 | sensor_2 |
|----|----------|------------|----------|----------|
| 1  | 1        | 1547142555 | 123      | 641      |
| 1  | 1        | 1547142556 | 123      | 644      |
| 1  | 2        | 1547142557 | 124      | 643      |

The idea is now to aggregate the data by cycle to create one sequence (and corresponding features) per cycle. The amount of raw data is massive and requires Spark, but the aggregated dataset is small enough to be stored in a Pandas DataFrame and used to build a model with Keras. Among other things, one idea is to collect the leading DCT coefficients of some sensors and use them as features. To do this, we perform (among others) the following aggregation:


from pyspark.sql import window
import pyspark.sql.functions as func

W = window.Window.partitionBy('ID', 'Cycle_ID').orderBy('Timestamp')

# Collect each cycle's readings (ordered by timestamp) into one array per ID/cycle
df_collect = pfr_flight_match.withColumn('sensor_1_coll',
                 func.collect_list('sensor_1').over(W)) \
                 .groupBy('ID', 'Cycle_ID') \
                 .agg(func.max('sensor_1_coll').alias('sensor_1_coll'))

This gives me, for each cycle of each piece of equipment, the sensor time series as an array. The idea is now to perform a DCT on it, keep only the leading n coefficients, and add these as separate new feature columns. I have come up with a way to do this; however, the performance seems to be terrible, which is why I am seeking help.

Since it is unfortunately not possible to apply PySpark's DCT to an array column (per the documentation, the input has to be of DenseVector type), we need to convert the collected array to a DenseVector. There seems to be no efficient way to do this, so I am using a UDF:

from pyspark.ml.linalg import DenseVector, VectorUDT

# UDF to turn the collected array column into a DenseVector for the DCT
to_vec = func.udf(lambda x: DenseVector(x), VectorUDT())

The next step is to perform the DCT itself, using something like this:

from pyspark.ml.feature import DCT

# Determine which column is the target of the DCT
col_to_transform = 'sensor_1_coll'
df = df_collect.withColumn('vec', to_vec(col_to_transform))

# After switching the column type to DenseVector, we can apply the DCT
dct = DCT(inverse=False, inputCol='vec', outputCol='vec_dct')
df_dct = dct.transform(df)

# Drop the intermediate columns
df_dct = df_dct.drop('vec', col_to_transform)

Now comes the point where I suspect the pitfall lies: we need to truncate the DCT vector to some number of leading coefficients, which are then to be exploded into separate columns so they can be passed into a Pandas DataFrame/NumPy array later on.

I fear that using a UDF is not good performance-wise; and in any case a DenseVector is not represented as an array type, so the following does not work:

from pyspark.ml.linalg import VectorUDT

# Does not work: slicing a DenseVector yields a NumPy array, which the
# declared VectorUDT return type cannot serialize (n = number of coefficients)
trunc_vec = func.udf(lambda x: x[0:n],
                     VectorUDT())
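
For reference, a UDF of this shape can be made to work by declaring the return type as an array and converting the slice to a plain list; a minimal sketch with an illustrative n and the column names from above. It is still a row-at-a-time Python UDF, so the performance concern remains:

from pyspark.sql.types import ArrayType, DoubleType

n = 30  # illustrative: number of leading coefficients to keep

# Returning a plain Python list (array<double>) instead of a Vector makes the UDF valid
trunc_vec = func.udf(lambda v: v.toArray()[:n].tolist(),
                     ArrayType(DoubleType()))

df_trunc = df_dct.withColumn('dct_trunc', trunc_vec('vec_dct'))

# The array column can then be expanded into individual feature columns
df_wide = df_trunc.select('ID', 'Cycle_ID',
                          *[func.col('dct_trunc')[i].alias('dct_{}'.format(i))
                            for i in range(n)])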

So what I did in the end was to map a suitable function onto the RDD version of the above DataFrame and return the result as a DataFrame. This is what I am using right now:

from functools import partial

# Columns used for grouping
idx = ['ID', 'Cycle_ID']
keep_coeffs = 30  # How many of the leading coefficients shall be kept?

# To be mapped onto the RDD: return the auxiliary (grouping) columns plus the
# leading DCT coefficients as individual, serially named columns
def truncate_dct_vec(vec, coeffs):
    return tuple(vec[i] for i in idx) + tuple(vec.vec_dct.toArray()[:coeffs].tolist())

truncate_dct_vec = partial(truncate_dct_vec, coeffs=keep_coeffs)

# Perform the mapping to get the truncated DCT coefficients, each in an individual column
df_dct = df_dct.rdd.map(truncate_dct_vec).toDF(idx)

The problem is that this seems to be extremely slow to run (probably due to the serialization and conversion between the JVM and Python happening during all these steps), which is almost prohibitive. I am mainly looking for faster alternatives. Any help is appreciated.
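
For reference, one direction that would avoid the per-row RDD round trip is a vectorized pandas UDF that computes and truncates the DCT in a single Arrow-backed pass. The sketch below is only an assumption-laden alternative: it assumes Spark 3.0+ with Arrow enabled, SciPy/NumPy available on the executors, and that SciPy's orthonormal DCT-II (norm='ortho') matches the scaled DCT produced by Spark's DCT transformer; column names and keep_coeffs mirror the snippets above.

import numpy as np
import pandas as pd
from scipy.fft import dct  # assumed available on the executors
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

keep_coeffs = 30  # number of leading coefficients to keep

@pandas_udf(ArrayType(DoubleType()))
def leading_dct(coll: pd.Series) -> pd.Series:
    # Each element of `coll` is one collected per-cycle sensor array;
    # compute the orthonormal DCT-II and keep only the leading coefficients
    return coll.apply(
        lambda arr: dct(np.asarray(arr, dtype='float64'),
                        type=2, norm='ortho')[:keep_coeffs].tolist())

df_feat = df_collect.withColumn('dct_coeffs', leading_dct('sensor_1_coll'))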


Solution

  • This is an old thread, but I hope this helps others in the future. VectorAssembler will encode one or more columns into a dense vector representation. If you need a sparse representation, take a look at FeatureHasher, which also supports categorical and boolean values.

    At any rate, this should do the trick:

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, IntegerType
    from pyspark.ml.feature import VectorAssembler, DCT
    
    rows = [Row(id=1, cycle_id=1, sensor_1=123, sensor_2=641), 
            Row(id=1, cycle_id=1, sensor_1=123, sensor_2=644), 
            Row(id=1, cycle_id=2, sensor_1=124, sensor_2=643)]
    
    data_schema = StructType([StructField("id", IntegerType(), True), 
             StructField("cycle_id", IntegerType(), True), 
             StructField("sensor_1", IntegerType(), True),
             StructField("sensor_2", IntegerType(), True)])
    
    df = spark.createDataFrame(rows, data_schema)
    
    cols = ["id", "cycle_id", "sensor_1", "sensor_2"]       
    assembler = VectorAssembler(inputCols=cols, outputCol="features")
    df = assembler.transform(df)
    
    dct = DCT(inverse=False, inputCol="features", outputCol="features_dct")
    dct_df = dct.transform(df)
    dct_df.select("features_dct").show(truncate=False)
    

    The following will invert the DCT to recover the original signal:

    dct_inv = DCT(inverse=True, inputCol="features_dct", outputCol="features_dct_inverse")
    dct_df_inv = dct_inv.transform(dct_df)
    dct_df_inv.select("features_dct_inverse").show(truncate=False)
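
    To get only the leading coefficients as separate plain columns, as asked in the question, one option (assuming Spark 3.0 or later) is pyspark.ml.functions.vector_to_array, which avoids any Python UDF or RDD round trip; a sketch with an illustrative n:

    from pyspark.ml.functions import vector_to_array
    import pyspark.sql.functions as F

    n = 2  # illustrative: number of leading coefficients to keep

    # Convert the vector column to array<double>, then pick the first n entries
    dct_trunc = (dct_df
                 .withColumn("dct_arr", vector_to_array("features_dct"))
                 .select("id", "cycle_id",
                         *[F.col("dct_arr")[i].alias("dct_{}".format(i))
                           for i in range(n)]))
    dct_trunc.show(truncate=False)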