I would like to use a classifier in PySpark on a dataset that includes NULL values. The NULL values appear in features I have created, such as Success Percentage. I need to keep the NULL value, because I have shown via pandas that keeping the NULL values results in a stronger model. I therefore don’t want to impute NULLs with zeros or the median.
I understand that Vector Assembler can be used to create feature vectors, but it doesn’t work when the data contains NULL values. I was wondering if there is a way to create a features vector that DOES contain NULL values, which will work with the LightGBMClassifier?
I demonstrate the issue I am having with the diamonds.csv data. I use a clean unedited copy and a copy that I have inserted nulls into to demonstrate the issue that I have.
import pandas as pd
import numpy as np
import random
from mmlspark import LightGBMClassifier
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator
diamondsData = pd.read_csv("/dbfs/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv").iloc[:,1:]
diamondsData_clean = diamondsData.copy()
diamondsData_clean = spark.createDataFrame(diamondsData_clean)
diamondsData['randnum'] = diamondsData.apply(lambda x: random.uniform(0, 1), axis=1)
diamondsData['depth'] = diamondsData[['depth','randnum']].apply(lambda x: np.nan if x['randnum'] < 0.05 else x['depth'], axis=1)
diamondsData_nulls = spark.createDataFrame(diamondsData)
diamondsData_nulls = diamondsData_nulls.select([when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c for c, t in diamondsData_nulls.dtypes])
diamondsData_nulls.show(10)
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
|carat| cut|color|clarity|depth|table|price| x| y| z| randnum|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43| 0.0755707311804259|
| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31| 0.9719186135587407|
| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31| 0.5237755344569698|
| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| 0.12103842271165433|
| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75| 0.48213792315234205|
| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48| 0.5461421401855059|
| 0.24|Very Good| I| VVS1| null| 57.0| 336|3.95|3.98|2.47|0.013923864248332252|
| 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53| 0.551950501743583|
| 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49| 0.09444899320350808|
| 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39| 0.5246023480324566|
The stages for use in the Pipeline are then configured.
categoricalColumns = ['cut', 'color', 'clarity']
stages = []
for categoricalCol in categoricalColumns:
stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
stages += [stringIndexer, encoder]
numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
The pipeline is fit to diamondsData_clean and the data is transformed, returning the label column and features vector as expected.
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_clean)
diamonds_final = pipelineModel.transform(diamondsData_clean)
selectedCols = ['price', 'features']
diamonds_final = diamonds_final.select(selectedCols)
diamonds_final.printSchema()
diamonds_final.show(6)
root
|-- price: long (nullable = true)
|-- features: vector (nullable = true)
+-----+--------------------+
|price| features|
+-----+--------------------+
| 326|(23,[0,5,12,17,18...|
| 326|(23,[1,5,10,17,18...|
| 327|(23,[3,5,13,17,18...|
| 334|(23,[1,9,11,17,18...|
| 335|(23,[3,12,17,18,1...|
| 336|(23,[2,14,17,18,1...|
+-----+--------------------+
However, when the same step is attempted on the diamondsData_nulls dataframe it returns an error.
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_nulls)
diamonds_final_nulls = pipelineModel.transform(diamondsData_nulls)
selectedCols = ['price', 'features']
diamonds_final_nulls = diamonds_final_nulls.select(selectedCols)
diamonds_final_nulls.printSchema()
diamonds_final_nulls.show(6)
root
|-- price: long (nullable = true)
|-- features: vector (nullable = true)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 133952.0 failed 4 times, most recent failure: Lost task 0.3 in stage 133952.0 (TID 1847847, 10.139.64.4, executor 291): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<cutclassVec:vector,colorclassVec:vector,clarityclassVec:vector,carat:double,depth:double,table:double,x:double,y:double,z:double>) => vector)
This is a known issue that is being worked on (https://github.com/Azure/mmlspark/issues/304) but I cannot currently find a featurizer that allows NULLs to be passed through.
Trying to use handleInvalid = "keep" parameter
User Machiel suggested the handleInvalid parameter of the StringIndexer and OneHotEncoderEstimator functions - it turns out it should also be applied in the VectorAssembler function. I updated my code as such:
for categoricalCol in categoricalColumns:
stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index',handleInvalid = "keep")
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"],handleInvalid = "keep")
stages += [stringIndexer, encoder]
numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features",handleInvalid="keep")
stages += [assembler]
For strings and categorical numbers you can have Spark create a bucket for missing values using the handleInvalid parameter:
OneHotEncoderEstimator(inputCols=..., outputCols=..., handleInvalid='keep')
StringIndexer(inputCol=..., outputCol=..., handleInvalid='keep')