Tags: scala, apache-spark, spark-structured-streaming

How to write a stream to S3 partitioned by the year, month, and day the records were received?


I have a simple stream that reads some data from a Kafka topic:

    import org.apache.spark.sql.functions.from_json
    import spark.implicits._ // for the $"..." column syntax

    val ds = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .load()

    // schema is the StructType of the JSON payload, defined elsewhere
    val df = ds.selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")

I want to store this data in S3 based on the day it's received, so something like:

s3_bucket/year/month/day/data.json

When I want to write the data, I do:

    df.writeStream
      .format("json")
      .outputMode("append")
      .option("path", s3_path)
      .start()

But this way I can only specify a single path. Is there a way to change the S3 path dynamically based on the date?


Solution

  • Use the partitionBy clause (see the fuller sketch after this snippet):

    import org.apache.spark.sql.functions._
    import spark.implicits._ // for the $"..." column syntax

    df.select(
        dayofmonth(current_date()) as "day",
        month(current_date()) as "month",
        year(current_date()) as "year",
        $"*")
      .writeStream
      .partitionBy("year", "month", "day")
      ... // all other options
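
For completeness, here is a minimal end-to-end sketch of the write side. The bucket and checkpoint paths are hypothetical placeholders, and a live SparkSession named spark (with df defined as in the question) is assumed to be in scope. The file sink in Structured Streaming requires a checkpointLocation option, so it is included here:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Hypothetical locations -- substitute your own bucket and checkpoint path.
    val s3Path         = "s3a://s3_bucket/data"
    val checkpointPath = "s3a://s3_bucket/checkpoints/topic1"

    val query = df
      .select(
        year(current_date()) as "year",
        month(current_date()) as "month",
        dayofmonth(current_date()) as "day",
        $"*")
      .writeStream
      .format("json")
      .outputMode("append")
      .partitionBy("year", "month", "day")
      .option("path", s3Path)
      .option("checkpointLocation", checkpointPath) // required by the file sink
      .start()

    query.awaitTermination()

Two things to keep in mind. First, partitionBy writes Hive-style directories, so the layout will be s3_bucket/data/year=2018/month=5/day=22/part-....json rather than the literal year/month/day/data.json from the question. Second, current_date() reflects when the micro-batch is processed; if you want the time each record was produced instead, carry the Kafka source's timestamp column through from ds and derive the partition columns from it (e.g. year($"timestamp")).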