
Mocking HTable data for a unit test of a spark job


I have a Scala Spark job that reads from HBase like so:

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
val uniqueAttrs = calculateFreqLocation(hBaseRDD)

I am trying to write a unit test for the function calculateFreqLocation:

 def calculateFreqLocation(inputRDD: RDD[(ImmutableBytesWritable, Result)]): Map[String, Map[(String, String, String), Long]] =  {
    val valueType = classOf[Array[Attribute]]
    val family = "cf_attributes".getBytes()
    val qualifier = "attributes".getBytes()
    val rdd7 = inputRDD.map(kv => (getUUID(kv._1.get()).toString(),
      objectMapper.readValue(new String(kv._2.getValue(family, qualifier)), valueType))).flatMap(flattenRow).filter(t => location_attributes.contains(t._2))

    val countByUUID = rdd7.countByValue().groupBy(_._1._1)
    val countByUUIDandKey = countByUUID.map(kv => (kv._1, kv._2.groupBy(_._1._2)))
    val uniqueAttrs = countByUUIDandKey.map(uuidmap => (uuidmap._1,uuidmap._2.map(keymap => keymap._2.maxBy(_._2))))
    return uniqueAttrs
  }

For each UUID, this function counts how often every location attribute occurs and keeps the most frequent value per attribute name. My unit test tries to recreate the HTable data and then pass the RDD to the function to see if the output matches:

@RunWith(classOf[JUnitRunner])
class FrequentLocationTest extends SparkJobSpec {
    "Frequent Location calculation" should {

    def longToBytes(x: Long): Array[Byte] = {
      return ByteBuffer.allocate(java.lang.Long.SIZE / java.lang.Byte.SIZE).putLong(x).array
    }
    val currTimestamp = System.currentTimeMillis / 1000
    val UUID_1 = UUID.fromString("123456aa-8f07-4190-8c40-c7e78b91a646")
    val family = "cf_attributes".getBytes()
    val column = "attributes".getBytes()
    val row = "[{'name':'Current_Location_Ip_Address', 'value':'123.456.123.248'}]"

    val resultRow = Array(new KeyValue(row.getBytes(), family, column, null))

    val key = "851971aa-8f07-4190-8c40-c7e78b91a646".getBytes() ++ longToBytes(currTimestamp)
    val input = Seq((key,row))
    val correctOutput = Map(
      ("851971aa-8f07-4190-8c40-c7e78b91a646" -> Map(("123456aa-8f07-4190-8c40-c7e78b91a646","Current_Location_Ip_Address","123.456.123.248") -> 1))
      )

    "case 1 : return with correct output (frequent location calculation)" in {
      val inputRDD = sc.makeRDD(input, 1)
      val hadoonRdd = new HadoopRDD(sc, sc.broadcast(new SerializableWritable(new Configuration()))
        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], null, classOf[InputFormat[ImmutableBytesWritable,Result]], classOf[ImmutableBytesWritable],classOf[Result],1)

      val finalInputRdd = hadoonRdd.union(inputRDD.map(kv => ( new ImmutableBytesWritable(kv._1), new Result(Array(new KeyValue(kv._2.getBytes(), family, column, null))))))

      val resultMap = FrequentLocation.calculateFreqLocation(finalInputRdd)
      resultMap == correctOutput
      //val customCorr = new FrequentLocation().calculateFreqLocation(inputRDD)
      //freqLocationMap must_== correctOutput
    }
  }
}

What I get is org.apache.spark.SparkException: Task not serializable. As far as I understand, this is because ImmutableBytesWritable and the other HBase classes are not serializable, so Spark can't ship them between nodes. In the code above I am already reaching into Spark's developer APIs (creating the HadoopRDD manually), but I don't have any way to populate it with data.

How can I test this? I need to pass the function either a HadoopRDD with data in it or an RDD[(ImmutableBytesWritable, Result)]. Initially I created this RDD manually and got the same error; then I switched to building it with map from raw binary/text. Any help would be appreciated!


Solution

  • Answering with my own findings to provide some guidance to anyone else stuck with a similar stack: Spark running over HBase.

    If you followed most tutorials for unit-testing Spark procedures, you probably have a base class like this one:

    abstract class SparkJobSpec extends SpecificationWithJUnit with BeforeAfterExample {  
    
      @transient var sc: SparkContext = _
    
      def beforeAll = {
        System.clearProperty("spark.driver.port")
        System.clearProperty("spark.hostPort")
    
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("test")
          // Kryo is essential here: Result and ImmutableBytesWritable do not implement java.io.Serializable
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
          //.setJars(Seq(System.getenv("JARS")))
        sc = new SparkContext(conf)
      }
    
      def afterAll = {
        if (sc != null) {
          sc.stop()
          sc = null
          System.clearProperty("spark.driver.port")
          System.clearProperty("spark.hostPort")
        }
      }
    
      def before = {}
    
      def after = {}
    
      override def map(fs: => Fragments) = Step(beforeAll) ^ super.map(fs) ^ Step(afterAll)
    
    }
    

    The solution to the question I posted actually had two parts:

    • The Task not serializable exception is fixed easily by adding with Serializable (shown in the final test below) to your unit-test suite class as well as to your original Spark procedure. The likely reason is that a closure passed to an RDD operation captures the enclosing instance whenever it references one of that instance's fields or methods, so Spark has to serialize the entire class along with the closure.

    • The biggest problem I ran into was that sparkContext.newAPIHadoopRDD() is very convenient but returns an awkward RDD of type RDD[(ImmutableBytesWritable, Result)]. Neither class is Serializable, and Spark complains about this as soon as you call functions from your Spark job with a self-constructed RDD of that type. The key is to set the following on your SparkConf:

          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))

      I didn't need to do this in the original Spark procedure. I'm not sure whether that is because of something Spark was doing on its own in my QA cluster, or because the RDD there is read directly from HBase and never leaves the procedure, so its records never had to be serialized.
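
The first point can be reproduced without Spark. The following is a minimal sketch, not code from the job above: PlainSuite, SerializableSuite and canSerialize are hypothetical names invented for illustration. It uses plain Java serialization, which is also what Spark uses for task closures, to show that a function referencing a field of its enclosing class drags the whole instance along:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a test suite that is NOT Serializable.
class PlainSuite {
  val family = "cf_attributes"
  // The function reads a field, so it captures `this` (the whole PlainSuite).
  def tagger: String => String = s => family + ":" + s
}

// Same class, but marked Serializable, as suggested in the answer.
class SerializableSuite extends Serializable {
  val family = "cf_attributes"
  def tagger: String => String = s => family + ":" + s
}

object ClosureCaptureDemo {
  // Tries to Java-serialize an object, similar to what happens to a closure
  // before it is shipped to an executor.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new PlainSuite().tagger))        // false
    println(canSerialize(new SerializableSuite().tagger)) // true
  }
}
```

An alternative to marking the class Serializable is to copy the needed fields into local vals before building the closure, so that only those values are captured.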

    The final code for the unit test is actually pretty straightforward:

    @RunWith(classOf[JUnitRunner])
    class FrequentLocationTest extends SparkJobSpec with Serializable {

      "Frequent Location calculation" should {
        //some UUID generator stuff here
        val resultRow = Array(new KeyValue(
          Bytes.add(longToBytes(UUID_1.getMostSignificantBits()), longToBytes(UUID_1.getLeastSignificantBits())),
          family, column, row.getBytes()))
        val input = Seq((new ImmutableBytesWritable(key), new Result(resultRow)))
        val correctOutput = Map(
          ("851971aa-8f07-4190-8c40-c7e78b91a646" -> Map(("851971aa-8f07-4190-8c40-c7e78b91a646","Current_Location_Ip_Address","123.456.234.456") -> 1))
        )

        "case 1 : return with correct output (frequent location calculation)" in {
          val inputRDD = sc.makeRDD(input, 1)
          val resultMap = FrequentLocation.calculateFreqLocation(inputRDD)
          resultMap == correctOutput
        }
      }
    }
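
To double-check what correctOutput should contain, the aggregation inside calculateFreqLocation can be mirrored on plain Scala collections. This is only a sketch: FreqLocationSketch and mostFrequent are hypothetical names, the sample rows are made up, and it assumes that flattenRow yields (uuid, attributeName, attributeValue) triples, which the post does not show.

```scala
object FreqLocationSketch {
  // Assumed shape of the flattened rows: (uuid, attributeName, attributeValue)
  type Attr = (String, String, String)

  def mostFrequent(rows: Seq[Attr]): Map[String, Map[Attr, Long]] = {
    // Local equivalent of RDD.countByValue(): occurrences of each distinct triple
    val counts: Map[Attr, Long] =
      rows.groupBy(identity).map { case (attr, hits) => attr -> hits.size.toLong }

    counts
      .groupBy { case ((uuid, _, _), _) => uuid }           // one bucket per UUID
      .map { case (uuid, perUuid) =>
        val best: Map[Attr, Long] = perUuid
          .groupBy { case ((_, name, _), _) => name }       // one bucket per attribute name
          .map { case (_, perName) => perName.maxBy(_._2) } // keep the most frequent value
        uuid -> best
      }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("u1", "Current_Location_Ip_Address", "10.0.0.1"),
      ("u1", "Current_Location_Ip_Address", "10.0.0.1"),
      ("u1", "Current_Location_Ip_Address", "10.0.0.2"))
    // prints Map(u1 -> Map((u1,Current_Location_Ip_Address,10.0.0.1) -> 2))
    println(mostFrequent(rows))
  }
}
```

Because this version needs no SparkContext, it can serve as a quick reference for the expected map before the Spark-based test is run.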