Tags: python, pyspark, user-defined-functions, floor, ceil

How to use a custom UDF implementation to round a column


I have a PySpark DataFrame like:

+-------------------+
|      to_return_day|
+-------------------+
|          -2.003125|
| -20.96738425925926|
| -2.332546296296296|
| -2.206770833333333|
|-2.9733564814814817|
|  54.71157407407407|
|  51.70229166666667|
| 48.666354166666665|
|  9.665497685185185|
|  49.56260416666667|
|  66.68983796296297|
|  49.80550925925926|
|   66.6899074074074|
+-------------------+

and I want to use a UDF to round up when "to_return_day" > 0 and round down when "to_return_day" < 0.

My code:

from pyspark.sql.functions import udf
@udf("double")
def floor_ceil(col_day):
    if col_day > 0:
        return ceil(col_day)
    else:
        return floor(col_day)

spark.udf.register("floor_ceil", floor_ceil)
patron_lending_time.withColumn("to_return_day_round", ceil(col("to_return_day")))\
                   .show()

and I get:

(screenshot of the output)

Why does this happen? How can I fix it?


Solution

  • I might not have completely understood the question the OP posted. As I understand it, the output the OP wants is this:

    1) For positive values (greater than or equal to 0, I assume), the closest integer above that number; e.g., for 2.34 it will be 3.

    2) For negative values, the closest integer below that number; e.g., for -2.34 it will be -3.

    # Creating the DataFrame
    values = [(-2.003125,),(-20.96738425925926,),(-2.332546296296296,),(-2.206770833333333,),
              (-2.9733564814814817,),(54.71157407407407,),(51.70229166666667,),(48.666354166666665,),
              (9.665497685185185,),(49.56260416666667,),(66.68983796296297,),(49.80550925925926,),
              (66.6899074074074,),]
    df = spark.createDataFrame(values, ['to_return_day'])
    df.show()
    +-------------------+
    |      to_return_day|
    +-------------------+
    |          -2.003125|
    | -20.96738425925926|
    | -2.332546296296296|
    | -2.206770833333333|
    |-2.9733564814814817|
    |  54.71157407407407|
    |  51.70229166666667|
    | 48.666354166666665|
    |  9.665497685185185|
    |  49.56260416666667|
    |  66.68983796296297|
    |  49.80550925925926|
    |   66.6899074074074|
    +-------------------+
    

    There is no need to create a UDF; a simple when/otherwise expression suffices.

    # Importing relevant functions
    from pyspark.sql.functions import ceil, floor, when, col
    df = df.withColumn('to_return_day',
                       when(col('to_return_day') >= 0, ceil(col('to_return_day')))
                       .otherwise(floor(col('to_return_day'))))
    df.show()
    +-------------+
    |to_return_day|
    +-------------+
    |           -3|
    |          -21|
    |           -3|
    |           -3|
    |           -3|
    |           55|
    |           52|
    |           49|
    |           10|
    |           50|
    |           67|
    |           50|
    |           67|
    +-------------+
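
    The same logic can also be expressed as a single SQL CASE expression via expr, in case you prefer SQL syntax. This is a minimal sketch equivalent to the when/otherwise version above:

    # Equivalent rewrite using a Spark SQL CASE expression
    from pyspark.sql.functions import expr
    df = df.withColumn('to_return_day',
                       expr("CASE WHEN to_return_day >= 0 THEN CEIL(to_return_day) "
                            "ELSE FLOOR(to_return_day) END"))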
    

    Documentation: ceil and floor

    In case you only wish to use a UDF, the following code will work.

    # Import relevant functions and packages.
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import IntegerType
    import math

    # Defining a UDF: floor for negative values, ceil for everything else
    def round_udf(c):
        if c is None:      # let nulls pass through unchanged
            return None
        if c < 0:
            return math.floor(c)
        else:
            return math.ceil(c)

    round_udf = udf(round_udf, IntegerType())

    df = df.withColumn('to_return_day', round_udf(col('to_return_day')))
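
    The OP's original snippet also registered the UDF with spark.udf.register so it could be called from SQL. Here is a minimal sketch of that usage with the corrected UDF; the temp-view name patron_lending_time mirrors the OP's DataFrame name and is assumed here:

    # Register the UDF for Spark SQL and call it from a query
    # (the view name "patron_lending_time" is hypothetical)
    spark.udf.register("round_udf", round_udf)
    df.createOrReplaceTempView("patron_lending_time")
    spark.sql("""
        SELECT to_return_day,
               round_udf(to_return_day) AS to_return_day_round
        FROM patron_lending_time
    """).show()

    Note that Python UDFs serialize each row between the JVM and a Python worker, so the built-in ceil/floor/when approach above will generally be faster.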