Tags: group-by, pyspark, bigdata, amazon-emr, precision-recall

PySpark: Why does groupBy (and groupBy with count()) on the results of a GBTClassifier produce inconsistent results?


In PySpark I've loaded a large dataset that I'm running through a GBTClassifier. Before training, a groupBy on the input data produces the expected results (the group counts add up to the expected total, etc.). However, after running the fitted model on the test data, a groupBy on the predictions does not give reproducible results. I'm trying to compute a basic precision/recall, so I'm grouping by label and prediction. The output doesn't vary by a huge amount, but it does move around from run to run and isn't reliable. I haven't used MulticlassMetrics because I want to explore different classification probability thresholds, although at this point I'd be open to it. I haven't been able to get my output DataFrame into a format that MulticlassMetrics accepts, though.
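For reference, threshold-based precision and recall only need the four confusion-matrix counts at each threshold. The following is a minimal pure-Python sketch, assuming each scored row has been reduced to a `(label, prob_fraud)` pair with `label` in {0, 1} and `prob_fraud` the positive-class probability. The helper names and sample rows are hypothetical; in PySpark the same counts would come from aggregations over the predictions DataFrame rather than a Python list.

```python
from typing import Dict, Iterable, List, Tuple


def confusion_counts(rows: Iterable[Tuple[int, float]], threshold: float) -> Dict[str, int]:
    """Count TP/FP/TN/FN when a row is predicted positive iff prob > threshold."""
    counts = {"TP": 0, "FP": 0, "TN": 0, "FN": 0}
    for label, prob in rows:
        pred = 1 if prob > threshold else 0
        if label == 1:
            counts["TP" if pred == 1 else "FN"] += 1
        else:
            counts["FP" if pred == 1 else "TN"] += 1
    return counts


def precision_recall(c: Dict[str, int]) -> Tuple[float, float]:
    """Precision = TP/(TP+FP), recall = TP/(TP+FN); 0.0 when a denominator is 0."""
    predicted_pos = c["TP"] + c["FP"]
    actual_pos = c["TP"] + c["FN"]
    precision = c["TP"] / predicted_pos if predicted_pos else 0.0
    recall = c["TP"] / actual_pos if actual_pos else 0.0
    return precision, recall


# Hypothetical scored rows: (label, prob_fraud)
rows: List[Tuple[int, float]] = [(1, 0.9), (1, 0.6), (1, 0.3), (0, 0.7), (0, 0.2), (0, 0.1)]

# Sweep a few thresholds, mirroring when(col('prob_fraud') > t, 1).otherwise(0)
for t in (0.25, 0.5, 0.75):
    c = confusion_counts(rows, t)
    p, r = precision_recall(c)
    print(f"threshold={t}: {c} precision={p:.2f} recall={r:.2f}")
```

Raising the threshold trades recall for precision, which is why sweeping it is useful before settling on a single cut-off.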

I've tried groupBy with count() as well as filtering on each specific combination, to see whether the two approaches give different results (i.e. whether the filter wasn't matching the data in the column).

It's worth mentioning that I'm working on AWS in EMR Notebooks, on a 4-node cluster.

```python
from pyspark.ml.classification import GBTClassifier
# Assumption: to_array (not shown in the question) converts the probability
# vector to an array; vector_to_array does this in Spark 3.0+
from pyspark.ml.functions import vector_to_array as to_array
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

# splits comes from an earlier randomSplit() call (not shown)
train_df = splits[0]
test_df = splits[1]

gbm = GBTClassifier(stepSize=0.1, seed=2018)

model_gbm = gbm.fit(train_df)
prediction_gbm = model_gbm.transform(test_df)

# Split the probability column into two values to allow assessment of different classification thresholds
prediction_gbm = (prediction_gbm
                  .withColumn("probability_split", to_array(col("probability")))
                  .withColumn('prob_norm', col("probability_split")[0])
                  .withColumn('prob_fraud', col("probability_split")[1]))

# Test a new threshold
newPrediction = when(col('prob_fraud') > 0.5, 1).otherwise(0)
prediction_gbm = prediction_gbm.withColumn('newPrediction', newPrediction)

# This section counts each label/prediction combination. This is what is producing inconsistent results
gbm_FN = prediction_gbm.filter((F.col('label') == 1) & (F.col('newPrediction') == 0)).count()
gbm_FP = prediction_gbm.filter((F.col('label') == 0) & (F.col('newPrediction') == 1)).count()
gbm_TP = prediction_gbm.filter((F.col('label') == 1) & (F.col('newPrediction') == 1)).count()
gbm_TN = prediction_gbm.filter((F.col('label') == 0) & (F.col('newPrediction') == 0)).count()

# Here is the groupBy code as well for clarification (it groups on the model's own 'prediction' column)
prediction_gbm.groupBy(['label', 'prediction']).count().show()
```

I would expect the counts for the four combinations of label and prediction to add up to the same total every time. I'd also expect the groupBy results to match the four filtered counts and to add up to that same total.
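On one fixed set of rows the two counting methods cannot disagree, so checking them against the same materialized snapshot separates a counting bug from input that changes between actions. A minimal pure-Python sketch, where the hypothetical `snapshot` list of `(label, prediction)` pairs stands in for collected rows: each filtered count is a separate pass (as each `.count()` is a separate Spark action), while the grouped count is a single pass.

```python
from collections import Counter
from typing import Dict, List, Tuple

Cell = Tuple[int, int]  # (label, prediction)


def counts_by_filter(rows: List[Cell]) -> Dict[Cell, int]:
    """Mimic four separate filter(...).count() calls: one pass per cell."""
    cells = [(1, 0), (0, 1), (1, 1), (0, 0)]  # FN, FP, TP, TN
    return {cell: sum(1 for row in rows if row == cell) for cell in cells}


def counts_by_group(rows: List[Cell]) -> Dict[Cell, int]:
    """Mimic groupBy(['label', 'prediction']).count(): one pass over all rows."""
    return dict(Counter(rows))


# Hypothetical materialized rows
snapshot: List[Cell] = [(1, 1), (1, 0), (0, 0), (0, 0), (0, 1), (1, 1), (0, 0)]
by_filter = counts_by_filter(snapshot)
by_group = counts_by_group(snapshot)

# Both methods agree cell by cell (groupBy omits empty cells, so default to 0)
assert all(by_filter[cell] == by_group.get(cell, 0) for cell in by_filter)
# The four cells account for every row exactly once
assert sum(by_filter.values()) == len(snapshot)
```

If the equivalent check fails on a Spark DataFrame, that points toward the underlying rows being recomputed differently between actions rather than toward either counting method being wrong.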

EDIT: When I train my model, I get the following error on the first run, but I don't see it on subsequent runs:

```
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 905
```

Solution

  • I spent some time playing around with this, and the inconsistent results turned out to be caused by a randomSplit() call that I hadn't included in my original question. The solution was to cache the DataFrame before splitting it. More info here: How does Spark keep track of the splits in randomSplit?

    As for the error, the traceback comes from the EMR notebook's Spark monitoring widget (cellmonitor.py) rather than from the model code itself, and it is apparently a timeout. Some poking around suggests it might be an out-of-memory error, but that is difficult to debug in the EMR notebook. It doesn't occur when using spark-submit on the master node directly, which suggests it may be memory-related, since spark-submit lets you raise the memory settings while the notebook doesn't make that as easy.
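Why caching before randomSplit() helps can be illustrated with a toy pure-Python model. This is not Spark's implementation: the names `random_split`, `unstable_parent` and `cached_parent` are hypothetical. The idea it models is that each split re-evaluates its parent and keeps the rows whose seeded random draw falls in that split's weight range, so if the parent hands back its rows differently on each recomputation, the same seed selects different rows. Real randomSplit() samples per partition (and some Spark versions sort within partitions first), so in practice the trigger is likely partition contents changing between recomputations; in PySpark the fix amounts to calling `cache()` (or `persist()`) on the DataFrame before calling `randomSplit(...)` on it.

```python
import random
from typing import Callable, List, Sequence, Tuple

Parent = Callable[[], Sequence[int]]


def sample_range(rows: Sequence[int], lo: float, hi: float, seed: int) -> List[int]:
    """Keep rows whose uniform draw lies in [lo, hi).

    Draws come from one RNG seeded with `seed` and consumed in row order,
    so which rows are kept depends on the order the rows arrive in."""
    rng = random.Random(seed)
    return [row for row in rows if lo <= rng.random() < hi]


def random_split(parent: Parent, train_frac: float, seed: int) -> Tuple[List[int], List[int]]:
    """Toy two-way split: each side re-evaluates `parent` on its own,
    like an uncached DataFrame being recomputed for every action."""
    train = sample_range(parent(), 0.0, train_frac, seed)
    test = sample_range(parent(), train_frac, 1.0, seed)
    return train, test


def unstable_parent(rows: Sequence[int]) -> Parent:
    """Hypothetical upstream whose row order changes on every recomputation."""
    calls = [0]

    def evaluate() -> List[int]:
        calls[0] += 1
        shuffled = list(rows)
        random.Random(calls[0]).shuffle(shuffled)
        return shuffled

    return evaluate


def cached_parent(rows: Sequence[int]) -> Parent:
    """Materialize once and return the same rows every time (the cache() fix)."""
    snapshot = list(rows)
    return lambda: snapshot


data = list(range(1000))

# Cached input: the splits are disjoint and together cover every row
train, test = random_split(cached_parent(data), 0.7, seed=2018)
print("cached: overlap =", len(set(train) & set(test)), "covered =", len(set(train) | set(test)))

# Unstable input: rows leak between splits and the test set drifts between runs
flaky = unstable_parent(data)
train1, test1 = random_split(flaky, 0.7, seed=2018)
_, test2 = random_split(flaky, 0.7, seed=2018)
print("unstable: overlap =", len(set(train1) & set(test1)))
print("unstable: test set changed between runs:", set(test1) != set(test2))
```

The cached case corresponds to caching before splitting: every split sees identical input, so train and test partition the data and stay the same no matter how many actions run on them.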