Tags: scala, google-cloud-dataflow, apache-beam, scalatest, spotify-scio

Parameterized tests with SCIO (JobTest) and ScalaTest (forAll)


I want to write parameterized tests with SCIO's JobTest and ScalaTest. I use TableDrivenPropertyChecks, whose forAll runs the same test body over every row of a parameter table.

import org.scalatest.prop.TableDrivenPropertyChecks.{forAll => forAllParams, _}

val jobArgs = Array(
  "--nullableCoders=true",
  "--inputSubscription=in",
  "--inputAvro=test",
  "--outputBq1=out-table-1",
  "--outputBq2=out-table-2"
)

// Each row's description becomes part of the test name,
// so descriptions must be distinct across rows.
val ioParams =
  Table(
    ("description", "inputRawPlusData", "expectedDigitalAvatars", "expectedDataRecords"),
    (
      "Desc1",
      getInputData1...,
      getExpectedOutput1...,
      getExpectedOutput2...
    ),
    (
      "Desc2",
      getInputData2...,
      getExpectedOutput...,
      getExpectedOutput...
    )
  )

// Registers one JobTest-based test per table row.
forAllParams(ioParams) { (description: String,
                          inputData: Seq[String],
                          expectedOutput1: Seq[...],
                          expectedOutput2: Seq[...]) =>

  it should s"have $description..." in {
    JobTest[com.Job.type]
      .args(jobArgs: _*)
      .input(PubsubIO[String]("in"), inputData)
      .input(AvroIO[GenericRecord]("test"), test)
      .output(BigQueryIO[Obj1]("out-table-1"))(result => shouldMatchExpectedOutput1(result, expectedOutput1))
      .output(BigQueryIO[Obj2]("out-table-2"))(result => shouldMatchExpectedOutput2(result, expectedOutput2))
      .run()
  }
}

For the first set of parameters the test works fine, but for the second set the job args are empty. I don't understand why (maybe some shared state?).

When I run each parameter set on its own (without the others), it works.

Why does this happen?

Is it possible to write parameterized tests with SCIO JobTest?

Thanks in advance for your help.


Solution

  • I finally reproduced the problem in a small job.

    I was using ContextAndArgs and PipelineOptionsFactory at the same time, both parsing the same command-line arguments:

      val (sc, args) = ContextAndArgs(cmdlineArgs)

      val options = PipelineOptionsFactory
        .fromArgs(cmdlineArgs: _*)
        .withValidation
        .as(classOf[JobOptions])

      options.setStreaming(true)


    I removed the options part and now it works. Thanks again :)
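A minimal sketch of what the job's main could look like with the command line parsed in one place only. It assumes a recent Scio release (0.8 or later, where ScioContext.run() exists) and a hypothetical JobOptions interface with a single illustrative inputSubscription field, since the post does not show the real one. Typed options are taken from the existing context with sc.optionsAs instead of a second PipelineOptionsFactory.fromArgs call, and streaming is assumed to be enabled by passing --streaming=true rather than by calling setStreaming on a separate options object. The post does not explain exactly why the second JobTest saw empty args; this only illustrates the "parse once" fix.

```scala
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.options.{Description, PipelineOptionsFactory, StreamingOptions}

// Hypothetical options interface (the post's real JobOptions is not shown).
// Extending StreamingOptions means "--streaming=true" is parsed into it.
trait JobOptions extends StreamingOptions {
  @Description("Pub/Sub subscription to read from (illustrative field)")
  def getInputSubscription: String
  def setInputSubscription(value: String): Unit
}

object Job {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Register the custom interface so ContextAndArgs routes its flags
    // (e.g. --inputSubscription, --streaming) into the PipelineOptions.
    PipelineOptionsFactory.register(classOf[JobOptions])

    // Parse the command line exactly once; flags that match no registered
    // option (e.g. --outputBq1) end up in `args`.
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Typed view over the options the ScioContext already holds,
    // instead of a second PipelineOptionsFactory.fromArgs(...) call.
    val options = sc.optionsAs[JobOptions]
    val subscription = options.getInputSubscription
    val outputTable1 = args("outputBq1")

    // ... build the pipeline from `subscription`, `outputTable1`, etc. ...

    sc.run()
  }
}
```

In the JobTest, any flag the job needs, such as --streaming=true, would then simply be added to jobArgs.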