I have some data stored in JSON format like this:
{
"id":1,
"time":"2023-01-01 12:34:56"
}
I also have an Apache Hudi table with the same columns. The schema of the Hudi table, read with PyArrow, is:
id: int64
time: timestamp[us, tz=UTC]
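For reference, this is roughly how I read that schema with PyArrow; the Parquet file name below is a placeholder, since Hudi generates the actual data file names:

import pyarrow.parquet as pq

# 'some-data-file.parquet' stands in for one of the table's
# generated Parquet data files under the table location
schema = pq.read_schema('path/to/hudi/table/some-data-file.parquet')
print(schema)  # the time field reads as timestamp[us, tz=UTC]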
I tried to use Spark to merge the JSON data into the Hudi table:
from pyspark.sql import SparkSession

with SparkSession.builder.getOrCreate() as spark:
    # load the JSON files and expose them as a temp view for the MERGE
    df = spark.read.json('path/to/json/files')
    df.createOrReplaceTempView('upsert_data')
    # register the existing Hudi table at its storage location
    spark.sql("CREATE TABLE snapshot USING hudi LOCATION 'path/to/hudi/table'")
    sql_upsert = '''MERGE INTO snapshot AS target
        USING upsert_data AS source
        ON source.id = target.id
        WHEN MATCHED THEN UPDATE SET target.time = source.time
        WHEN NOT MATCHED THEN INSERT (id, time) VALUES (source.id, source.time)'''
    spark.sql(sql_upsert)
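One thing worth checking: spark.read.json leaves time as a plain string by default (JSON timestamp inference is opt-in via the inferTimestamp option, as far as I can tell), so any conversion to a timestamp type happens later, during the MERGE:

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- time: string (nullable = true)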
But after the data was merged, the time value in the Hudi table became 56019-01-08 12:34:56. I think that may be because Spark automatically converts the time string into timestamp[ns], which differs from the Hudi table's timestamp[us]. How can I solve this problem?
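A quick back-of-the-envelope check in plain Python (independent of Spark) shows why a factor-of-1000 unit mismatch lands in a five-digit year like the one above; the exact wrong year depends on which two units collide, but any 1000x mix-up ends up in this range:

from datetime import datetime, timezone

# epoch microseconds for the sample value 2023-01-01 12:34:56 UTC
ts = datetime(2023, 1, 1, 12, 34, 56, tzinfo=timezone.utc)
micros = int(ts.timestamp() * 1_000_000)  # 1_672_576_496_000_000

# re-reading a count in a unit 1000x coarser (e.g. nanoseconds taken
# as microseconds) puts the instant ~1000x too far in the future
wrong_seconds = micros / 1_000
print(1970 + wrong_seconds / 31_557_600)  # ~54970, same ballpark as 56019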
I searched the PySpark documentation but didn't find any function or setting that directly achieves this. In the end I used cast(to_timestamp(time) as long)/1000 to manually change the time unit from nanosecond to microsecond.
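A minimal sketch of one place that expression can go (the placement is an assumption on my part; only the expression itself is the actual workaround): rewrite the column right after reading the JSON, before the temp view is created:

from pyspark.sql import functions as F

# sketch: apply the workaround expression before the MERGE runs;
# F.expr evaluates the same SQL expression quoted above
df = df.withColumn('time', F.expr('cast(to_timestamp(time) as long) / 1000'))
df.createOrReplaceTempView('upsert_data')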