Tags: python, apache-spark, pyspark, types, rdd

Casting RDD to a different type (from float64 to double)


I have code like the below, which uses PySpark:

test_truth_value = RDD    # an RDD of labels (defined further below)
test_predictor_rdd = RDD  # an RDD of feature vectors (defined further below)

valuesAndPred = test_truth_value.zip(lasso_model.predict(test_predictor_rdd)).map(lambda x: ((x[0]), (x[1])))
metrics = RegressionMetrics(valuesAndPred)

When I run the code, I get the following error:

pyspark.errors.exceptions.base.PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `DoubleType()` can not accept object `-44604.288415296396` in type `float64`.

This happens in the portion below:

metrics = RegressionMetrics(valuesAndPred)

In general, I would fix the type of the RDD by following something like the answer at this link: Pyspark map from RDD of strings to RDD of list of doubles.
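For reference, that answer's approach boils down to mapping Python's built-in float over the parsed values. A minimal sketch, assuming an active SparkContext sc and a hypothetical RDD of comma-separated strings:

string_rdd = sc.parallelize(['1.0,2.5', '3.0,4.5'])
# split each string and convert every field to a plain Python float
double_rdd = string_rdd.map(lambda s: [float(v) for v in s.split(',')])
print(double_rdd.collect())  # [[1.0, 2.5], [3.0, 4.5]]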

However, I now have three questions.

  1. What is the difference between float64 and double? From this link (Swift Difference Between Double and Float64), it seems like PySpark is differentiating between float64 and double?
  2. When I created the previous RDDs, I already cast them to double, as shown below.
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

double_cast_list = ['price','bed','bath','acre_lot','house_size']
for cast_item in double_cast_list:
    top_zip_df = top_zip_df.withColumn(cast_item, col(cast_item).cast(DoubleType()))

lasso_df = top_zip_df.select('price','bed','bath','acre_lot','house_size')
train_df, test_df = lasso_df.randomSplit(weights = [0.7,0.3], seed = 100)

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import StandardScaler

def scaled_rdd_generation(df):
    rdd = df.rdd.map(lambda row: LabeledPoint(row[0], row[1::]))
    
    # separate the features and the labels from the rdd - only the features need to be standardized.
    features_rdd = rdd.map(lambda row: row.features) # this works because LabeledPoint already exposes label and features attributes
    scaler = StandardScaler(withMean = True, withStd = True)
    # for the standard scaler, you need to fit the scaler and then transform the data.
    # scaler.fit(rdd) -> computes the mean and variance and stores them as a model to be used later
    scaler_model = scaler.fit(features_rdd)
    scaled_feature_rdd = scaler_model.transform(features_rdd)
    # rdd zip method: zips this RDD with another one, returning key-value pairs.
    scaled_rdd = rdd.zip(scaled_feature_rdd).map(lambda x: LabeledPoint(x[0].label, x[1]))
    return scaled_rdd


model_save_path = r'C:\Users\ra064640\OneDrive - Honda\Desktop\Spark\Real Estate Linear Regression'
train_scaled_rdd = scaled_rdd_generation(train_df)
test_scaled_rdd = scaled_rdd_generation(test_df)
test_predictor_rdd = test_scaled_rdd.map(lambda x: x.features)
test_truth_value = test_scaled_rdd.map(lambda x: x.label)

Where in there is the double being transformed to float64? (One way to check is sketched after this list.)

  3. How should I fix this? I do not see a double(x[0]) function analogous to the float(x[0]) suggested in the previous link. Thanks!
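One way to see where the float64 creeps in, as a diagnostic sketch against the RDDs defined above, is to print the runtime type of a few elements:

# the labels should come back as plain Python floats,
# while the model's predictions typically come back as numpy.float64 scalars
print(test_truth_value.map(lambda v: type(v).__name__).take(3))
print(lasso_model.predict(test_predictor_rdd).map(lambda v: type(v).__name__).take(3))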

Solution

  • First, as mentioned in the Spark docs, here is the difference between the float and double types:

    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
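
    As an aside, the float64 in the error message is NumPy's 8-byte scalar type, not a Spark type. NumPy's float64 actually subclasses Python's float, but PySpark's type verifier appears to check exact types, which is why the NumPy scalar is rejected while a converted value passes. A quick sketch:

    import numpy as np

    x = np.float64(-44604.288415296396)
    print(isinstance(x, float))     # True  - np.float64 subclasses Python float
    print(type(x) is float)        # False - but it is not exactly float,
                                   # which is what the verifier checks
    print(type(float(x)) is float)  # True  - float() yields a plain Python float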

    Second, as you mentioned, the error comes from here:

    valuesAndPred = test_truth_value.zip(lasso_model.predict(test_predictor_rdd)).map(lambda x: ((x[0]), (x[1])))
    metrics = RegressionMetrics(valuesAndPred)
    

    More specifically, the issue likely arises from this part: lasso_model.predict(test_predictor_rdd). The mllib model's predictions come back as NumPy float64 scalars, which PySpark's DoubleType verifier does not accept.

    Finally, to fix this, convert each prediction to a plain Python float by mapping float over the predictions: lasso_model.predict(test_predictor_rdd).map(float).

    Modified code:

    valuesAndPred = test_truth_value.zip(lasso_model.predict(test_predictor_rdd).map(float)).map(lambda x: ((x[0]), (x[1])))
    metrics = RegressionMetrics(valuesAndPred)
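
    If the labels ever come back as NumPy scalars as well (for example after further NumPy-based transformations), the same idea extends to both sides of the pair:

    # coerce both the label and the prediction to plain Python floats
    valuesAndPred = test_truth_value.zip(
        lasso_model.predict(test_predictor_rdd)
    ).map(lambda x: (float(x[0]), float(x[1])))
    metrics = RegressionMetrics(valuesAndPred)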