Search code examples
pythondatabricksfastapiazure-databricksdatabricks-unity-catalog

Create tables in Databricks using FastAPI - Python code


I am developing a FastApi which suppose to do some calculations based on a request in JSON format and then sends the response and stores it in several Databricks catalog tables.

So, in the API, I convert the response and also create the tables

What I am struggling with is what would be the correct databricks API endpoint that I should connect to?

As you can see from the code below, I defined:

url = f"{self.databricks_host}/api/2.0/sql/createTable"

but it is not working.

def send_to_dtb_catalog(self, df, table_name):
        # doing some stuff here ....

        # Prepare data payload for Databricks API
        data = {
            "tableName": f"my_database.my_schema.{table_name}",
            "data": df_json
        }   

        # Make HTTP request to Databricks REST API
        # suppose databricks_host and databricks_token are pre-defined 
        url = f"{self.databricks_host}/api/2.0/sql/createTable"

        headers = {
            "Authorization": f"Bearer {self.databricks_token}",
            "Content-Type": "application/json"
        }

        response = requests.post(url, headers=headers, json=data)

Then I will use send_to_dtb_catalog to send the created tables to Databricks catalog tables, something like this

self.send_to_dtb_catalog(table1_df, "table1_databricks")
self.send_to_dtb_catalog(table2_df, "table2_databricks")

I appreciate any help as I am new to both Databricks and API development.


Solution

  • You can use the following API to execute SQL statements.

    Execute a SQL statement

    Alter your function like below.

    Code:

    import requests
    
    def send_to_dtb_catalog(df, table_name):
        url = f"{databricks_host}/api/2.0/sql/statements/"
    
        headers = {
            "Authorization": f"Bearer {databricks_token}",
            "Content-Type": "application/json"
        }
        sql_q = f'''
            CREATE TABLE IF NOT EXISTS {table_name} (
            id INT,
            name STRING
            )
        '''
    
        body = {
            "warehouse_id": "a415c87c62c279a5",
            "statement": sql_q,
            "wait_timeout": "30s",
            "on_wait_timeout": "CANCEL"
        }
        response = requests.post(url, headers=headers, json=body)
        if response.json()['status']['state'] == 'SUCCEEDED':
            print("Inserting values....")
        
            t = df.rdd.map(lambda row: tuple(row)).collect()
            insert_query = f'''
            INSERT INTO {table_name}
            VALUES
            {','.join(map(str, t))}
            '''
        
            body['statement'] = insert_query
    
            res2 = requests.post(url, headers=headers, json=body)
        return res2
    

    Next, call your function.

    Output:

    enter image description here

    Output of API request:

    enter image description here

    One more way is using drivers to connect to Databricks.

    Refer this on how to connect to the server and execute queries.