Tags: java, hadoop, parquet

Parquet writer: org.apache.parquet.io.ParquetEncodingException: writing empty page


I'm using Apache Parquet Hadoop's ParquetRecordWriter with MapReduce and hit ParquetEncodingException: writing empty page. I found that the exception is thrown in ColumnWriterBase when valueCount is 0, but I don't understand why that counter ends up being 0, what it has to do with encoding, or how such a state can occur. Any idea? Thanks for any tip.

Error: org.apache.parquet.io.ParquetEncodingException: writing empty page
    at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:309)
    at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)

Version: org.apache.parquet:parquet-hadoop:1.11.0

I'm using my own WriteSupport class:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class MyDataWriteSupport extends WriteSupport<MyData> {

  private RecordConsumer recordConsumer;

  public MyDataWriteSupport() {}

  @Override
  public WriteContext init(Configuration configuration) {
    Map<String, String> metaData = new HashMap<>();
    return new WriteContext(getSchema(), metaData);
  }

  public static MessageType getSchema() {
    return MessageTypeParser.parseMessageType(
        " message MyData { "
            + "optional binary key (UTF8);"
            + "optional int64 length;"
            + "repeated int32 datarray;"
            + "repeated group myobj {\n"
            + "   optional int32 id;"
            + "   optional binary title (UTF8);"
            + "}"
            + " }");
  }

  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  @Override
  public void write(MyData record) {
    recordConsumer.startMessage();
    writeData(record);
    recordConsumer.endMessage();
  }

  private void writeData(MyData record) {
    recordConsumer.startMessage();

    addStringValue(recordConsumer, 0, "key", record.getKey());
    addLongValue(recordConsumer, 1, "length", record.getLength());
    addIntegerArrayValues(recordConsumer, 2, "datarray", record.getDataArray());

    if (!record.getMyObjects().isEmpty()) {
      recordConsumer.startField("myobj", 3);
      record
          .getMyObjects()
          .forEach(
              obj -> {
                recordConsumer.startGroup();
                addIntValue(recordConsumer, 0, "id", obj.id);
                addStringValue(recordConsumer, 1, "title", obj.title);
                recordConsumer.endGroup();
              });
      recordConsumer.endField("myobj", 3);
    }
    recordConsumer.endMessage();
  }

  private void addIntValue(RecordConsumer recordConsumer, int index, String fieldName, int value) {
    recordConsumer.startField(fieldName, index);
    recordConsumer.addInteger(value);
    recordConsumer.endField(fieldName, index);
  }

  private static void addIntegerArrayValues(
      RecordConsumer recordConsumer, int index, String fieldName, int[] is) {
    if (is.length > 0) {
      recordConsumer.startField(fieldName, index);
      Arrays.stream(is).forEach(labelIndex -> recordConsumer.addInteger(labelIndex));
      recordConsumer.endField(fieldName, index);
    }
  }

  private static void addLongValue(
      RecordConsumer recordConsumer, int index, String fieldName, long value) {
    recordConsumer.startField(fieldName, index);
    recordConsumer.addLong(value);
    recordConsumer.endField(fieldName, index);
  }

  private static void addStringValue(
      RecordConsumer recordConsumer, int index, String fieldName, String value) {
    recordConsumer.startField(fieldName, index);
    recordConsumer.addBinary(Binary.fromString(value));
    recordConsumer.endField(fieldName, index);
  }
}

Solution

  • I think the problem is with the start/end calls. One issue is that startMessage() and endMessage() are invoked twice: once in write(MyData) and again in writeData(MyData). I would also suggest wrapping the recordConsumer you use in a ValidatingRecordConsumer; that way you get more meaningful exceptions if something is wrong with the record serialization. A sketch of both changes is shown below.
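
    As an illustration only, here is a minimal sketch based on the code in the question (MyData, its getters, and the add*Value helpers are the asker's own): write/writeData with the duplicate startMessage()/endMessage() pair removed, and prepareForWrite wrapping the consumer in org.apache.parquet.io.ValidatingRecordConsumer.

    import org.apache.parquet.io.ValidatingRecordConsumer;

    @Override
    public void prepareForWrite(RecordConsumer recordConsumer) {
      // The wrapper validates every startField/startGroup/add* call against the schema
      // and fails with a clearer message when the record structure is inconsistent.
      this.recordConsumer = new ValidatingRecordConsumer(recordConsumer, getSchema());
    }

    @Override
    public void write(MyData record) {
      // Exactly one startMessage()/endMessage() pair per record.
      recordConsumer.startMessage();
      writeData(record);
      recordConsumer.endMessage();
    }

    private void writeData(MyData record) {
      // No startMessage()/endMessage() here any more, only field-level calls.
      addStringValue(recordConsumer, 0, "key", record.getKey());
      addLongValue(recordConsumer, 1, "length", record.getLength());
      addIntegerArrayValues(recordConsumer, 2, "datarray", record.getDataArray());

      if (!record.getMyObjects().isEmpty()) {
        recordConsumer.startField("myobj", 3);
        record
            .getMyObjects()
            .forEach(
                obj -> {
                  recordConsumer.startGroup();
                  addIntValue(recordConsumer, 0, "id", obj.id);
                  addStringValue(recordConsumer, 1, "title", obj.title);
                  recordConsumer.endGroup();
                });
        recordConsumer.endField("myobj", 3);
      }
    }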