Tags: apache-spark, pyspark, apache-kafka, spark-structured-streaming

Fixed-interval micro-batch and one-time micro-batch trigger modes don't work with Parquet file sink


I'm trying to consume data from a Kafka topic and push the consumed messages to HDFS in Parquet format. I'm using PySpark (2.4.5) to create a Spark Structured Streaming process. The problem is that my Spark job runs endlessly and no data is pushed to HDFS.


process = (
    # connect to the Kafka brokers and read the topic as a stream
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "brokers_list")
    .option("subscribe", "kafka_topic")
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .load()
    # write the stream to HDFS as Parquet files
    .writeStream.format("parquet")
    .trigger(once=True)  # tried with the processingTime argument as well; same result
    .option("path", "hdfs://hadoop.local/draft")
    .option("checkpointLocation", "hdfs://hadoop.local/draft_checkpoint")
    .start()
)

My Spark session's UI looks like this: [Spark UI screenshot]

More details on the stage: [Spark stage details screenshot]

I checked the query status from my notebook and got this:

{
    'message': 'Processing new data',
    'isDataAvailable': True,
    'isTriggerActive': True
}
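
This status dict is exposed by the StreamingQuery handle returned by .start(); a minimal sketch of how it can be polled, assuming process is the handle from the code above:

# poll the running query for its current state and last batch metrics
print(process.status)        # e.g. {'message': 'Processing new data', ...}
print(process.lastProgress)  # metrics of the most recently completed micro-batch, or None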

When I check my folder on HDFS, no data has been loaded. Only a directory named _spark_metadata has been created in the output location.

I don't face this problem if I remove the trigger line trigger(processingTime="1 minute"). With the default trigger mode, Spark creates a lot of small Parquet files in the output location, which is inconvenient. Do the processingTime and once trigger modes support the Parquet file sink? If I have to use the default trigger mode, how can I handle the huge number of tiny files created in my HDFS system?


Solution

  • My problem was configuring my Structured Streaming query with the option startingOffsets=earliest. With this configuration, Spark tries to consume all the data available in the Kafka topic and write it to HDFS in a single micro-batch. This takes a lot of time if the topic is gigantic.

    For handling the problem of too many small files, there are 2 options (see the sketch after this list):

    • configure the streaming query with trigger(processingTime=interval_definition)
    • configure the streaming query with trigger(once=True) and schedule the Spark application to run periodically (I use Airflow to do this)
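
    A minimal sketch of the two options. The broker list, topic, paths, and the 1-minute interval are placeholders taken from the question, and df is assumed to be the streaming DataFrame returned by the spark.readStream...load() call shown above; the two blocks are alternatives, not meant to run together.

    # Option 1: fixed-interval micro-batches -- a new set of Parquet files every minute
    (
        df.writeStream.format("parquet")
        .trigger(processingTime="1 minute")
        .option("path", "hdfs://hadoop.local/draft")
        .option("checkpointLocation", "hdfs://hadoop.local/draft_checkpoint")
        .start()
    )

    # Option 2: one-shot micro-batch -- process everything pending since the last
    # checkpoint, then stop. The whole Spark application is scheduled externally
    # (e.g. as a periodic Airflow task), so each run produces one batch of files.
    (
        df.writeStream.format("parquet")
        .trigger(once=True)
        .option("path", "hdfs://hadoop.local/draft")
        .option("checkpointLocation", "hdfs://hadoop.local/draft_checkpoint")
        .start()
        .awaitTermination()
    )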