java, apache-spark, apache-kafka, hbase, streaming

Spark to HBase table is not showing complete data records


I have a log file of 30K records, which I publish to a Kafka topic and persist into HBase through Spark. Out of the 30K records, I can see only about 4K records in the HBase table.

  1. I have tried saving the same stream to MySQL, and all records are saved there properly.
  2. But with HBase, if I publish a file of 100 records to the Kafka topic, only 36 records end up in the HBase table, and if I publish 30K records, HBase shows only about 4K records.
  3. Also, the records (rows) in HBase are not in sequence; the rowkeys jump around, e.g. 1..3..10..17. Here is the relevant code:

     final Job newAPIJobConfiguration1 = Job.getInstance(config);
     newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "logs");
     newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);

     HTable hTable = new HTable(config, "country");

     lines.foreachRDD((rdd, time) -> {
         // Get the singleton instance of SparkSession
         SparkSession spark = SparkSession.builder().config(rdd.context().getConf()).getOrCreate();

         // Convert RDD[String] to RDD[Log]
         JavaRDD rowRDD = rdd.map(line -> {
             String[] logLine = line.split(" +");
             Log record = new Log();
             record.setTime(logLine[0]);
             record.setTime_taken(logLine[1]);
             record.setIp(logLine[2]);
             return record;
         });

         saveToHBase(rowRDD, newAPIJobConfiguration1.getConfiguration());
     });

     ssc.start();
     ssc.awaitTermination();
     }

     // 6. saveToHBase method - insert data into HBase
     public static void saveToHBase(JavaRDD rowRDD, Configuration conf) throws IOException {
         // create Key, Value pair to store in HBase
         JavaPairRDD hbasePuts = rowRDD.mapToPair(
             new PairFunction() {
                 private static final long serialVersionUID = 1L;

                 @Override
                 public Tuple2 call(Log row) throws Exception {
                     Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));
                     //put.addColumn(Bytes.toBytes("sparkaf"), Bytes.toBytes("message"), Bytes.toBytes(row.getMessage()));
                     put.addImmutable(Bytes.toBytes("time"), Bytes.toBytes("col1"), Bytes.toBytes(row.getTime()));
                     put.addImmutable(Bytes.toBytes("time_taken"), Bytes.toBytes("col2"), Bytes.toBytes(row.getTime_taken()));
                     put.addImmutable(Bytes.toBytes("ip"), Bytes.toBytes("col3"), Bytes.toBytes(row.getIp()));
                     return new Tuple2(new ImmutableBytesWritable(), put);
                 }
             });

         // save to HBase - Spark built-in API method
         //hbasePuts.saveAsNewAPIHadoopDataset(conf);
         hbasePuts.saveAsNewAPIHadoopDataset(conf);
     }

Solution

  • Since HBase stores records uniquely by rowkey, it is very possible that you are overwriting records.

    You are using the current time in milliseconds as the rowkey, and any record written with the same rowkey will overwrite the previous one.

    Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));
    

    So if 100 Puts are created in the same millisecond, only 1 row will show up in HBase, since that row was overwritten 99 times.

    It's likely that the 4K rowkeys in HBase are the 4K unique milliseconds (about 4 seconds) it took to load the data.

    I would suggest using a different rowkey design (see the sketch below). Also, as a side note, it is typically a bad idea to use monotonically increasing rowkeys in HBase: Further Information
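
    As an illustration only, here is one possible rowkey scheme, assuming the same Log fields (time, ip) and the HBase client classes already used in the question. The RowKeys helper class, the salt bucket count, and the field order in the key are arbitrary choices for this sketch, not a prescribed design: the key is derived from the record plus a random suffix so that records written in the same millisecond cannot collide, and a salt prefix keeps the key from being purely monotonically increasing.

    import java.util.UUID;

    import org.apache.hadoop.hbase.util.Bytes;

    public final class RowKeys {

        // Number of salt buckets; small and arbitrary for this sketch.
        private static final int SALT_BUCKETS = 16;

        // Builds a rowkey like "<salt>|<time>|<ip>|<uuid>".
        // The random UUID suffix keeps two log lines with the same time and ip
        // from overwriting each other; the salt prefix avoids a purely
        // monotonically increasing key.
        public static byte[] forLog(String time, String ip) {
            String natural = time + "|" + ip;
            int salt = Math.floorMod(natural.hashCode(), SALT_BUCKETS);
            return Bytes.toBytes(salt + "|" + natural + "|" + UUID.randomUUID());
        }
    }

    Inside the existing call(Log row) method, the Put would then be created as Put put = new Put(RowKeys.forLog(row.getTime(), row.getIp())); so that every incoming record maps to its own row instead of sharing a millisecond timestamp with other records.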