I'm trying to consume data from a Kafka topic and push the consumed messages to HDFS in parquet format. I'm using pyspark (2.4.5) to create a Spark Structured Streaming process. The problem is that my Spark job runs endlessly and no data is pushed to HDFS.
process = (
    # connect to the Kafka brokers and subscribe to the topic
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "brokers_list")
    .option("subscribe", "kafka_topic")
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .load()
    # write the consumed messages to HDFS as parquet
    .writeStream.format("parquet")
    .trigger(once=True)  # tried with the processingTime argument and got the same result
    .option("path", "hdfs://hadoop.local/draft")
    .option("checkpointLocation", "hdfs://hadoop.local/draft_checkpoint")
    .start()
)
The Spark UI shows the query as still running. When I check the query status in my notebook, I get this:
{
    'message': 'Processing new data',
    'isDataAvailable': True,
    'isTriggerActive': True
}
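For reference, a status dict like the one above can be read from the query handle that start() returns. A minimal sketch, assuming the process variable from the snippet above (standard PySpark StreamingQuery API, not code from the original post):

# Inspect the running streaming query from a notebook cell
print(process.status)         # the status dict shown above
print(process.lastProgress)   # metrics of the last completed micro-batch, None if none has finished
process.awaitTermination(60)  # block for up to 60 seconds; returns True if the query has stopped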
When I check my folder on HDFS, no data has been loaded. Only a directory named _spark_metadata has been created in the output location.
I don't face this problem if I remove the trigger line trigger(processingTime="1 minute"). When I use the default trigger mode, Spark creates a lot of small parquet files in the output location, which is inconvenient.
Do the two trigger modes, processingTime and once, support the parquet file sink?
If I have to use the default trigger mode, how can I handle the gigantic number of tiny files created in my HDFS system?
My problem was configuring my Structured Streaming query with the option startingOffsets=earliest.
With this configuration, Spark tries to consume all the data available in the Kafka topic and write it to HDFS in a single batch.
This takes a lot of time if the topic is gigantic.
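As an aside, if the backlog created by startingOffsets=earliest is very large, the Kafka source option maxOffsetsPerTrigger can cap how many records each micro-batch reads, so the query drains the topic gradually instead of in one enormous batch. A minimal sketch reusing the broker, topic and path names from the question (the 100000 cap is an arbitrary example; as far as I know, trigger(once=True) ignores this cap in Spark 2.4):

query = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "brokers_list")
    .option("subscribe", "kafka_topic")
    .option("startingOffsets", "earliest")
    # cap each micro-batch at roughly 100k Kafka records
    .option("maxOffsetsPerTrigger", 100000)
    .load()
    .writeStream.format("parquet")
    .option("path", "hdfs://hadoop.local/draft")
    .option("checkpointLocation", "hdfs://hadoop.local/draft_checkpoint")
    .start()
)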
For handling the problem of too many small files, there are two options (see the sketch after this list):
trigger(processingTime=interval_definition)
trigger(once=True), and schedule the Spark application to run on a regular basis (I use Airflow to do this)
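A minimal sketch of how the two options attach to the writer, reusing the broker, topic and path names from the question (the one-minute interval is an arbitrary example):

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "brokers_list")
    .option("subscribe", "kafka_topic")
    .option("startingOffsets", "earliest")
    .load()
)

query = (
    df.writeStream.format("parquet")
    # Option 1: fixed-interval micro-batches -> fewer, larger parquet files per trigger
    .trigger(processingTime="1 minute")
    # Option 2 (alternative): drain whatever is available, then stop, and let an
    # external scheduler such as Airflow start the job again on its own cadence
    # .trigger(once=True)
    .option("path", "hdfs://hadoop.local/draft")
    .option("checkpointLocation", "hdfs://hadoop.local/draft_checkpoint")
    .start()
)
query.awaitTermination()  # with trigger(once=True) this returns after the single batch commits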