apache-spark, apache-kafka, spark-streaming, spark-structured-streaming

Is there a way to dynamically stop Spark Structured Streaming?


In my scenario, I have several datasets that arrive every now and then and that I need to ingest into our platform. The ingestion process involves several transformation steps, one of them being Spark. In particular, I use Spark Structured Streaming so far. The infrastructure also involves Kafka, from which Spark Structured Streaming reads the data.

I wonder if there is a way to detect that there has been nothing to consume from a topic for a while, in order to decide to stop the job. That is, I want to run the job for the time it takes to consume that specific dataset and then stop it. For specific reasons we decided not to use the batch version of Spark.

Hence, is there any timeout or similar mechanism that can be used to detect that no more data is coming in and that everything has been processed?
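For context, the kind of job in question might be set up like this (the bootstrap server, topic name, and output paths below are placeholders, not from the original post):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("ingest").getOrCreate()

// Read the dataset from Kafka as a stream
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "ingest-topic")                 // placeholder
  .option("startingOffsets", "earliest")
  .load()

// Apply the transformation steps and write the result out
val query = df.selectExpr("CAST(value AS STRING)")
  // ...transformation steps...
  .writeStream
  .format("parquet")
  .option("path", "/data/out")                // placeholder
  .option("checkpointLocation", "/data/chk")  // placeholder
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```

The question is then: how can this `query` be stopped automatically once the topic has been drained?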

Thank you


Solution

  • You can probably use something like this:

    def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long): Unit = {
      while (query.isActive) {
        // lastProgress is null until the first batch has completed,
        // so wrap it in an Option instead of catching a NullPointerException
        val progress = Option(query.lastProgress)
        if (progress.exists(_.numInputRows < 10)) {
          // Hardly any new data in the last batch: stop the query and
          // wait (up to the given timeout) for it to terminate
          query.stop()
          query.awaitTermination(awaitTerminationTimeMs)
        }
        Thread.sleep(500)
      }
    }
    
    

    You can make the `numInputRows` threshold a parameter instead of hard-coding 10.
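One way to wire this in (a sketch, assuming a running `StreamingQuery` named `query`): run the monitor on a separate daemon thread so the driver can block on the query itself.

```scala
// Hypothetical usage: monitor the query from a background thread;
// stopStreamQuery calls query.stop() once input rows drop below the threshold
val monitor = new Thread(() => stopStreamQuery(query, awaitTerminationTimeMs = 10000))
monitor.setDaemon(true)
monitor.start()

// Blocks until the monitor stops the query (or it fails)
query.awaitTermination()
```

Note that a single quiet batch will trigger the stop; if your topic can legitimately pause mid-dataset, you may want to require several consecutive low-volume batches before stopping.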