I am using PySpark to read data from a Kafka topic into a streaming DataFrame as follows:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder \
    .appName("Spark Structured Streaming from Kafka") \
    .getOrCreate()

# json_schema is the StructType describing the JSON payload (defined elsewhere)
sdf = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load() \
    .select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))

sdf_ = sdf.select("parsed_value.*")
My goal is to write each row of sdf_ as a separate JSON file.
The following code:
writing_sink = sdf_.writeStream \
    .format("json") \
    .option("path", "/Desktop/...") \
    .option("checkpointLocation", "/Desktop/...") \
    .start()

writing_sink.awaitTermination()
will write several rows of the DataFrame into the same JSON file, depending on the size of the micro-batch (or so I hypothesize, at least). What I need is to tweak the above so that each row of the DataFrame is written to a separate JSON file.
I have also tried using partitionBy('column') (roughly as sketched below), but this still does not do exactly what I need: it creates folders, and the JSON files inside them may still contain multiple rows (if they share the same id).
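For reference, the partitionBy attempt looked roughly like this; a minimal sketch, where "id" stands in for whatever partition column was used and the paths are placeholders:

# Sketch of the partitionBy attempt: output lands in one subfolder per id value,
# but a single file inside a folder can still hold several rows with that id.
writing_sink = sdf_.writeStream \
    .format("json") \
    .option("path", "/Desktop/...") \
    .option("checkpointLocation", "/Desktop/...") \
    .partitionBy("id") \
    .start()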
Any ideas that could help out here? Thanks in advance.
Found out that the following option does the trick:
.option("maxRecordsPerFile", 1)