I'm using Apache Parquet Hadoop - ParquetRecordWriter with MapReduce and hit ParquetEncodingException: writing empty page
. Despite I found, that this is happening in ColumnWriterBase when the valueCount is 0, I don't undrestand the real reason why this property is 0, why it has something with Endoding and how can such state happened? Any idea? Thanks for any tip.
Error: org.apache.parquet.io.ParquetEncodingException: writing empty page
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:309)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
Version: org.apache.parquet:parquet-hadoop:1.11.0
I'm using my own WriteSupport class:
public class MyDataWriteSupport extends WriteSupport<MyData> {
private RecordConsumer recordConsumer;
public MyDataWriteSupport() {}
@Override
public WriteContext init(Configuration configuration) {
Map<String, String> metaData = new HashMap<>();
return new WriteContext(getSchema(), metaData);
}
public static MessageType getSchema() {
return MessageTypeParser.parseMessageType(
" message MyData { "
+ "optional binary key (UTF8);"
+ "optional int64 length;"
+ "repeated int32 datarray;"
+ "repeated group myobj {\n"
+ " optional int32 id;"
+ " optional binary title (UTF8);"
+ "}"
+ " }");
}
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}
@Override
public void write(MyData record) {
recordConsumer.startMessage();
writeData(record);
recordConsumer.endMessage();
}
private void writeData(MyData record) {
recordConsumer.startMessage();
addStringValue(recordConsumer, 0, "key", record.getKey());
addLongValue(recordConsumer, 1, "length", record.getLength());
addIntegerArrayValues(recordConsumer, 2, "datarray", record.getDataArray());
if (!record.getMyObjects().isEmpty()) {
recordConsumer.startField("myobj", 3);
record
.getMyObject()
.forEach(
obj -> {
recordConsumer.startGroup();
addIntValue(recordConsumer, 0, "id", obj.id);
addStringValue(recordConsumer, 1, "title", obj.title);
recordConsumer.endGroup();
});
recordConsumer.endField("myobj", 3);
}
recordConsumer.endMessage();
}
private void addIntValue(RecordConsumer recordConsumer, int index, String fieldName, int value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addInteger(value);
recordConsumer.endField(fieldName, index);
}
private static void addIntegerArrayValues(
RecordConsumer recordConsumer, int index, String fieldName, int[] is) {
if (is.length > 0) {
recordConsumer.startField(fieldName, index);
Arrays.stream(is).forEach(labelIndex -> recordConsumer.addInteger(labelIndex));
recordConsumer.endField(fieldName, index);
}
}
private static void addLongValue(
RecordConsumer recordConsumer, int index, String fieldName, long value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addLong(value);
recordConsumer.endField(fieldName, index);
}
private static void addStringValue(
RecordConsumer recordConsumer, int index, String fieldName, String value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addBinary(Binary.fromString(value));
recordConsumer.endField(fieldName, index);
}
}
I think the problem is with the start/end calls. One issue is that startMessage()
and endMessage()
are invoked twice, once in write(MyData)
and again in writeData(MyData)
.
I would suggest using the ValidatingRecordConsumer
as a wrapper for the recordConsumer you use. This way you may get more meaningful exceptions if something is wrong with the record serialization.