Tags: python, pyspark, null, vectorization, lightgbm

PySpark feature vector to allow NULL values


I would like to use a classifier in PySpark on a dataset that includes NULL values. The NULL values appear in features I have created, such as Success Percentage. I need to keep the NULL values, because I have shown via pandas that keeping them results in a stronger model. I therefore don’t want to impute NULLs with zeros or the median.

I understand that VectorAssembler can be used to create feature vectors, but it doesn’t work when the data contains NULL values. Is there a way to create a feature vector that DOES contain NULL values and that will work with the LightGBMClassifier?

I demonstrate the issue using the diamonds.csv data: a clean, unedited copy and a copy into which I have inserted nulls.

import pandas as pd
import numpy as np
import random

from mmlspark import LightGBMClassifier
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator

# Load diamonds.csv, dropping the index column
diamondsData = pd.read_csv("/dbfs/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv").iloc[:,1:]

# Clean, unedited copy
diamondsData_clean = diamondsData.copy()
diamondsData_clean = spark.createDataFrame(diamondsData_clean)

# Copy with nulls: blank out 'depth' in roughly 5% of rows
diamondsData['randnum'] = diamondsData.apply(lambda x: random.uniform(0, 1), axis=1)
diamondsData['depth'] = diamondsData[['depth','randnum']].apply(lambda x: np.nan if x['randnum'] < 0.05 else x['depth'], axis=1)
diamondsData_nulls = spark.createDataFrame(diamondsData)

# pandas NaNs arrive in Spark as NaN; convert them to proper NULLs in the float/double columns
diamondsData_nulls = diamondsData_nulls.select([when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c for c, t in diamondsData_nulls.dtypes])
diamondsData_nulls.show(10)
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|             randnum|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|  0.0755707311804259|
| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|  0.9719186135587407|
| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|  0.5237755344569698|
| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63| 0.12103842271165433|
| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75| 0.48213792315234205|
| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|  0.5461421401855059|
| 0.24|Very Good|    I|   VVS1| null| 57.0|  336|3.95|3.98|2.47|0.013923864248332252|
| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|   0.551950501743583|
| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49| 0.09444899320350808|
| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|  0.5246023480324566|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
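
As a quick sanity check (my addition, not part of the original walkthrough), the inserted NULLs can be counted; col, count and when are already in scope from the wildcard import above:

# Roughly 5% of the ~54,000 rows should now have a NULL depth
diamondsData_nulls.select(count(when(col('depth').isNull(), True)).alias('null_depth_rows')).show()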

The stages for use in the Pipeline are then configured.

categoricalColumns = ['cut', 'color', 'clarity']
stages = []
# Index each categorical column, then one-hot encode the index
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# Assemble the encoded categoricals and the numeric columns into one feature vector
numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
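
For reference, assemblerInputs resolves to the three one-hot vectors followed by the six numeric columns, which matches the struct named in the error message further down:

print(assemblerInputs)
# ['cutclassVec', 'colorclassVec', 'clarityclassVec', 'carat', 'depth', 'table', 'x', 'y', 'z']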

The pipeline is fit to diamondsData_clean and the data is transformed, returning the label column and features vector as expected.

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_clean)
diamonds_final = pipelineModel.transform(diamondsData_clean)
selectedCols = ['price', 'features']
diamonds_final = diamonds_final.select(selectedCols)
diamonds_final.printSchema()
diamonds_final.show(6)
root
 |-- price: long (nullable = true)
 |-- features: vector (nullable = true)

+-----+--------------------+
|price|            features|
+-----+--------------------+
|  326|(23,[0,5,12,17,18...|
|  326|(23,[1,5,10,17,18...|
|  327|(23,[3,5,13,17,18...|
|  334|(23,[1,9,11,17,18...|
|  335|(23,[3,12,17,18,1...|
|  336|(23,[2,14,17,18,1...|
+-----+--------------------+
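
A note on reading the output: features is printed in Spark's SparseVector notation, (size, [indices], [values]). A small illustration (made-up values, not an actual row from the dataset):

from pyspark.ml.linalg import SparseVector

# A 23-slot vector with non-zeros at positions 0, 5 and 12; every other slot is 0.0
v = SparseVector(23, [0, 5, 12], [1.0, 1.0, 61.5])
print(v.toArray())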

However, when the same steps are attempted on the diamondsData_nulls DataFrame, an error is returned.

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_nulls)
diamonds_final_nulls = pipelineModel.transform(diamondsData_nulls)
selectedCols = ['price', 'features']
diamonds_final_nulls = diamonds_final_nulls.select(selectedCols)
diamonds_final_nulls.printSchema()
diamonds_final_nulls.show(6)
root
 |-- price: long (nullable = true)
 |-- features: vector (nullable = true)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 133952.0 failed 4 times, most recent failure: Lost task 0.3 in stage 133952.0 (TID 1847847, 10.139.64.4, executor 291): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<cutclassVec:vector,colorclassVec:vector,clarityclassVec:vector,carat:double,depth:double,table:double,x:double,y:double,z:double>) => vector)

This is a known issue that is being worked on (https://github.com/Azure/mmlspark/issues/304), but I cannot currently find a featurizer that allows NULLs to be passed through.

Trying the handleInvalid = "keep" parameter

User Machiel suggested using the handleInvalid parameter of StringIndexer and OneHotEncoderEstimator; it turns out it also needs to be set on VectorAssembler. I updated my code accordingly:

stages = []  # reset, so the earlier stages built without handleInvalid are not reused
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index', handleInvalid="keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"], handleInvalid="keep")
    stages += [stringIndexer, encoder]

numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
# handleInvalid="keep" on the assembler passes missing values through as NaN in the vector
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid="keep")
stages += [assembler]
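
With handleInvalid set on every stage, the pipeline should now fit and transform the DataFrame containing NULLs. A minimal sketch, re-running the earlier failing step:

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_nulls)
diamonds_final_nulls = pipelineModel.transform(diamondsData_nulls).select('price', 'features')
diamonds_final_nulls.show(6)  # rows where depth was NULL now carry NaN in the features vector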

Solution

  • For strings and categorical numbers you can have Spark create a bucket for missing values using the handleInvalid parameter:

    StringIndexer(inputCol=..., outputCol=..., handleInvalid='keep')
    OneHotEncoderEstimator(inputCols=..., outputCols=..., handleInvalid='keep')

  • For the numeric columns, handleInvalid='keep' on the VectorAssembler keeps rows with missing values and passes the NULLs through as NaN entries in the assembled vector:

    VectorAssembler(inputCols=..., outputCol=..., handleInvalid='keep')
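
The assembled vector can then be handed to LightGBM, which by default treats NaN feature values as missing. A purely illustrative sketch (the price label is just a stand-in from the example above, not a meaningful classification target):

lgbm = LightGBMClassifier(labelCol="price", featuresCol="features")
model = lgbm.fit(diamonds_final_nulls)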