Tags: hadoop, hbase, apache-phoenix, bulk-load

Why does HBase KeyValueSortReducer need to sort all KeyValues?


I have been learning about Phoenix CSV bulk load recently, and I found that org.apache.phoenix.mapreduce.CsvToKeyValueReducer causes an OOM (Java heap out of memory) when a single row contains many columns (in my case, 44 columns per row, with an average row size of 4 KB).

What's more, this class is similar to the HBase bulk load reducer class, KeyValueSortReducer, which means the same OOM may happen when using KeyValueSortReducer in my case.

So I have a question about KeyValueSortReducer: why does it need to sort all KeyValues in a TreeSet first and only then write them to the context? If I remove the TreeSet sorting code and write all KeyValues directly to the context, will the result be different or wrong?

I am looking forward to your reply. Best wishes!

Here is the source code of KeyValueSortReducer:

public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
      org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
  throws java.io.IOException, InterruptedException {
    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
    for (KeyValue kv: kvs) {
      try {
        map.add(kv.clone());
      } catch (CloneNotSupportedException e) {
        throw new java.io.IOException(e);
      }
    }
    context.setStatus("Read " + map.getClass());
    int index = 0;
    for (KeyValue kv: map) {
      context.write(row, kv);
      if (++index % 100 == 0) context.setStatus("Wrote " + index);
    }
  }
}

Solution

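  • Please have a look at this case study. Some requirements call for the KeyValue pairs within the same row to be written to the HFile in order. The MapReduce shuffle sorts only by the reducer key (the row), so the KeyValues of one row reach the reducer in no particular order, and the reducer has to sort them before writing.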
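
To make the ordering requirement concrete, here is a minimal sketch that uses only the Java standard library. None of the classes in it are HBase APIs: Cell, OrderCheckingWriter, and SortedAppendSketch are hypothetical stand-ins, and the writer merely imitates a sink that refuses a key that sorts before the previous one. The comparator follows the general KeyValue ordering (row, family, and qualifier ascending, newest timestamp first) in simplified form.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Simplified simulation; none of these classes are HBase APIs.
class SortedAppendSketch {

  // Hypothetical stand-in for a KeyValue: only the key parts that affect ordering.
  static final class Cell {
    final String row;
    final String family;
    final String qualifier;
    final long timestamp;

    Cell(String row, String family, String qualifier, long timestamp) {
      this.row = row;
      this.family = family;
      this.qualifier = qualifier;
      this.timestamp = timestamp;
    }
  }

  // Row, family, qualifier ascending; newer timestamps sort first.
  static int compareCells(Cell a, Cell b) {
    int c = a.row.compareTo(b.row);
    if (c != 0) return c;
    c = a.family.compareTo(b.family);
    if (c != 0) return c;
    c = a.qualifier.compareTo(b.qualifier);
    if (c != 0) return c;
    return Long.compare(b.timestamp, a.timestamp);
  }

  static final Comparator<Cell> CELL_ORDER = SortedAppendSketch::compareCells;

  // Hypothetical sink that rejects a cell sorting before the previously appended one.
  static final class OrderCheckingWriter {
    private Cell last;
    final List<Cell> written = new ArrayList<>();

    void append(Cell cell) {
      if (last != null && CELL_ORDER.compare(last, cell) > 0) {
        throw new IllegalStateException(
            "out of order: " + cell.qualifier + " after " + last.qualifier);
      }
      written.add(cell);
      last = cell;
    }
  }

  // The reducer without the TreeSet: values are passed through as they arrive.
  static void writeAsReceived(Iterable<Cell> cells, OrderCheckingWriter out) {
    for (Cell c : cells) out.append(c);
  }

  // The approach of KeyValueSortReducer: buffer the whole row in a TreeSet, then emit.
  static void writeSorted(Iterable<Cell> cells, OrderCheckingWriter out) {
    TreeSet<Cell> sorted = new TreeSet<>(CELL_ORDER);
    for (Cell c : cells) sorted.add(c);
    for (Cell c : sorted) out.append(c);
  }

  // Values of one row in an arbitrary arrival order, as after a shuffle keyed on the row only.
  static List<Cell> sampleRow() {
    return Arrays.asList(
        new Cell("row1", "cf", "c", 1L),
        new Cell("row1", "cf", "a", 1L),
        new Cell("row1", "cf", "b", 1L));
  }

  public static void main(String[] args) {
    try {
      writeAsReceived(sampleRow(), new OrderCheckingWriter());
      System.out.println("unsorted write succeeded");
    } catch (IllegalStateException e) {
      System.out.println("unsorted write rejected: " + e.getMessage());
    }
    OrderCheckingWriter out = new OrderCheckingWriter();
    writeSorted(sampleRow(), out);
    System.out.println("sorted write kept " + out.written.size() + " cells");
  }
}
```

In this simulation the pass-through variant fails on the second cell, while the TreeSet variant writes all three. The trade-off is the one the question describes: the TreeSet holds every KeyValue of a row in memory until the row is complete, so memory use grows with the size of the row.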