Tags: scala, apache-spark, apache-spark-sql, hdfs, scalatest

MiniDFS cluster setup for multiple test classes throws java.net.BindException: Address already in use


I am writing unit tests for Spark code that reads/writes data from/to both HDFS files and Spark's catalog. For this I created a separate trait that initialises a MiniDFS cluster, and I use the generated HDFS URI as the value of spark.sql.warehouse.dir when creating the SparkSession object. Here is the code for it:

import java.io.File

import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}
import org.apache.hadoop.test.PathUtils
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, Suite}

trait TestSparkSession extends BeforeAndAfterAll {
  self: Suite =>

  var hdfsCluster: MiniDFSCluster = _

  def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"

  def withLocalSparkSession(tests: SparkSession => Any): Any = {
    // Spin up a single-node HDFS cluster backed by a test-local base directory
    val baseDir = new File(PathUtils.getTestDir(getClass), "miniHDFS")
    val conf = new HdfsConfiguration()
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
    val builder = new MiniDFSCluster.Builder(conf)
    hdfsCluster = builder.nameNodePort(9000)
      .manageNameDfsDirs(true)
      .manageDataDfsDirs(true)
      .format(true)
      .build()
    hdfsCluster.waitClusterUp()

    // Point the warehouse directory at the freshly started MiniDFS cluster
    val testSpark = SparkSession
      .builder()
      .master("local")
      .appName("Test App")
      .config("spark.sql.warehouse.dir", s"${nameNodeURI}spark-warehouse/")
      .getOrCreate()
    tests(testSpark)
  }

  def stopHdfs(): Unit = hdfsCluster.shutdown(true, true)

  override def afterAll(): Unit = stopHdfs()

}

When writing my tests, I mix in this trait and then write test cases like this:

import org.scalatest.FunSuite

class SampleSpec extends FunSuite with TestSparkSession {
  withLocalSparkSession { testSpark =>
    import testSpark.implicits._

    // Test 1 Here
    // Test 2 Here
  }
}

Everything works fine when I run my test classes one at a time, but when I run them all at once it throws java.net.BindException: Address already in use. That should mean the previously created hdfsCluster is not yet down when the next set of tests executes, which is why the next suite cannot create another cluster bound to the same port. But I do stop the hdfsCluster in afterAll().

My question is: can I share a single instance of the HDFS cluster and the SparkSession instead of initialising them every time? I have tried extracting the initialisation out of the method, but it still throws the same exception. Even if I can't share them, how can I properly stop my cluster and re-initialise it before the next test class executes?

Also, please let me know whether my approach of writing 'unit' test cases that use a SparkSession and HDFS storage is correct.

Any help will be greatly appreciated.


Solution

  • I resolved it by creating the HDFS cluster in a companion object instead, so that a single instance of it is created and shared by all the test suites.
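
    A minimal sketch of what that can look like, keeping the builder settings from the trait above but creating the cluster once in a lazily initialised companion object shared by every suite. The lazy val and the shutdown hook are assumptions about how the single instance gets created and cleaned up, not the exact resolution code:

    import java.io.File

    import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}
    import org.apache.hadoop.test.PathUtils
    import org.apache.spark.sql.SparkSession
    import org.scalatest.Suite

    // Companion object: the cluster is created at most once per JVM and is
    // reused by every test suite that mixes in the trait below.
    object TestSparkSession {

      lazy val hdfsCluster: MiniDFSCluster = {
        val baseDir = new File(PathUtils.getTestDir(getClass), "miniHDFS")
        val conf = new HdfsConfiguration()
        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
        val cluster = new MiniDFSCluster.Builder(conf)
          .nameNodePort(9000)  // the fixed port is fine now: only one cluster ever binds to it
          .manageNameDfsDirs(true)
          .manageDataDfsDirs(true)
          .format(true)
          .build()
        cluster.waitClusterUp()
        // Tear the shared cluster down when the test JVM exits
        sys.addShutdownHook(cluster.shutdown(true, true))
        cluster
      }

      lazy val nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"
    }

    trait TestSparkSession {
      self: Suite =>

      def withLocalSparkSession(tests: SparkSession => Any): Any = {
        val testSpark = SparkSession
          .builder()
          .master("local")
          .appName("Test App")
          .config("spark.sql.warehouse.dir", s"${TestSparkSession.nameNodeURI}spark-warehouse/")
          .getOrCreate()
        tests(testSpark)
      }
    }

    The test suites themselves stay as in SampleSpec above; the difference is that every suite now talks to the same namenode instead of trying to start its own cluster on the same port.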