databricks, azure-databricks, spark-structured-streaming, delta-lake

Create a partition column from Structured Streaming JSON data


I'm new to Structured Streaming and would like to create a partition column based on the date column of a JSON message.

Here are some sample messages:

{"date": "2022-03-01", "code": "1000310014", "no": "20191362283860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}

{"date": "2022-03-01", "code": "2000310014", "no": "300191362283860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}

{"date": "2022-03-01", "code": "30002220014", "no": "20191333383860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}

val date = event.select(col("date"))

val stream = flatten_messages
      .writeStream
      .partitionBy(date)
      .format("delta")
      .outputMode("append")
      .start(output_path)

Is this the right way to partition on a field of the JSON message?


Solution

  • No, partitionBy takes column names (strings), not a DataFrame. So the code would be just:

    val stream = flatten_messages
          .writeStream
          .partitionBy("date")
          .format("delta")
          .outputMode("append")
          .start(output_path)
    

    But the first question is: do you really need to partition the data? With Delta it may not be strictly required, because features such as data skipping and Z-ordering (ZORDER BY) already reduce the amount of data that has to be read.

    P.S. You may also want to cast the date column to a date type: it will then be stored more efficiently on disk and will support range queries, etc. This is independent of partitioning, though.
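
As a minimal sketch of the date cast mentioned in the P.S., assuming the date field always arrives as a yyyy-MM-dd string like in the sample messages: the object name CastDateExample and the helper withTypedDate are made up for illustration, and the small in-memory DataFrame only stands in for the real stream so the snippet can run locally.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, to_date}

object CastDateExample {
  // Hypothetical helper: replaces the string "date" column with a DateType column.
  // Assumes the incoming values use the yyyy-MM-dd pattern seen in the sample messages.
  def withTypedDate(df: DataFrame): DataFrame =
    df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("cast-date-example")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the flattened messages; a streaming DataFrame is transformed the same way.
    val sample = Seq(
      ("2022-03-01", "1000310014", "20191362283860", "UP")
    ).toDF("date", "code", "no", "update_type_cd")

    val typed = withTypedDate(sample)
    typed.printSchema() // the "date" column is now of type date instead of string

    spark.stop()
  }
}
```

In the streaming job the same helper would be applied before the write, i.e. withTypedDate(flatten_messages).writeStream.partitionBy("date") followed by the rest of the original chain. If you do keep the partitioning, the partition column is then a real date rather than a string.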