Search code examples
scalaapache-sparktestcontainersspark-kafka-integration

testing kafka and spark with testcontainers


I am trying to check with testcontainers a streaming pipeline as a integration test but I don´t know how get bootstrapServers, at least in last testcontainers version and create a specific topic there. How can I use 'containerDef' to extract bootstrapservers and add a topic?

import com.dimafeng.testcontainers.{ContainerDef, KafkaContainer}
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession

class Mykafkatest extends FunSuite with TestContainerForAll {
  //val kafkaContainer: KafkaContainer      = KafkaContainer("confluentinc/cp-kafka:5.4.3")
  override val containerDef: ContainerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Unit testing")
      .getOrCreate()

    // How add a topic in that container?

    // This is not posible:
    val servers=container.bootstrapServers

    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", "topic1")
      .load()


    df.show(false)

  })

}

My sbt configuration:

lazy val root = project
  .in(file("./pipeline"))
  .settings(
    organization := "org.example",
    name := "spark-stream",
    version := "0.1",
    scalaVersion := "2.12.10",
    libraryDependencies := Seq(
      "org.apache.spark" %% "spark-sql-kafka-0-10"       % "3.0.3"  % Compile,
      "org.apache.spark" %% "spark-sql"                  % "3.0.3"  % Compile,
      "com.dimafeng"     %% "testcontainers-scala-munit" % "0.39.5" % Test,
      "org.dimafeng"     %% "testcontainers-scala-kafka" % "0.39.5" % Test,
      "org.scalameta"    %% "munit"                      % "0.7.28" % Test
    ),
    testFrameworks += new TestFramework("munit.Framework"),
    Test / fork := true
  )

Documentation does not show a complete example: https://www.testcontainers.org/modules/kafka/


Solution

  • The only problem here is that you are explicitly casting that KafkaContainer.Def to ContainerDef.

    The type of container provided by withContianers, Containter is decided by path dependent type in provided ContainerDef,

    trait TestContainerForAll extends TestContainersForAll { self: Suite =>
    
      val containerDef: ContainerDef
    
      final override type Containers = containerDef.Container
    
      override def startContainers(): containerDef.Container = {
        containerDef.start()
      }
    
      // inherited from TestContainersSuite
      def withContainers[A](runTest: Containers => A): A = {
        val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
        runTest(c)
      }
    
    }
    
    trait ContainerDef {
    
      type Container <: Startable with Stoppable
    
      protected def createContainer(): Container
    
      def start(): Container = {
        val container = createContainer()
        container.start()
        container
      }
    }
    

    The moment you explicitly specify the type ContainerDef in override val containerDef: ContainerDef = KafkaContainer.Def(), this breaks the whole "type trickery" and thus Scala compiler is left with a type Container <: Startable with Stoppable instead of a KafkaContainer.

    So, just remove that explicit type cast to ContainerDef, and that val servers = container.bootstrapServers will work as expected.

    import com.dimafeng.testcontainers.KafkaContainer
    import com.dimafeng.testcontainers.munit.TestContainerForAll
    import munit.FunSuite
    
    class Mykafkatest extends FunSuite with TestContainerForAll {
      override val containerDef = KafkaContainer.Def()
    
      test("do something")(withContainers { container =>
        //...
    
        val servers = container.bootstrapServers
    
        println(servers)
    
        //...
      })
    }