I have a log file of 30K records that I publish to Kafka and then persist into HBase through Spark. Of the 30K records, only 4K show up in the HBase table.
final Job newAPIJobConfiguration1 = Job.getInstance(config);
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "logs");
newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
HTable hTable = new HTable(config, "country");
lines.foreachRDD((rdd,time)->
{
// Get the singleton instance of SparkSession
SparkSession spark = SparkSession.builder().config(rdd.context().getConf()).getOrCreate();
// Convert each log line (String) into a Log record
JavaRDD<Log> rowRDD = rdd.map(line -> {
String[] logLine = line.split(" +");
Log record = new Log();
record.setTime((logLine[0]));
record.setTime_taken((logLine[1]));
record.setIp(logLine[2]);
return record;
});
saveToHBase(rowRDD, newAPIJobConfiguration1.getConfiguration());
});
ssc.start();
ssc.awaitTermination();
}
//6. saveToHBase method - insert data into HBase
public static void saveToHBase(JavaRDD<Log> rowRDD, Configuration conf) throws IOException {
// create Key, Value pair to store in HBase
JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = rowRDD.mapToPair(
new PairFunction<Log, ImmutableBytesWritable, Put>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<ImmutableBytesWritable, Put> call(Log row) throws Exception {
Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));
//put.addColumn(Bytes.toBytes("sparkaf"), Bytes.toBytes("message"), Bytes.toBytes(row.getMessage()));
put.addImmutable(Bytes.toBytes("time"), Bytes.toBytes("col1"), Bytes.toBytes(row.getTime()));
put.addImmutable(Bytes.toBytes("time_taken"), Bytes.toBytes("col2"), Bytes.toBytes(row.getTime_taken()));
put.addImmutable(Bytes.toBytes("ip"), Bytes.toBytes("col3"), Bytes.toBytes(row.getIp()));
return new Tuple2<>(new ImmutableBytesWritable(), put);
}
});
// save to HBase using Spark's built-in Hadoop output API
hbasePuts.saveAsNewAPIHadoopDataset(conf);
}
Since HBase stores records uniquely by rowkey, it is very possible that you are overwriting records.
You are using the current time in milliseconds as the rowkey, so any record written with the same rowkey overwrites the previous one.
Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));
So if 100 Puts are created in 1 millisecond, only 1 will show up in HBase, since the same row was overwritten 99 times.
It's likely that the 4k rowkeys in HBase are the 4k unique milliseconds (4 seconds) it took to load the data.
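To make the overwrite effect concrete, here is a minimal sketch that uses a plain `HashMap` as a stand-in for the HBase table (one entry per distinct rowkey, last write wins). The class name `RowKeyOverwriteDemo` and the method `distinctRows` are hypothetical, and the timestamps are fixed values rather than real clock readings, so the result is deterministic.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RowKeyOverwriteDemo {

    // Simulates writing one record per timestamp into a table keyed by rowkey.
    // Records sharing a rowkey replace each other, as a Put on an existing row would.
    public static int distinctRows(long[] timestamps) {
        Map<ByteBuffer, String> table = new HashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            // 8-byte big-endian encoding of the long, used here as the rowkey
            byte[] rowKey = ByteBuffer.allocate(Long.BYTES).putLong(timestamps[i]).array();
            table.put(ByteBuffer.wrap(rowKey), "record-" + i);
        }
        return table.size();
    }

    public static void main(String[] args) {
        // 100 records that all carry the same millisecond timestamp
        long[] sameMillisecond = new long[100];
        Arrays.fill(sameMillisecond, 1_500_000_000_000L);
        System.out.println(distinctRows(sameMillisecond)); // prints 1

        // 30,000 records spread over 4,000 distinct milliseconds
        long[] spread = new long[30_000];
        for (int i = 0; i < spread.length; i++) {
            spread[i] = 1_500_000_000_000L + (i % 4_000);
        }
        System.out.println(distinctRows(spread)); // prints 4000
    }
}
```

The second case mirrors the numbers in the question: 30K writes, but only as many rows as there were distinct milliseconds.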
I would suggest using a different rowkey design. As a side note, monotonically increasing rowkeys are typically a bad idea in HBase, because consecutive writes all land on the same region: Further Information
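One possible rowkey design, as a rough sketch only: combine a small salt prefix, a field from the record, the timestamp, and a random UUID, so that two records created in the same millisecond still get different keys and the keys do not increase monotonically. The class `LogRowKeys`, the method `rowKey`, the bucket count of 16, and the choice of the IP as the leading field are all assumptions for illustration; the right layout depends on how you need to scan the table.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class LogRowKeys {

    // Hypothetical number of salt buckets; tune to the number of regions.
    static final int SALT_BUCKETS = 16;

    // Layout: [1-byte salt][ip bytes][8-byte timestamp][16-byte UUID]
    public static byte[] rowKey(String ip, long timestampMillis, UUID uniqueId) {
        byte[] ipBytes = ip.getBytes(StandardCharsets.UTF_8);
        // Salt derived from the ip so keys are spread across buckets
        byte salt = (byte) Math.floorMod(ip.hashCode(), SALT_BUCKETS);
        return ByteBuffer.allocate(1 + ipBytes.length + Long.BYTES + 2 * Long.BYTES)
                .put(salt)
                .put(ipBytes)
                .putLong(timestampMillis)
                // The UUID guarantees uniqueness even within one millisecond
                .putLong(uniqueId.getMostSignificantBits())
                .putLong(uniqueId.getLeastSignificantBits())
                .array();
    }
}
```

In the `call` method from the question, the Put could then be created with `new Put(LogRowKeys.rowKey(row.getIp(), System.currentTimeMillis(), UUID.randomUUID()))` instead of `new Put(Bytes.toBytes(System.currentTimeMillis()))`.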