
How to read custom formatted dates as timestamp in pyspark


I want to use spark.read() to pull data from a .csv file while enforcing a schema. However, I can't get Spark to recognize my dates as timestamps.

First I create a dummy file to test with

%scala
Seq("1|1/15/2019 2:24:00 AM","2|test","3|").toDF().write.text("/tmp/input/csvDateReadTest")

Then I try to read it and provide a dateFormat string, but it doesn't recognize my dates and sends those records to the badRecordsPath:

df = (spark.read.format('csv')
                .schema("id int, dt timestamp")
                .option("delimiter","|")
                .option("badRecordsPath","/tmp/badRecordsPath")
                .option("dateFormat","M/dd/yyyy hh:mm:ss aaa")
                .load("/tmp/input/csvDateReadTest"))

As a result, I get just 1 record in df (ID 3), when I'm expecting to see 2 (IDs 1 and 3).

df.show()

+---+----+
| id|  dt|
+---+----+
|  3|null|
+---+----+



Solution

  • You must change dateFormat to timestampFormat, since in your case you need a timestamp type and not a date. Additionally, the timestamp format should be MM/dd/yyyy h:mm:ss a (uppercase MM is the month; lowercase mm means minutes).

    Sample data:

    Seq(
    "1|1/15/2019 2:24:00 AM",
    "2|test",
    "3|5/30/1981 3:11:00 PM"
    ).toDF().write.text("/tmp/input/csvDateReadTest")
    

    With the changes for the timestamp:

    val df = spark.read.format("csv")
                   .schema("id int, dt timestamp")
                   .option("delimiter","|")
                   .option("badRecordsPath","/tmp/badRecordsPath")
                   .option("timestampFormat","mm/dd/yyyy h:mm:ss a")
                   .load("/tmp/input/csvDateReadTest")
    

    And the output:

    +----+-------------------+
    |  id|                 dt|
    +----+-------------------+
    |   1|2019-01-15 02:24:00|
    |   3|1981-05-30 15:11:00|
    |null|               null|
    +----+-------------------+
    

    Note that the record with id 2 did not comply with the schema definition, so it contains null. If you also want to keep the invalid records, you need to change the timestamp column to string (a sketch of that read follows the table below); the output in this case will be:

    +---+--------------------+
    | id|                  dt|
    +---+--------------------+
    |  1|1/15/2019 2:24:00 AM|
    |  3|5/30/1981 3:11:00 PM|
    |  2|                test|
    +---+--------------------+
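
    For completeness, a minimal PySpark sketch of that string-typed read (same options as above; only the schema changes):

    df = (spark.read.format("csv")
          .schema("id int, dt string")
          .option("delimiter", "|")
          .option("badRecordsPath", "/tmp/badRecordsPath")
          .load("/tmp/input/csvDateReadTest"))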
    

    UPDATE:

    In order to change the string dt into a timestamp type you could try df.withColumn("dt", $"dt".cast("timestamp")), but the plain cast only understands the default timestamp format, so it would replace all of these values with null.

    You can achieve this with the following code:

    import org.apache.spark.sql.Row
    import java.text.SimpleDateFormat
    import java.util.{Date, Locale} 
    import java.sql.Timestamp
    import scala.util.{Try, Success, Failure}
    
    val formatter = new SimpleDateFormat("MM/dd/yyyy h:mm:ss a", Locale.US)
    df.map{ case Row(id:Int, dt:String) =>
        val tryParse = Try[Date](formatter.parse(dt))
    
        val p_timestamp = tryParse match {
            case Success(parsed) => new Timestamp(parsed.getTime())
            case Failure(_) => null
        }
    
        (id, p_timestamp)
    }.toDF("id", "dt").show
    

    Output:

    +---+-------------------+
    | id|                 dt|
    +---+-------------------+
    |  1|2019-01-15 02:24:00|
    |  3|1981-05-30 15:11:00|
    |  2|               null|
    +---+-------------------+
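
    If you prefer to stay in PySpark for this last step, the built-in to_timestamp function with an explicit pattern behaves similarly and returns null for values it cannot parse. A sketch, assuming the string-typed df from the read above (single-letter M and d accept one- or two-digit values, which keeps the pattern robust across Spark versions):

    from pyspark.sql.functions import col, to_timestamp

    # parseable strings become timestamps; "test" becomes null
    df.withColumn("dt", to_timestamp(col("dt"), "M/d/yyyy h:mm:ss a")).show()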