Tags: python, apache-spark, pyspark, timer, exit

Stop Spark Session after some time - PySpark


I am running an ETL job in Spark that sometimes takes a long time. I want to gracefully shut down the Spark session after a certain amount of time.

I am writing my code in PySpark.

try:
    df_final.write.partitionBy("col1", "col2", "col3").mode("append").format("orc").save(output)
except Exception:
    spark.stop()

In the above code, I would like to stop Spark after some time.

Is there a way to gracefully shut down the Spark session after a given amount of time?


Solution

  • I would suggest using Python's built-in threading.Timer to stop the Spark session gracefully:

    import threading

    def timer_elapsed():
        # Called when the timer fires: stop the session if it is still active.
        print('Timer elapsed')
        if not sc._jsc.sc().isStopped():
            spark.stop()

    # Wait 0.5 seconds for the Spark job to complete, then stop the session.
    spark_timer = threading.Timer(0.5, timer_elapsed)
    spark_timer.start()

    try:
        df_final.write.partitionBy("col1", "col2", "col3").mode("append").format("orc").save(output)
        print('Spark job finished successfully.')
    except Exception as e:
        spark_timer.cancel()  # stop the timer; no need to wait if an error occurred
        if not sc._jsc.sc().isStopped():
            spark.stop()


    Note: We stop the session in two cases: when the timer has elapsed or when an exception is caught. Before requesting to stop the Spark context, we check whether it is still active with sc._jsc.sc().isStopped(), which calls the Java API directly.
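
    For context, here is a minimal self-contained sketch of the same pattern, assuming a standalone PySpark script. The 30-minute timeout and the app name are placeholders I chose for illustration, and df_final and output stand in for the DataFrame and destination path from the question. It also cancels the timer after a successful write so an already-finished job does not get its session stopped later:

    import threading
    from pyspark.sql import SparkSession

    # Placeholder setup: in the original question `spark` and `sc` already exist.
    spark = SparkSession.builder.appName("etl-with-timeout").getOrCreate()
    sc = spark.sparkContext

    TIMEOUT_SECONDS = 30 * 60  # placeholder: give the write 30 minutes

    def timer_elapsed():
        # Stop the session only if it has not been stopped already.
        print('Timer elapsed')
        if not sc._jsc.sc().isStopped():
            spark.stop()

    spark_timer = threading.Timer(TIMEOUT_SECONDS, timer_elapsed)
    spark_timer.start()

    try:
        # `df_final` and `output` are the DataFrame and path from the question.
        df_final.write.partitionBy("col1", "col2", "col3").mode("append").format("orc").save(output)
        print('Spark job finished successfully.')
        spark_timer.cancel()  # finished in time: keep the session alive
    except Exception:
        spark_timer.cancel()
        if not sc._jsc.sc().isStopped():
            spark.stop()

    Cancelling the timer on success is a judgment call: without it, the timer would still fire later and stop an already-finished session, which is usually harmless but can surprise you if the same session is reused for further work.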