Tags: scala, apache-spark, apache-spark-sql, apache-zeppelin

How to obtain a DataFrame from a database table retrieved with JDBC, cut by multiple date ranges, with one date range per row in another DataFrame?


I have a DataFrame with the event start time:

val dailySummariesDfVisualize = dailySummariesDf.orderBy("event_time")
dailySummariesDfVisualize.registerTempTable("raw")

val df = sqlContext.sql("select * from raw")
df.show()

+--------------------+-------------------+-------------+-----------------+---------------+
|          event_type|         event_time|event_payload|coffee_machine_id|digital_twin_id|
+--------------------+-------------------+-------------+-----------------+---------------+
|Large_Cup_Button_...|2021-03-24 07:06:34|         null|       NP20100005|     NP20100005|
|Large_Cup_Button_...|2021-03-24 07:07:41|         null|       NP20100005|     NP20100005|

I'd like each event_time to be the start of a time range and event_time + 1 minute to be its end, with as many time ranges as there are rows in the DataFrame above. I'd then like to query another table and extract into a new DataFrame only the items falling between the start and end of each time range. How can I do that? Is a join the only option here, or can a nested query be useful? Or would a foreach over the DataFrame be the way to go?
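For illustration, building the per-row ranges themselves is straightforward with Spark SQL functions. A minimal sketch, assuming the schema shown above and the 1-minute range length from the question (range_start and range_end are hypothetical column names):

    import org.apache.spark.sql.functions.{col, expr}

    // Hypothetical sketch: derive one [range_start, range_end] pair per
    // event row, where the end is event_time + 1 minute as described above.
    val rangesDf = dailySummariesDf
      .withColumn("range_start", col("event_time"))
      .withColumn("range_end", expr("event_time + interval 1 minute"))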


Solution

  • I was able to resolve this by producing the where clause in Scala from the DataFrame's rows, which are few in number compared to the data I run the extraction query against:

    var collectedString = scala.collection.mutable.MutableList[String]()
    
    // dailySummariesDfVisualize is assumed to carry an end-time column at
    // index 5 (event_time plus the range length); index 1 is event_time.
    for (row <- dailySummariesDfVisualize.collectAsList()) {
      println(row(1))
      val start = row(1)
      val end = row(5)
      val timeSelection = s"(time > '${start}' and time < '${end}')"
      collectedString += timeSelection
    }
    
    val whereClause = collectedString.mkString(" or ")
    println(whereClause)
    
    val dailySensorData =
      getDFFromJdbcSource(
        SparkSession.builder().appName("test").master("local").getOrCreate(),
        s"SELECT * FROM values WHERE " + whereClause + " limit 1000000")
        .persist(StorageLevel.MEMORY_ONLY_SER)
    
    dailySensorData.show(400, false)
    

    It produces exactly the output I needed, with acceptable performance.
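    getDFFromJdbcSource is the author's own helper and its definition is not shown in the post. A minimal sketch of what such a helper might look like, using Spark's standard JDBC reader (the URL and driver below are placeholders, not from the original):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Hypothetical reconstruction: run an arbitrary SQL query against a
    // JDBC source and return the result as a DataFrame (the "query"
    // option is available in Spark 2.4+).
    def getDFFromJdbcSource(spark: SparkSession, query: String): DataFrame =
      spark.read
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/mydb") // placeholder
        .option("driver", "org.postgresql.Driver")              // placeholder
        .option("query", query)
        .load()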

    The formatted whereClause output is something like:

    (time > '2021-03-24 07:06:34.0' and time < '2021-03-24 07:08:34.0') or (time > '2021-03-24 07:07:41.0' and time < '2021-03-24 07:09:41.0') or (time > '2021-03-24 07:07:43.0' and time < '2021-03-24 07:09:43.0')
    

    and so on

    So, just by adding this where clause to

    s"SELECT *  FROM values WHERE "+whereClause+" limit 1000000"
    

    I was able to extract only the needed time ranges from the data in a single query, with the filtering pushed down to the database over JDBC.
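    For reference, the same where clause can also be built without a mutable list. A sketch, assuming the ranges live in a DataFrame with columns range_start and range_end (hypothetical names, matching the sketch in the question):

    // Collect the (small) ranges DataFrame and fold it into one OR-ed clause.
    val whereClause = rangesDf
      .select("range_start", "range_end")
      .collect()
      .map(row => s"(time > '${row.get(0)}' and time < '${row.get(1)}')")
      .mkString(" or ")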