I am trying to create hfiles to do bulk load into Hbase and it keeps throwing the error with the row key even though everything looks fine. I am using the following code:
val df = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
import sqlContext.implicits._
val DF2 = df.filter($"company".isNotNull)
val rdd = DF2.flatMap(x => {
val rowKey = Bytes.toBytes(x(0).toString)
for (i <- 0 to cols.length - 1) yield {
val index = x.fieldIndex(new String(cols(i)))
val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes
(new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, COLUMN_FAMILY, cols(i), value))
rdd.saveAsNewAPIHadoopFile("HDFS LOcation", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], fconf)
and I am using the following data
It throws the error as
java.io.IOException: Added a key not lexically larger than previous. Current cell = ADC/data:high/1505862570671/Put/vlen=5/seqid=0, lastCell = ADC/data:open/1505862570671/Put/vlen=5/seqid=0
at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)
at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:265)
at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:992)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:199)
I even tried sorting the data but still the error is thrown.
After spending couple of hours I found the solution, rootcause is that the columns are not sorted.
Since Hfile needs keyvalue in lexicographically sorted order and in your case while writing HFileOutputFormat2->AbstractHFileWriter
found Added a key not lexically larger than previous. Current cell
. You have already applied sorting at row level once you sort the columns also it would work.
Question here with good explanation why-hbase-keyvaluesortreducer-need-to-sort-all-keyvalue.
//sort columns
val cols = companyDs.columns.sorted
//Rest of the code is same
val output = companyDs.rdd.flatMap(x => {
val rowKey = Bytes.toBytes(x(0).toString)
val hkey = new ImmutableBytesWritable(rowKey)
for (i <- 0 to cols.length - 1) yield {
val index = x.fieldIndex(new String(cols(i)))
val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes
val kv = new KeyValue(rowKey,COLUMN_FAMILY, cols(i).getBytes(),System.currentTimeMillis()+i ,x(i).toString.getBytes())
, classOf[ImmutableBytesWritable], classOf[KeyValue],
classOf[HFileOutputFormat2], config)