dataframe, scala, azure-databricks

Using readStream against Kafka in Databricks, how can I tell the stream to gracefully self-terminate after a certain time?


I'm using Azure Databricks to run Scala notebooks that read from several Kafka topics. Right now they run continuously, but I'd like them to wrap up after 10 minutes and allow the scheduler to restart the job later.

(This is for cost reasons; I don't want the cluster running continuously, and I don't mind the messages accumulating in the topic until the next run.)

Here's what it looks like now:

    // cmd 1
    val df1 = spark.readStream.format("kafka").option("subscribe", "topic1"). ... .load()
    // cmd 2
    df1.writeStream.outputMode("append"). ... .start()
    // cmd 3
    val df2 = spark.readStream.format("kafka").option("subscribe", "topic2"). ... .load()
    // cmd 4
    // Do similar with df2
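
Fleshed out, one of the streams above might look something like the following sketch. The broker address, sink format, and checkpoint path are placeholder assumptions, not part of the original question:

```scala
// Sketch of one stream; "broker:9092", the delta sink, and the
// checkpoint path are hypothetical placeholders.
val df1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic1")
  .load()

df1.writeStream
  .outputMode("append")
  .format("delta") // or whatever sink the job writes to
  .option("checkpointLocation", "/tmp/checkpoints/topic1")
  .start()
```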

= = =

Question then is, how can I tell df1 and df2 to finish up their current batch after 10 minutes and gracefully shut down the notebook?

I'm new to Databricks, so I started by reading the options available when creating the stream: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

None of them seems to say "just throw in the towel after X seconds". I can't use the kafkaConsumer.pollTimeoutMs and fetchOffset.numRetries options either, because there is a frequent trickle of records coming in.


Solution

  • You can use awaitTermination to stop the stream after a specified time. For example:

    import org.apache.spark.sql.functions.decode
    import spark.implicits._

    val query1 = df.withColumn("key", decode($"key", "UTF-8"))
      .withColumn("result", decode($"value", "UTF-8"))
      .select("key", "result", "topic")
      .writeStream.format("console")
      .start()

    // Block for up to 5 minutes, then stop the query
    query1.awaitTermination(5 * 60 * 1000)
    query1.stop()
    

    This will stop the stream after 5 minutes.
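
For the two streams in the question, a hedged way to extend this is to give all active queries a shared time budget rather than waiting on each one in sequence. This is a sketch, assuming both queries were started as shown above; the 10-minute budget matches the question:

```scala
// Sketch: give all running queries a shared 10-minute budget,
// then stop them so the scheduler can restart the job later.
val budgetMs = 10 * 60 * 1000L
val deadline = System.currentTimeMillis() + budgetMs

// Wait on each query, but only for whatever time remains of the budget.
for (q <- spark.streams.active) {
  val remaining = math.max(0L, deadline - System.currentTimeMillis())
  q.awaitTermination(remaining) // returns early if the query stops or fails
}

// Stop whatever is still running once the budget is exhausted.
for (q <- spark.streams.active) {
  q.stop()
}
```

Using spark.streams.active avoids having to keep references to query1 and query2 by hand; awaitTermination(timeoutMs) returns a Boolean indicating whether the query terminated within the timeout.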
