mongodb, pyspark, databricks, azure-databricks

Data type is not changing in MongoDB via Databricks PySpark (from string to date)


I am trying to load multiple documents into a MongoDB collection using Databricks PySpark, and while loading I am populating an updateDate field as well. However, after loading I can see that the updateDate field's data type is string instead of date.

Here is the code I am using to build the timestamp:

import datetime

current_timestamp_utc = datetime.datetime.now(datetime.timezone.utc)
formatted_timestamp = current_timestamp_utc.strftime("%Y-%m-%dT%H:%M:%S")
timezone_offset = current_timestamp_utc.strftime("%z")
formatted_timestamp = formatted_timestamp + ".000" + timezone_offset[:-2] + ":" + timezone_offset[-2:]

print(formatted_timestamp)

Result: 2024-04-03T07:33:52.000+00:00

As per the result it looks fine, but after loading into MongoDB it is displayed as String instead of Date.

So could you please help me with how to load the documents with a date data type? I have used the updateMany() method to change the field from string to date, but is this the right approach to proceed, or is there any I/O or performance impact when using updateMany()? Please suggest.
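For reference, the post-load conversion I have been running with updateMany() looks roughly like this (a pymongo sketch; the connection string and database/collection names are placeholders):

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
coll = client["mydb"]["mycollection"]              # placeholder database/collection

# Pipeline-style update (MongoDB 4.2+): convert the string field to a BSON date in place
coll.update_many(
    {"updateDate": {"$type": "string"}},
    [{"$set": {"updateDate": {"$toDate": "$updateDate"}}}]
)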


Solution

  • You can get the current time directly using Spark SQL date-time functions as follows:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import date_format, current_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    spark.sql("""select date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx") as updateDate""").show(truncate=False)
    
    Output:
    +-----------------------------+
    |updateDate                   |
    +-----------------------------+
    |2024-04-04T09:04:35.865+00:00|
    +-----------------------------+
    
    Schema:
    root
     |-- updateDate: string (nullable = false)
    

    As you can see in the schema, updateDate is a string, and you can convert it into a timestamp using to_timestamp() as follows:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import date_format, current_timestamp, to_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    spark.sql("""select to_timestamp(date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx")) as updateDate""").show(truncate=False)
    
    Output:
    +-----------------------+
    |updateDate             |
    +-----------------------+
    |2024-04-04 09:04:12.703|
    +-----------------------+
    
    Schema:
    root
     |-- updateDate: timestamp (nullable = true)
    

    And now the updateDate is a timestamp adjusted for your Spark session's timezone (that's why the +00:00 offset is gone now) - which, by the way, you can update using spark.conf.set("spark.sql.session.timeZone", "<enter-timezone-here>").
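    For example, to see how the session timezone affects the displayed value, you can try something like this (a quick sketch; "Asia/Kolkata" is just an example timezone):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import current_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    # Change the session timezone used for displaying timestamps (example value)
    spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
    
    # The same current_timestamp() now shows up shifted to the configured timezone
    spark.range(1).select(current_timestamp().alias("updateDate")).show(truncate=False)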

    And if you want to add it to an existing DataFrame as a new column, you can do something like this:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import date_format, current_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    df = spark.createDataFrame([(1,), (2,)], ["rownum"]) # replace this with your dataframe
    
    df = df.withColumn("updateDate", date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx").cast("timestamp"))
    
    df.show(truncate=False)
    df.printSchema()
    
    Output:
    +------+-----------------------+
    |rownum|updateDate             |
    +------+-----------------------+
    |1     |2024-04-04 09:04:48.473|
    |2     |2024-04-04 09:04:48.473|
    +------+-----------------------+
    
    Schema:
    root
     |-- rownum: long (nullable = true)
     |-- updateDate: timestamp (nullable = true)
    

    The data type you are looking for is a timestamp with a timezone in Spark. Now, you can try loading the dataset into MongoDB with this schema.
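    As a rough sketch of the final write into MongoDB (assuming the MongoDB Spark Connector 10.x, where the data source name is "mongodb"; the connection URI, database, and collection names below are placeholders):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import current_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    df = spark.createDataFrame([(1,), (2,)], ["rownum"])   # replace this with your dataframe
    df = df.withColumn("updateDate", current_timestamp())  # timestamp column
    
    # Because updateDate is a timestamp column, the connector should write it as a BSON Date,
    # so no updateMany() conversion should be needed afterwards.
    (df.write
        .format("mongodb")
        .option("connection.uri", "mongodb://localhost:27017")  # placeholder URI
        .option("database", "mydb")                             # placeholder database
        .option("collection", "mycollection")                   # placeholder collection
        .mode("append")
        .save())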