scala hadoop apache-spark-sql apache-spark-1.5

Pull monthly data using Spark Scala


I am trying to pull one month of data from a file and then process it. Basically, I need to pull the data for each month and apply some transformations. Since my job runs daily, I want to take advantage of that and populate the data for the current month up to the run_date.

I have two approaches in mind:

Approach 1:

Populate the data for the previous month only. For example, if my current_date or run_date is in May, I would populate the data for April. This can be achieved by taking the month from current_date() and subtracting 1 from it, something like the following:

df.filter(month(to_date(col("startDate")))===month(to_date(current_date())-1))

This is just an idea. The code won't achieve what I am trying to do, since it subtracts from the month part alone and does not consider the year part.

But in this case, my job would run daily and populate the same data for the whole month, which doesn't make sense.
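
To illustrate the year problem mentioned above, here is a minimal sketch in plain Scala (no Spark needed) of a year-aware "previous month" check. It assumes Java 8+ for `java.time`; the object name `PreviousMonth` and its methods are hypothetical names chosen for this example.

```scala
import java.time.{LocalDate, YearMonth}

// Hypothetical helper, not part of Spark: year-aware previous-month logic.
object PreviousMonth {
  // The month immediately before the month that contains runDate.
  // YearMonth carries the year, so January rolls back to December of the prior year.
  def of(runDate: LocalDate): YearMonth =
    YearMonth.from(runDate).minusMonths(1)

  // True when startDate falls anywhere inside that previous month.
  def contains(runDate: LocalDate, startDate: LocalDate): Boolean =
    YearMonth.from(startDate) == of(runDate)
}
```

In Spark the same idea could be expressed by comparing both `year(...)` and `month(...)` of the column against `add_months(current_date(), -1)`, rather than subtracting 1 from the month number.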

Approach 2:

If my current_date is 2020-05-27, I want to pull the data from 2020-05-01 to 2020-05-26. If my current date is 2020-06-01, it should populate the data for the month of May, that is, from 2020-05-01 to 2020-05-31.

I want to implement Approach 2. The only idea I could think of was to write a couple of CASE statements that check the dates and populate the data accordingly.

Can someone please share some ideas on this? Is there a more straightforward way?

I am using Spark 1.5.


Solution

  • Check if this helps-

    1. Load the testing data

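    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import spark.implicits._ // needed for toDS(); `spark` is the active SparkSession

    // Note: reading CSV from a Dataset[String] requires Spark 2.2 or later.
    val data =
      """
        |2018-04-07 07:07:17
        |2018-04-07 07:32:27
        |2018-04-07 08:36:44
        |2018-04-07 08:38:00
        |2018-04-07 08:39:29
        |2018-04-08 01:43:08
        |2018-04-08 01:43:55
        |2018-04-09 07:52:31
        |2018-04-09 07:52:42
        |2019-01-24 11:52:31
        |2019-01-24 12:52:42
        |2019-01-25 12:52:42
      """.stripMargin
    // Parse each line as a single timestamp column named startDate.
    val df = spark.read
      .schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
      .csv(data.split(System.lineSeparator()).toSeq.toDS())
    df.show(false)
    df.printSchema()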
    

    Output-

    
    +-------------------+
    |startDate          |
    +-------------------+
    |2018-04-07 07:07:17|
    |2018-04-07 07:32:27|
    |2018-04-07 08:36:44|
    |2018-04-07 08:38:00|
    |2018-04-07 08:39:29|
    |2018-04-08 01:43:08|
    |2018-04-08 01:43:55|
    |2018-04-09 07:52:31|
    |2018-04-09 07:52:42|
    |2019-01-24 11:52:31|
    |2019-01-24 12:52:42|
    |2019-01-25 12:52:42|
    +-------------------+
    
    root
     |-- startDate: timestamp (nullable = true)
    

    2. Create Filter Column based on the current date

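    // Builds a boolean filter Column for a run date given as "yyyy-MM-dd".
    val filterCOl = (currentDate: String) =>
      // Run date is the 1st of the month: keep the whole previous month.
      when(datediff(date_format(lit(currentDate), "yyyy-MM-dd"),
          date_format(lit(currentDate), "yyyy-MM-01")) === lit(0),
        // add_months also handles the January -> December rollover.
        date_format(col("startDate"), "yyyy-MM") ===
          date_format(add_months(lit(currentDate), -1), "yyyy-MM")
      // Otherwise: keep rows from the 1st of the current month up to the run date.
      ).otherwise(to_date(col("startDate"))
        .between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))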
    

    3. Check when the current date falls in the middle of a month

    var currentDateStr = "2018-04-08"
    df.filter(filterCOl(currentDateStr)).show(false)
    

    Output-

    +-------------------+
    |startDate          |
    +-------------------+
    |2018-04-07 07:07:17|
    |2018-04-07 07:32:27|
    |2018-04-07 08:36:44|
    |2018-04-07 08:38:00|
    |2018-04-07 08:39:29|
    |2018-04-08 01:43:08|
    |2018-04-08 01:43:55|
    +-------------------+
    
    

    4. Check when the current date is the first day of the month

    currentDateStr = "2018-05-01"
    df.filter(filterCOl(currentDateStr)).show(false)
    

    Output-

    +-------------------+
    |startDate          |
    +-------------------+
    |2018-04-07 07:07:17|
    |2018-04-07 07:32:27|
    |2018-04-07 08:36:44|
    |2018-04-07 08:38:00|
    |2018-04-07 08:39:29|
    |2018-04-08 01:43:08|
    |2018-04-08 01:43:55|
    |2018-04-09 07:52:31|
    |2018-04-09 07:52:42|
    +-------------------+
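
Note that the filter above includes rows from the run date itself, whereas the question asks for data only up to the day before the run date. As a possible simplification, both cases in the question collapse into one rule: take the day before the run date as the end of the window, and the first day of that day's month as the start. The sketch below shows this in plain Scala, assuming Java 8+ for `java.time`; the name `MonthToDate.window` is hypothetical.

```scala
import java.time.LocalDate

// Hypothetical helper, not part of Spark: computes the inclusive date window
// that a daily job should cover for a given run date.
object MonthToDate {
  def window(runDate: LocalDate): (LocalDate, LocalDate) = {
    // End of the window is always the day before the run date.
    val end = runDate.minusDays(1)
    // Start is the first day of the month containing `end`. When the run date
    // is the 1st, `end` lies in the previous month, so the whole previous
    // month is covered, including across a year boundary.
    val start = end.withDayOfMonth(1)
    (start, end)
  }
}
```

The two bounds could then be passed to the DataFrame filter, for example as `to_date(col("startDate")).between(lit(start.toString), lit(end.toString))`, which avoids the `when`/`otherwise` branching.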