protocol-buffers apache-flink flink-streaming

Flink streaming app produces many in-progress, empty output files


I have the following streaming app that reads Protobuf messages from a Kafka topic and writes them to a Parquet FileSystem sink:

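import java.util.concurrent.TimeUnit

import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
// User is the protobuf-generated message class; its package is not shown in the question.

class ProtoDeserializer extends DeserializationSchema[User] {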

  override def getProducedType: TypeInformation[User] = TypeInformation.of(classOf[User])

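  override def deserialize(message: Array[Byte]): User =
    // Drop the first 6 bytes before parsing. Presumably this is a producer-side header
    // (e.g. the Confluent wire format: magic byte, 4-byte schema ID, message-index byte);
    // that interpretation is an assumption, not something stated in the question.
    User.parseFrom(message.slice(6, message.length))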

  override def isEndOfStream(nextElement: User): Boolean = false
}

object StreamingKafkaProtoToParquetLocalFs {

  private val brokers = "localhost:9092"
  private val topic = "test-topic-proto"
  private val consumerGroupId = "test-consumer-proto"
  private val targetPath = "file:///my/path"


  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1))
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build

    val input: DataStream[User] = env.fromSource(source, WatermarkStrategy.noWatermarks[User], "KafKaTable")
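    // Bulk formats such as Parquet can only roll part files on checkpoint, so a part file is
    // finalized (moved from in-progress to finished) only after a checkpoint completes.
    val sink: StreamingFileSink[User] = StreamingFileSink
      .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
      .build()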
    input.addSink(sink)
    env.execute()
  }
}

When I run the program, every output file written to the target path is empty (0 bytes) and stays in-progress, even though checkpointing is enabled.

It's important to mention that the topic is not empty, and when I replace the sink with print() the messages are printed correctly.

What am I missing? Why do the print sink and the Parquet sink behave differently?
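To see which state the output is actually in, a small standalone helper can walk the sink's data directory and count part files by name. This is a sketch, not part of the original app: the object name PartFileReport is hypothetical, and it assumes StreamingFileSink's default naming, where part files that have not been committed yet carry ".inprogress" in their (hidden) file name and committed ones are plain "part-..." files. Because a bulk format such as Parquet only finalizes part files when a checkpoint completes, a healthy job should show a non-zero finished count after a few checkpoint intervals.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.mutable.ListBuffer

// Hypothetical diagnostic helper (not part of the original app):
// summarizes the part files a StreamingFileSink left in a directory.
object PartFileReport {

  final case class Report(inProgress: Int, emptyInProgress: Int, finished: Int)

  def scan(dir: Path): Report = {
    val files = ListBuffer.empty[(String, Long)]
    val stream = Files.walk(dir) // recursive, so bucket sub-directories are included
    try {
      val it = stream.iterator()
      while (it.hasNext) {
        val p = it.next()
        if (Files.isRegularFile(p)) files += ((p.getFileName.toString, Files.size(p)))
      }
    } finally stream.close()

    // Assumption: uncommitted parts contain ".inprogress"; committed parts start with "part-".
    val inProgress = files.filter { case (name, _) => name.contains(".inprogress") }
    val finished = files.filter { case (name, _) =>
      name.startsWith("part-") && !name.contains(".inprogress")
    }
    Report(inProgress.size, inProgress.count { case (_, size) => size == 0L }, finished.size)
  }

  def main(args: Array[String]): Unit = {
    // Defaults to the data directory from the question; pass another path as the first argument.
    val dir = Paths.get(args.headOption.getOrElse("/my/path/data"))
    println(scan(dir))
  }
}
```

In the situation described above, this would report only in-progress files, all of them empty, and zero finished ones.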


Solution

  • It seems that explicitly adding the Apache Parquet Protobuf dependency (parquet-protobuf) resolves the issue.

    For Maven users, add the following dependency to pom.xml:

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-protobuf</artifactId>
        <version>1.11.1</version>
    </dependency>
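To confirm that the dependency is actually visible at runtime, a quick check can try to load a class from parquet-protobuf with the same classpath the job uses. This is a hedged sketch: the object name ParquetProtobufCheck is hypothetical, and probing org.apache.parquet.proto.ProtoWriteSupport assumes that class ships in the parquet-protobuf artifact (it is the write support that Flink's ParquetProtoWriters builds on).

```scala
// Hypothetical classpath probe (not part of the original answer).
object ParquetProtobufCheck {

  /** Returns true if `className` can be loaded (without initializing it) by this class loader. */
  def isOnClasspath(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      // ClassNotFoundException: class absent; LinkageError (e.g. NoClassDefFoundError): a dependency of it is absent.
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit = {
    // Assumed probe class from the parquet-protobuf artifact.
    val probe = "org.apache.parquet.proto.ProtoWriteSupport"
    println(s"$probe on classpath: ${isOnClasspath(probe)}")
  }
}
```

Run with the same classpath as the local Flink job (for example, the IDE run configuration used for createLocalEnvironmentWithWebUI), it should print true once the dependency above is added.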