I am doing structured streaming using SparkSession.readStream
and writing it to a Hive table, but it does not seem to let me create time-based micro-batches, i.e. I need a batch of 5 seconds: all the messages arriving within those 5 seconds should form one batch, and that batch of data should get written to the Hive table.
Right now it reads the messages as and when they are posted to the Kafka topic, and each message becomes one record in the table.
Working Code
from pyspark.sql.functions import col, json_tuple

def hive_write_batch_data(data, batchId):
    data.write.format("parquet").mode("append").saveAsTable("test.my_table")
kafka_config = {
    "checkpointLocation": "/user/aiman/temp/checkpoint",
    "kafka.bootstrap.servers": "kafka.bootstrap.server.com:9093",
    "subscribe": "TEST_TOPIC",
    "startingOffsets": offsetValue,
    "kafka.security.protocol": "SSL",
    "kafka.ssl.keystore.location": "kafka.keystore.uat.jks",
    "kafka.ssl.keystore.password": "abcd123",
    "kafka.ssl.key.password": "abcd123",
    "kafka.ssl.truststore.type": "JKS",
    "kafka.ssl.truststore.location": "kafka.truststore.uat.jks",
    "kafka.ssl.truststore.password": "abdc123",
    "kafka.ssl.enabled.protocols": "TLSv1",
    "kafka.ssl.endpoint.identification.algorithm": ""
}
df = spark.readStream \
    .format("kafka") \
    .options(**kafka_config) \
    .load()

data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "offset", "timestamp", "partition")

data_new = data.select(col("offset"), col("partition"), col("key"),
                       json_tuple(col("value"), "product_code", "rec_time")) \
    .toDF("offset", "partition", "key", "product_code", "rec_time")
data_new.writeStream \
    .foreachBatch(hive_write_batch_data) \
    .start() \
    .awaitTermination()
Problem Statement
Since each message is being treated as one record in the Hive table, a separate parquet file is created for every record, which can trigger Hive's small-file problem.
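Independently of the batch interval, the number of parquet files each micro-batch writes can also be reduced inside the foreachBatch handler itself. A minimal sketch, reusing the handler above and assuming that one file per batch is acceptable:

def hive_write_batch_data(data, batchId):
    # coalesce to a single partition so each micro-batch writes one parquet file
    # (fine for small 5-second batches; use more partitions for larger batches)
    data.coalesce(1) \
        .write.format("parquet") \
        .mode("append") \
        .saveAsTable("test.my_table")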
I need to create a time-based batch so that multiple records get inserted into the Hive table in one batch. The only time-based support I found is in KafkaUtils, via ssc = StreamingContext(sc, 5), but that API does not create DataFrames.
How should I use KafkaUtils to create batches that are read into DataFrames?
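For completeness, the legacy DStream route the question refers to would look roughly like the sketch below. This is only an illustration: the pyspark.streaming.kafka module (KafkaUtils) targets the old Kafka 0.8 connector and was removed in Spark 3.x, the broker/SSL settings from kafka_config above would have to be translated into its kafkaParams format, and DataFrames have to be built manually inside foreachRDD.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # legacy module, removed in Spark 3.x

ssc = StreamingContext(sc, 5)  # 5-second batch interval

# placeholder kafkaParams: the SSL options used above would need translating,
# and may not be supported by this old connector
stream = KafkaUtils.createDirectStream(
    ssc, ["TEST_TOPIC"], {"metadata.broker.list": "kafka.bootstrap.server.com:9093"})

def write_batch(time, rdd):
    if not rdd.isEmpty():
        # records arrive as (key, value) string pairs; build a DataFrame manually
        df = spark.createDataFrame(rdd, ["key", "value"])
        df.write.format("parquet").mode("append").saveAsTable("test.my_table")

stream.foreachRDD(write_batch)
ssc.start()
ssc.awaitTermination()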
Solution
Adding a trigger worked. I added a processing-time trigger to the stream writer:
data_new.writeStream \
    .trigger(processingTime="5 seconds") \
    .foreachBatch(hive_write_batch_data) \
    .start() \
    .awaitTermination()
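One related detail: in Structured Streaming, checkpointLocation is an option of the stream writer, not of the Kafka source, so it is safer to set it on writeStream rather than in kafka_config. A sketch, reusing the checkpoint path from above:

query = (
    data_new.writeStream
    # checkpointLocation belongs on the writer in Structured Streaming
    .option("checkpointLocation", "/user/aiman/temp/checkpoint")
    .trigger(processingTime="5 seconds")
    .foreachBatch(hive_write_batch_data)
    .start()
)
query.awaitTermination()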
Found the article here