Tags: scala, apache-spark, hbase, cloudera

IOException during splitting java.util.concurrent.ExecutionException: java.io.FileNotFoundException when loading HFile to HBase


I am trying to bulk load data into HBase using the salted-table approach described in this post: https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/. The data does get inserted, but at random times I get:

ERROR mapreduce.LoadIncrementalHFiles: IOException during splitting java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File does not exist: /user//hfile/transactions/transaction_data/b1c6c47856104db0a1289c2b7234d1d7

I am using hbase-client 1.2.0 and hbase-server 1.2.0 as dependencies, and I write the HFiles with the HFileOutputFormat2 class.
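
For reference, the declared dependencies correspond to something like the following build definition (a minimal sketch assuming an sbt build; only the HBase coordinates come from the question, everything else would need to be added):

// build.sbt (sketch only)
libraryDependencies ++= Seq(
  "org.apache.hbase" % "hbase-client" % "1.2.0",
  "org.apache.hbase" % "hbase-server" % "1.2.0"
)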

I also tried the patched HFileOutputFormat2 from https://github.com/gbif/maps/commit/ee4e0001486f3e8b37b034c5b05fc8c8d4e76ab9, but with it the HFile write fails altogether.

Below is the portion of the code that writes the HFile and loads it into HBase:

dates.map { x =>
      val hbase_connection2 = ConnectionFactory.createConnection(hbaseconfig)
      val table2 = hbase_connection2.getTable(TableName.valueOf(hbase_table))
      val regionLoc2 = hbase_connection2.getRegionLocator(table2.getName)
      val admin2 = hbase_connection2.getAdmin

      val transactionsDF = sql(hive_query_2.replace("|columns|", hive_columns) + " " + period_clause.replace("|date|", date_format.format(x)))

      val max_records = transactionsDF.count()

      val max_page = math.ceil(max_records.toDouble/page_limit.toDouble).toInt

      val start_row = 0
      val end_row = page_limit.toInt

      val start_page = if(date_format.format(x).equals(bookmark_date)) {
        bookmarked_page
      }
      else{
        0
      }

      val pages = start_page to (if (max_records < page_limit.toInt) max_page - 1 else max_page)
      if (max_records > 0) {
        pages.map(page => {
          val sourceDF = transactionsDF
            .withColumn("tanggal", cnvrt_tanggal(transactionsDF.col("ctry_cd"), transactionsDF.col("ori_tanggal"), transactionsDF.col("ori_jam")))
            .withColumn("jam", cnvrt_jam(transactionsDF.col("ctry_cd"), transactionsDF.col("ori_tanggal"), transactionsDF.col("ori_jam")))
            .join(locations, transactionsDF.col("wsid") === locations.col("key"), "left_outer")
            .join(trandescdictionary, lower(transactionsDF.col("source_system")) === lower(trandescdictionary.col("FLAG_TRANS")) && lower(transactionsDF.col("trans_cd")) === lower(trandescdictionary.col("TRAN_CODE")), "left_outer")
            .filter(transactionsDF.col("rowid").between((start_row + (page * page_limit.toInt)).toString, ((end_row + (page * page_limit.toInt)) - 1).toString))
            .withColumn("uuid", timeUUID())
            .withColumn("created_dt", current_timestamp())

          val spp = new SaltPrefixPartitioner(hbase_regions)

          // Prefix each row key (uuid) with a salt bucket so writes spread across all
          // regions, and collect the 23 column values in the order defined by cols.
          val saltedRDD = sourceDF.rdd.flatMap(r => {
            Seq((
              salt(r.getString(r.fieldIndex("uuid")), hbase_regions),
              (0 to 22).map(i => r.get(r.fieldIndex(cols(i).toLowerCase)))
            ))
          })

          // HFiles require KeyValues in total key order, so sort the salted rows within
          // each partition before handing them to HFileOutputFormat2.
          val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(spp)

          val cells = partitionedRDD.flatMap(r => {
            val salted_keys = r._1
            val colFamily = hbase_colfamily

            // One KeyValue per column, in the order defined by cols; null values
            // are written as empty strings.
            (0 to 22).map(i =>
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)),
                new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(i).getBytes(),
                  Bytes.toBytes(Option(r._2(i)).getOrElse("").toString)))
            )
          })

          val job = Job.getInstance(hbaseconfig, "Insert Transaction Data Row " + (start_row + (page * page_limit.toInt)).toString + " to " + ((end_row + (page * page_limit.toInt)) - 1).toString + " for " + x.toString)
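          // configureIncrementalLoad inspects the table's current region boundaries and
          // sets up the job's partitioner and sort order so one HFile is produced per region.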
          HFileOutputFormat2.configureIncrementalLoad(job, table2, regionLoc2)

          val conf = job.getConfiguration

          // The same output path is reused for every page, so remove any leftover
          // HFiles from a previous run before writing the new ones.
          if (fs.exists(path)) {
            fs.delete(path, true)
          }

          cells.saveAsNewAPIHadoopFile(
            path.toString,
            classOf[ImmutableBytesWritable],
            classOf[KeyValue],
            classOf[HFileOutputFormat2],
            conf
          )

          // LoadIncrementalHFiles splits any HFile whose key range spans more than one
          // region before moving it into the table; the FileNotFoundException above is
          // thrown during that splitting step.
          val bulk_loader = new LoadIncrementalHFiles(conf)
          bulk_loader.doBulkLoad(path, admin2, table2, regionLoc2)

          conf.clear()
          println("Done For " + x.toString + " pages " + (start_row + (page * page_limit.toInt)).toString + " to " + ((end_row + (page * page_limit.toInt)) - 1).toString)

          // Persist a bookmark (date and last completed page) so the job can resume.
          if (fs.exists(bookmark)) {
            fs.delete(bookmark, true)
          }
          Seq((date_format.format(x), page.toString)).toDF("Date", "Page")
            .write.format("com.databricks.spark.csv")
            .option("delimiter", "|")
            .save(bookmark_path)
          0
        })
      }
      hbase_connection2.close()
      0
}
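
The salt helper and SaltPrefixPartitioner used above are not included in the snippet; they follow the salted-key pattern from the opencore post. A minimal sketch of what they are assumed to look like (names and signatures inferred from the calls above, not the actual implementation):

import org.apache.spark.Partitioner

// Sketch only: prefix the key with a two-digit bucket derived from its hash so that
// rows spread evenly across `regions` pre-split regions (assumes fewer than 100 regions).
def salt(key: String, regions: Int): String = {
  val bucket = math.abs(key.hashCode) % regions
  f"$bucket%02d" + "_" + key
}

// Sketch only: send each salted key to the Spark partition matching its salt prefix,
// so every partition lines up with a single HBase region.
class SaltPrefixPartitioner(regions: Int) extends Partitioner {
  override def numPartitions: Int = regions
  override def getPartition(key: Any): Int = key.toString.take(2).toInt
}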

I am really at my wits' end, as I cannot trace what is causing this error. I hope someone can give me some ideas about what could be causing this file-splitting error.


Solution

  • I think you may be seeing this: https://issues.apache.org/jira/projects/HBASE/issues/HBASE-21183

    It is something I saw sporadically, so we never solved it. How regularly do you see it, please?