So I have this weird issue. I'm working with a huge dataset where each date and time is represented by a single string. Each string can easily be converted with datetime.strptime(), but the data is so big that I need to use pyspark to do the conversion. No problem, I thought: I scoured Stack Overflow, saw UDFs, and made one. Unfortunately, the values stored in the dataframe don't match what the function actually produces. I assumed the function would be executed row by row as Spark sees the data, but that doesn't seem to be happening.
Here's what I have (data is a pyspark dataframe called result and I'm showing the first 5 rows):
timestamp | node_id | subsys | sensor | par | val_raw | val_hrf |
---|---|---|---|---|---|---|
2018/01/01 00:00:06 | 001e0610e532 | chemsense | lps25h | temp | -954 | -9.54 |
2018/01/01 00:00:30 | 001e0610e532 | chemsense | lps25h | temp | -954 | -9.54 |
2018/01/01 00:00:54 | 001e0610e532 | chemsense | lps25h | temp | -957 | -9.57 |
2018/01/01 00:01:18 | 001e0610e532 | chemsense | lps25h | temp | -961 | -9.61 |
2018/01/01 00:01:42 | 001e0610e532 | chemsense | lps25h | temp | -962 | -9.62 |
To convert the timestamp column into actionable data, I convert it to a float using a custom function:
from datetime import datetime

def timeswap(x: str):
    print(x)
    utime = datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
    print(utime)
    return utime
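For example, calling it directly on the first timestamp prints the input string and the converted value, then returns it (this is what I get on my machine; the exact number depends on the local timezone):
timeswap("2018/01/01 00:00:06")
# prints 2018/01/01 00:00:06
# prints 1514782806.0
# returns 1514782806.0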
I have confirmed this function works properly. So I went ahead and ran it on the entire column, storing the result in a new column called unixTime:
timeUDF = spark.udf.register('timeUDF', timeswap, FloatType())
result_conv = result.withColumn('unixTime', timeUDF('timestamp'))
It seemed to work. I spent weeks thinking this was accurate, running algorithms on the data, only to recently find out that the data is clumping in a way it shouldn't: multiple readings on a single date. So I told Spark to print the column, which actually causes the function to be called for each row. I knew this would happen, which is why I put the print statements in as a sanity check:
result_conv.select('unixTime').head(5)
It output this (the # comments are mine):
2018/01/01 00:00:06 #The original string date
1514782806.0 #the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0), #I don't know what this value is
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0)]
Does anyone know what I'm missing here? I've even confirmed that the float shown in the row list doesn't appear in the printed output when running more than 5 lines, so I don't know where that value is coming from or why it's duplicated across rows. It's neither the average nor the median (and neither should be getting applied anyway), and the number of duplicates isn't consistent when I look at longer stretches of rows. I'd really like to avoid converting this to a pandas DF and back to a Spark DF just to do this. Bottom line: I need to convert the date string into a Unix-time float that is unique per line for this sensor (as it is in the data).
Thanks for any help!
You don't have to use a UDF here. You can do the same task with built-in PySpark functions; UDFs should be a last resort because they slow your program down.
Here's what I did. I think the minor difference in values is probably due to timezone differences, though I'm not sure; if you specify the problem more precisely, I can help further. E.g. 1514782806 (your machine) - 1514745006 (my machine) = 37800 seconds = 10.5 hours, so our machines' timezones are 10.5 hours apart (mine appears to be 10.5 hours ahead of yours).
import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
sc = SparkContext('local')
sqlContext = SQLContext(sc)
### This setting is important if you want the legacy time-parser behaviour
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")
data1 = [
["2018/01/01 00:00:06"],
["2018/01/01 00:00:30"],
["2018/01/01 00:00:54"],
["2018/01/01 00:01:18"],
["2018/01/01 00:01:42"],
]
df1Columns = ["time_col"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)
# 1514782806.0 is the output you expect from your function for the first row (in your timezone)
df1 = df1.withColumn("integer_value", F.unix_timestamp(F.to_timestamp('time_col', 'yyyy/MM/dd HH:mm:ss')))
df1.show(n=100, truncate=False)
Output :
+-------------------+-------------+
|time_col |integer_value|
+-------------------+-------------+
|2018/01/01 00:00:06|1514745006 |
|2018/01/01 00:00:30|1514745030 |
|2018/01/01 00:00:54|1514745054 |
|2018/01/01 00:01:18|1514745078 |
|2018/01/01 00:01:42|1514745102 |
+-------------------+-------------+
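As for the repeated 1514782848.0 in your head(5) output: I suspect it comes from registering the UDF with FloatType. A 32-bit float can't represent epoch values around 1.5 billion exactly (adjacent representable values are 128 apart at that magnitude), so all of those nearby timestamps round to the same nearest float, 1514782848.0. If you still want the UDF route, registering it with DoubleType should keep full precision. A minimal sketch, assuming the timeswap function and the result dataframe from your question:
from pyspark.sql.types import DoubleType

timeUDF = spark.udf.register('timeUDF', timeswap, DoubleType())
result_conv = result.withColumn('unixTime', timeUDF('timestamp'))
Keep in mind that datetime.strptime/datetime.timestamp inside the UDF interpret the string in the executors' local timezone, which is likely part of why your values and mine differ by 10.5 hours.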