Tags: flask, cassandra, atexit, datastax-astra

Unable to delete table from AstraDB when closing Flask app using atexit()


from flask import Flask
import atexit
import os

import cassio
from astrapy.db import AstraDB
from langchain_community.vectorstores import Cassandra
from langchain_openai import OpenAIEmbeddings  # or wherever OpenAIEmbeddings lives in your LangChain version

app = Flask(__name__)

# AstraDB connection and vector store initialization
def initialize_astra_vector_store():
    cassio.init(token=ASTRA_DB_APPLICATION_TOKEN, database_id=ASTR_DB_ID)

    embedding = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)
    astra_vector_store = Cassandra(
        embedding=embedding,
        table_name=table_name,
        session=None,
        keyspace=None
    )
    print('----------------------table created as table_name:', table_name)
    return astra_vector_store

# ... Flask app routes omitted ...

def delete_table():
    print('----------------------inside delete_table')
    db = AstraDB(
        token=os.getenv('ASTRA_DB_APPLICATION_TOKEN'),
        api_endpoint='<api-endpoint>',
    )
    print('----------------------table_name:', table_name)      
    # Drop the table created for this session
    db.delete_collection(collection_name=table_name)
    print("----------------------APP EXITED----------------------")
atexit.register(delete_table)

if __name__ == '__main__':
    app.run(debug=False)

Now when I run the app, I get this error:

Exception in thread Task Scheduler:
Traceback (most recent call last):
  File "C:\Users\HP\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "cassandra\cluster.py", line 4239, in cassandra.cluster._Scheduler.run
  File "C:\Users\HP\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

When I run the app and then close it, I expected it to simply delete the AstraDB table created during the session. The table does in fact get deleted, but I still get this error.


Solution

  • First, let me clarify that your code mixes two different approaches to Astra DB:

    • the CQL approach (essentially what you would run on any Cassandra cluster), i.e. cassio and langchain_community.vectorstores.Cassandra
    • the Data API approach, specific to DataStax Astra DB: i.e. astrapy.db.AstraDB (which your shutdown hook uses) and langchain_astradb.AstraDBVectorStore (which you would use in the app were it entirely based on this approach)

    Each approach has its pros and cons (for example, the Data API is certainly easier to use and more idiomatic if one is not familiar with Cassandra; conversely, the CQL approach is applicable to any Cassandra 5+ cluster, no matter how it is deployed).
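
    For reference, a store built entirely on the Data API side would look roughly like the sketch below (a sketch only: the function name is made up here, ASTRA_DB_API_ENDPOINT is the same endpoint your shutdown hook already uses, and the exact constructor arguments may vary slightly across langchain_astradb versions):

    from langchain_astradb import AstraDBVectorStore
    from langchain_openai import OpenAIEmbeddings

    def initialize_astra_vector_store_data_api():
        # no cassio.init() and no CQL session here: everything goes through the HTTP Data API
        embedding = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)
        return AstraDBVectorStore(
            embedding=embedding,
            collection_name=table_name,          # the Data API calls these "collections"
            api_endpoint=ASTRA_DB_API_ENDPOINT,
            token=ASTRA_DB_APPLICATION_TOKEN,
        )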

    In practice, the script creates a vector store (hence the underlying table on the database) in the first paradigm, and then drops it using the Data API. This seems to work, but such mixing is hardly the intended usage. If you want to stick to the CassIO approach, I suggest replacing the shutdown part with code such as the following (cf. the relevant CassIO docs):

    def delete_table_cql():
        print(f"Deleting table {store}")
        # the following two lines expect cassio.init() has been run already
        _session = cassio.config.resolve_session()
        _keyspace = cassio.config.resolve_keyspace()
        # issue a CQL command to drop the table
        _session.execute(f"DROP TABLE IF EXISTS {_keyspace}.{table_name};")
    
    atexit.register(delete_table_cql)
    

    That being said, let's get to the main question, the "futures after shutdown" symptom.

    This issue is not specific to CassIO and originates in the Python Cassandra driver (which CassIO uses internally; the LangChain Cassandra store in turn uses CassIO). The driver uses a dedicated executor to run some asynchronous housekeeping tasks (cleanup/internal maintenance during the lifetime of the Session), which is emphatically not the executor used to run queries (not even asynchronous ones). The error you see comes from one such "hidden" janitor task being scheduled too late, i.e. when the application is shutting down and that executor is no longer available. Explicit queries, such as the DROP TABLE command above, will complete successfully despite this scary-looking "error" (and as for whatever resource cleanup the driver was about to do internally: who cares? It is all about to be torn down anyway).
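
    (If you would rather not see the message at all, one possible mitigation, not strictly required and whose effectiveness may depend on your setup, is to shut the driver down explicitly from your own exit hook, so that its internal scheduler stops before the interpreter tears its executors down. A rough sketch, again assuming cassio.init() has already run:)

    def shutdown_driver():
        # resolve the Session created by cassio.init() and close its Cluster;
        # Cluster.shutdown() also closes the Session and stops the driver's background threads
        _session = cassio.config.resolve_session()
        _session.cluster.shutdown()

    # atexit runs hooks last-registered-first: register shutdown_driver *before*
    # delete_table_cql, so the DROP TABLE still runs while the session is open
    atexit.register(shutdown_driver)
    atexit.register(delete_table_cql)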

    To be more precise: the script, as it stands, interestingly has nothing related to the Cassandra driver in its shutdown hook (which is, as noted, a pure Data API thing, i.e. based solely on HTTP requests). As a matter of fact, the shutdown hook you show is functionally equivalent to the following:

    def delete_table_bare_http():
        import requests
        req = requests.post(
            f"{ASTRA_DB_API_ENDPOINT}/api/json/v1/default_keyspace",
            headers={"token": ASTRA_DB_APPLICATION_TOKEN},
            json={"deleteCollection": {"name": table_name}},
        )
        assert req.status_code == 200
    
    atexit.register(delete_table_bare_http)
    

    ... but, once more, you should either adapt this hook to be CQL-based like the rest of the code, or consider switching the whole application to the Astra DB (Data API-based) LangChain support.

    In short, feel free to disregard this error completely: your requests are guaranteed to run to completion.

    Source: I am the main author of CassIO, and I have had a few conversations about this very issue with one of the lead maintainers of the Python Cassandra driver.