apache-spark, pyspark

"Cannot run multiple SparkContexts at once; existing SparkContext(app=dataHudi, master=local[*]) created by getOrCreate at


When trying to create multiple Spark sessions (because of multiple API hits), I get the error: "Cannot run multiple SparkContexts at once; existing SparkContext(app=dataXXXX, master=local[*]) created by getOrCreate at".

Can someone please help me understand why this is happening, and how to handle multiple requests simultaneously?

I tried the code below, but it does not solve the issue:

    flag = False
    spark = None
    while not flag:
        sparksessioncreator_object = NewSparkSession()
        spark = sparksessioncreator_object.Create_Session()
        if spark.getActiveSession():
            print('ActiveSession yes')
            flag = True
        else:
            try:
                spark.stop()
            except:
                pass
            finally:
                print('ActiveSession no')

Also, this is the session-creator class, for your reference:

    from pyspark.sql import SparkSession

    class NewSparkSession:

        def __init__(self):
            print("Creating Spark Session ------------------------>")

        def Create_Session(self):
            spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("dataHudi") \
                    .config('spark.driver.bindAddress', '0.0.0.0') \
                    .config('spark.driver.host', 'localhost') \
                    .config('spark.jars.packages', 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1') \
                    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
                    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
                    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
                    .getOrCreate()

            return spark

Solution

  • The problem is that Spark is designed to run a single SparkContext per JVM. A SparkContext is the basic connection that is established to work with Spark objects on the cluster. The SparkSession object is the entry point to the application, and it can be used to share the same underlying configuration across multiple sessions.
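
    To see why the error appears, here is a minimal sketch (assuming a plain local PySpark install): getOrCreate() simply reuses the session that already exists, while constructing a second SparkContext directly is what raises the message you quoted.

    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    # getOrCreate() returns the already-running session instead of creating a new context
    first = SparkSession.builder.master("local[*]").appName("baseApp").getOrCreate()
    second = SparkSession.builder.appName("anotherApp").getOrCreate()
    print(first is second)                             # True -- one session object
    print(first.sparkContext is second.sparkContext)   # True -- one context per JVM

    # Building a second SparkContext explicitly reproduces the error
    try:
        SparkContext(master="local[*]", appName="secondContext")
    except ValueError as e:
        print(e)  # Cannot run multiple SparkContexts at once; existing SparkContext(...)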

    However, I have made changes to the class so that you can keep creating new sessions once a base session object has been created.

    class NewSparkSession:
        def __init__(self, base_session):
            self.base_session = base_session
            print("Creating New Spark Session ------------------------>")
    
        def create_session(self):
            # Create a new session with isolated SQL configurations
            spark_session = self.base_session.newSession()
    
            return spark_session
    

    This class NewSparkSession now has an __init__ method that sets the base_session attribute, which is used to derive new sessions as the loop in your calling method progresses.
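
    As a quick illustration of what newSession() returns (a sketch, where base_session stands for whichever existing SparkSession you pass into the class): the derived session shares the single SparkContext, but gets its own SQL configuration, temporary views and UDFs.

    derived = NewSparkSession(base_session).create_session()
    print(derived is base_session)                             # False -- a distinct session object
    print(derived.sparkContext is base_session.sparkContext)   # True  -- the one shared context
    derived.conf.set("spark.sql.shuffle.partitions", "8")      # isolated to the derived session
    print(base_session.conf.get("spark.sql.shuffle.partitions"))  # base session keeps its own value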

    In the calling method, you can add this change:

    import pyspark.sql
    
    
    # Create a base SparkSession
    # Set configurations for the session which will be common throughout the execution
    base_session = (pyspark.sql.SparkSession.builder
                    .master("local[*]")
                    .appName("dataHudi")
                    .config('spark.driver.bindAddress', '0.0.0.0')
                    .config('spark.driver.host', 'localhost')
                    .config('spark.jars.packages', 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1')
                    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
                    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
                    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
                    .getOrCreate())
    
    flag = False
    new_spark_session = None
    while not flag:
        spark_session_creator_object = NewSparkSession(base_session)
        new_spark_session = spark_session_creator_object.create_session()
        if new_spark_session:
            print("New Spark Session is active")
            # Set any session-specific configuration needed for this iteration of the loop
            new_spark_session.conf.set("key", "value") # etc..
            # Use the new_spark_session to drive some logic
            print(new_spark_session) # New Spark Session derived from base session object
            flag = True
        else:
            try:
                new_spark_session.stop()
            except Exception as e:
                print(f"Error: {e}\n")
            finally:
                print("No Spark Sessions are active now")
    

    I have added a base_session object which can be used as the basis for creating multiple Spark sessions, driving the processing per session, and stopping once the API calls have been served.

    I am not sure why you need the if..else block, but instead of just checking the state of the new_spark_session object, you can add an iterator variable that keeps deriving sessions while there are API hits and stops when there are none.
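
    A sketch of that idea is below (looping over the pending work instead of a flag; pending_requests and the per-request handling are hypothetical placeholders for however your API layer hands work to Spark):

    pending_requests = ["req-1", "req-2", "req-3"]  # hypothetical queue of incoming API hits

    for request in pending_requests:
        # One derived session per request, all sharing the base SparkContext
        session = NewSparkSession(base_session).create_session()
        print(f"Handling {request} with {session}")
        # ... run the per-request Spark/Hudi logic with `session` here ...

    # Stop the shared context only once, after every request has been served
    base_session.stop()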