Tags: python, dataframe, pyspark, user-defined-functions

pyspark udf function storing incorrect data despite function producing correct result


So I have this weird issue. I'm working with a huge dataset that represents dates and times as a single string. The data can easily be converted with datetime.strptime(), but the dataset is so large that I need PySpark to do the conversion. No problem, I thought: I scoured Stack Overflow, saw UDFs, and made one. Unfortunately, the values stored in the DataFrame don't match what the function actually produces. I assumed the function would be executed row by row as Spark reads the data, but that doesn't seem to be what's happening.

Here's what I have (the data is a PySpark DataFrame called result; I'm showing the first 5 rows):

timestamp            node_id       subsys     sensor  par   val_raw  val_hrf
2018/01/01 00:00:06  001e0610e532  chemsense  lps25h  temp  -954     -9.54
2018/01/01 00:00:30  001e0610e532  chemsense  lps25h  temp  -954     -9.54
2018/01/01 00:00:54  001e0610e532  chemsense  lps25h  temp  -957     -9.57
2018/01/01 00:01:18  001e0610e532  chemsense  lps25h  temp  -961     -9.61
2018/01/01 00:01:42  001e0610e532  chemsense  lps25h  temp  -962     -9.62

To convert the timestamp column into actionable data, I convert it to a float using a custom function:

from datetime import datetime

def timeswap(x: str) -> float:
    print(x)  # sanity check: the incoming string
    # parse the string and convert it to a Unix timestamp (seconds since the epoch)
    utime = datetime.strptime(x, "%Y/%m/%d %H:%M:%S").timestamp()
    print(utime)  # sanity check: the converted value
    return utime
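
A quick spot check on a single timestamp (the exact number is timezone-dependent, since strptime() produces a naive datetime that timestamp() interprets in local time) looks like this:

timeswap("2018/01/01 00:00:06")
# prints the input string, then the converted value
# (1514782806.0 on my machine)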

Having confirmed the function works properly, I go ahead and run it on the entire column, storing the result in a new column called unixTime:

from pyspark.sql.types import FloatType

timeUDF = spark.udf.register('timeUDF', timeswap, FloatType())
result_conv = result.withColumn('unixTime', timeUDF('timestamp'))

Seems like it worked. I spent weeks assuming this was accurate and running algorithms on the data, only to recently find the data clumping in a way it shouldn't: multiple readings on a single date. So I told Spark to print the column, which actually causes the function to be called for each row. I knew this would happen, which is why I put the print statements in as a sanity check:

result_conv.select('unixTime').head(5)

It output this (the # comments are mine):

2018/01/01 00:00:06 #The original string date
1514782806.0 #the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0), #I don't know what this value is
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0)]

Does anyone know what I'm missing here? I've even confirmed that the float in the row list doesn't appear anywhere when running more than 5 lines, so I don't know where that value comes from or why it's duplicated across rows. It's neither the mean nor the median (and neither should be used anyway), and the number of duplicates isn't consistent when I look at longer stretches of rows. I'd really like to avoid converting this to a pandas DataFrame and back to a Spark DataFrame just to do this. Bottom line: I need to convert the date string into a Unix-time float that is unique per line for this sensor (as it is in the data).

Thanks for any help!


Solution

  • You don't have to use UDFs here; the same task can be done with built-in PySpark functions. UDFs should be a last resort, since they slow your program down.

    Here's what I did. I think the minor difference in the values comes down to timezones: 1514782806 (your output) - 1514745006 (mine) = 37800 seconds = 10.5 hours, so your machine's timezone is 10.5 hours behind mine. If you can specify the problem further, I can help more.

    import pyspark.sql.functions as F
    from pyspark import SparkContext, SQLContext

    sc = SparkContext('local')
    sqlContext = SQLContext(sc)
    # This setting is important if you want the legacy (Spark 2.x) time-parsing behaviour
    sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")

    data1 = [
        ["2018/01/01 00:00:06"],
        ["2018/01/01 00:00:30"],
        ["2018/01/01 00:00:54"],
        ["2018/01/01 00:01:18"],
        ["2018/01/01 00:01:42"],
    ]

    df1Columns = ["time_col"]
    df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

    # 1514782806.0  # the correct output from the function (in the asker's timezone)
    df1 = df1.withColumn("integer_value", F.unix_timestamp(F.to_timestamp('time_col', 'yyyy/MM/dd HH:mm:ss')))
    df1.show(n=100, truncate=False)
    

    Output:

    +-------------------+-------------+
    |time_col           |integer_value|
    +-------------------+-------------+
    |2018/01/01 00:00:06|1514745006   |
    |2018/01/01 00:00:30|1514745030   |
    |2018/01/01 00:00:54|1514745054   |
    |2018/01/01 00:01:18|1514745078   |
    |2018/01/01 00:01:42|1514745102   |
    +-------------------+-------------+
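
    As a further note (a sketch, assuming the raw strings are meant to be interpreted as UTC): if you want the numbers to be machine-independent rather than tied to whichever timezone the driver happens to run in, you can pin the Spark session timezone before converting.

    # Sketch: pin the session timezone so unix_timestamp() no longer depends
    # on the local machine's timezone (assumes the strings represent UTC times).
    sqlContext.setConf("spark.sql.session.timeZone", "UTC")

    df1_utc = df1.withColumn(
        "integer_value",
        F.unix_timestamp(F.to_timestamp("time_col", "yyyy/MM/dd HH:mm:ss"))
    )
    df1_utc.show(truncate=False)
    # 2018/01/01 00:00:06 would then map to 1514764806 (midnight UTC on 2018-01-01, plus 6 seconds)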