Tags: apache-spark, hadoop, apache-kafka, spark-streaming, spark-streaming-kafka

Avoid writing files for empty partitions in Spark Streaming


I have a Spark Streaming job which reads data from Kafka partitions (one executor per partition).
I need to save the transformed values to HDFS, but want to avoid creating empty files.
I tried using isEmpty, but it doesn't help when only some of the partitions are empty.

P.S. repartition is not an acceptable solution due to performance degradation.
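
For context, the isEmpty attempt typically looks like the following (a minimal sketch; `stream`, `transform` and `basePath` are assumed names, not from the original job):

    stream.foreachRDD { rdd =>
      val transformed = rdd.map(transform)
      // isEmpty only skips the write when the *whole* batch is empty;
      // if just some Kafka partitions had no data, the save below still
      // creates one empty part-xxxxx file per empty partition.
      if (!transformed.isEmpty()) {
        transformed.saveAsTextFile(s"$basePath/${System.currentTimeMillis()}")
      }
    }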


Solution

  • The code below works for PairRDDs only, since saveAsNewAPIHadoopFile is only available on pair RDDs.

    Code for text:

      import org.apache.hadoop.io.{NullWritable, Text}
      import org.apache.hadoop.mapreduce.OutputFormat
      import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, TextOutputFormat}

      val conf = ssc.sparkContext.hadoopConfiguration
      // Tell LazyOutputFormat which real output format to wrap; output files are
      // only created once the first record is actually written.
      conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
        classOf[TextOutputFormat[Text, NullWritable]],
        classOf[OutputFormat[Text, NullWritable]])

      kafkaRdd.map(_.value -> NullWritable.get)
        .saveAsNewAPIHadoopFile(basePath,
          classOf[Text],
          classOf[NullWritable],
          classOf[LazyOutputFormat[Text, NullWritable]],
          conf)
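
    In a streaming job, the save typically runs inside foreachRDD with a per-batch output path (a sketch; `stream` and the batch-time path are assumptions, not from the original answer):

      stream.foreachRDD { (kafkaRdd, batchTime) =>
        // one output directory per micro-batch; empty partitions add no files
        val basePath = s"/data/events/${batchTime.milliseconds}"
        kafkaRdd.map(_.value -> NullWritable.get)
          .saveAsNewAPIHadoopFile(basePath,
            classOf[Text],
            classOf[NullWritable],
            classOf[LazyOutputFormat[Text, NullWritable]],
            conf)
      }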
    

    Code for Avro:

      import org.apache.avro.mapred.AvroKey
      import org.apache.avro.mapreduce.AvroKeyOutputFormat
      import org.apache.hadoop.io.NullWritable
      import org.apache.hadoop.mapreduce.OutputFormat
      import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
      import org.apache.spark.rdd.RDD

      val avro: RDD[(AvroKey[MyEvent], NullWritable)] = ....
      val conf = ssc.sparkContext.hadoopConfiguration

      // Writer schema for AvroKeyOutputFormat, plus the wrapped output format
      // for LazyOutputFormat, as above.
      conf.set("avro.schema.output.key", MyEvent.SCHEMA$.toString)
      conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
        classOf[AvroKeyOutputFormat[MyEvent]],
        classOf[OutputFormat[AvroKey[MyEvent], NullWritable]])

      avro.saveAsNewAPIHadoopFile(basePath,
        classOf[AvroKey[MyEvent]],
        classOf[NullWritable],
        classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
        conf)
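
    One way the `avro` pair RDD above might be produced from the Kafka records (a sketch; `parseEvent` is a hypothetical String => MyEvent deserializer, not part of the original answer):

      val avro: RDD[(AvroKey[MyEvent], NullWritable)] =
        kafkaRdd.map(record => new AvroKey(parseEvent(record.value)) -> NullWritable.get)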