I have the following streaming app that reads Protobuf messages from a Kafka topic and writes them to a FileSystem Parquet sink:
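```scala
import java.util.concurrent.TimeUnit

import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
// `User` is the Protobuf-generated message class (its import is omitted here).

class ProtoDeserializer extends DeserializationSchema[User] {
  override def getProducedType: TypeInformation[User] = TypeInformation.of(classOf[User])

  // Drop the first 6 bytes of the record value before parsing the Protobuf payload.
  override def deserialize(message: Array[Byte]): User =
    User.parseFrom(message.slice(6, message.length))

  override def isEndOfStream(nextElement: User): Boolean = false
}

object StreamingKafkaProtoToParquetLocalFs {
  private val brokers = "localhost:9092"
  private val topic = "test-topic-proto"
  private val consumerGroupId = "test-consumer-proto"
  private val targetPath = "file:///my/path"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    // Bulk formats such as Parquet only finalize part files when a checkpoint completes.
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1))
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build

    val input: DataStream[User] = env.fromSource(source, WatermarkStrategy.noWatermarks[User], "KafKaTable")

    val sink: StreamingFileSink[User] = StreamingFileSink
      .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
      .build()

    input.addSink(sink)
    env.execute()
  }
}
```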
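The deserializer drops a fixed 6 bytes before parsing. Assuming those bytes are the Confluent Schema Registry Protobuf wire-format header (a `0x00` magic byte, a 4-byte big-endian schema ID, and a single `0` byte meaning "first message type in the schema"), a slightly more defensive variant could validate the header instead of skipping it blindly. This is only a sketch under that assumption; `WireFormat` and `stripHeader` are hypothetical names, not part of Flink or Kafka.

```scala
import java.nio.ByteBuffer

// Hypothetical helper; assumes the Confluent Schema Registry Protobuf wire format.
object WireFormat {
  // byte 0     : magic byte, expected to be 0
  // bytes 1..4 : schema ID, big-endian Int
  // byte 5     : message-index list; a single 0 means the first message type in the schema
  private val HeaderLength = 6

  /** Returns (schemaId, protobufPayload), or throws if the header does not match the assumed layout. */
  def stripHeader(message: Array[Byte]): (Int, Array[Byte]) = {
    require(message.length >= HeaderLength, s"message too short: ${message.length} bytes")
    require(message(0) == 0, s"unexpected magic byte: ${message(0)}")
    val schemaId = ByteBuffer.wrap(message, 1, 4).getInt
    require(message(5) == 0, "only the first message type in the schema is supported")
    (schemaId, message.slice(HeaderLength, message.length))
  }
}
```

Inside `deserialize`, `User.parseFrom(WireFormat.stripHeader(message)._2)` would then replace the fixed `slice(6, ...)`, failing loudly if a record does not carry the expected header.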
When I execute the program, all the output files written to the target path are empty (0 bytes) and still in-progress, even though I enabled checkpointing.
It's important to mention that the topic is not empty: when I change the sink to print(), the messages are printed correctly.
What am I missing? Why do the print and Parquet sinks behave differently?
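It seems that explicitly adding the Apache Parquet Protobuf (parquet-protobuf) dependency resolves the issue. Presumably flink-parquet does not bring parquet-protobuf in transitively, so ParquetProtoWriters cannot create its writer when the sink opens a part file. The task then fails right after the empty in-progress file has been created. print() never touches Parquet, which would explain why it works.
For Maven users, the following dependency was added to the pom.xml: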
```xml
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-protobuf</artifactId>
    <version>1.11.1</version>
</dependency>
```
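To confirm whether parquet-protobuf actually reaches the runtime classpath (for example, the fat JAR or the IDE run configuration), a small check like the following can help. It is a sketch that assumes Flink's ParquetProtoWriters relies on `org.apache.parquet.proto.ProtoWriteSupport` from parquet-protobuf. That matches my understanding but is worth verifying against your Flink version. `ParquetProtoClasspathCheck` is a hypothetical name.

```scala
// Hypothetical diagnostic: reports whether a class from parquet-protobuf can be loaded.
object ParquetProtoClasspathCheck {
  // Assumed to be the parquet-protobuf class that the Protobuf Parquet writer needs.
  val ProtoWriteSupportClass = "org.apache.parquet.proto.ProtoWriteSupport"

  /** True if the named class can be loaded (without initializing it) by this object's class loader. */
  def isOnClasspath(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit =
    println(s"parquet-protobuf on classpath: ${isOnClasspath(ProtoWriteSupportClass)}")
}
```

Run it with the same classpath as the job. Seeing `false` suggests the dependency is missing. For sbt builds, the equivalent of the Maven dependency would be `libraryDependencies += "org.apache.parquet" % "parquet-protobuf" % "1.11.1"`.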