
Run a Databricks job from a notebook


I want to know whether it is possible to run a Databricks job from a notebook using code, and if so, how to do it.

I have a job with multiple tasks and many contributors, and we have a job created to execute it all. Now we want to run the job from a notebook, both to test new features without creating a new task in the job and to run the job multiple times in a loop, for example:

for i in [1,2,3]:
    run job with parameter i

Regards


Solution

  • What you need to do is the following:

    1. Install the databricksapi package:

       %pip install databricksapi==1.8.1

    2. Create your job and have it return an output. You can do that by exiting the notebook like this:

       import json
       from databricksapi import Workspace, Jobs, DBFS

       dbutils.notebook.exit(json.dumps({"result": f"{_result}"}))

    If you want to pass a DataFrame, you have to pass it as a JSON dump too; there is official Databricks documentation about that, so check it out. A sketch of this follows below.
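    A minimal sketch, assuming a Spark DataFrame and a hypothetical table name; toJSON() serializes each row to a JSON string, so the rows can round-trip through the notebook exit value (keep the output small, since the exit value is size-limited):

       import json

       result_df = spark.table("my_results")  # hypothetical table name
       rows = [json.loads(r) for r in result_df.toJSON().collect()]
       dbutils.notebook.exit(json.dumps({"result": rows}))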

    3. Get the job ID; you will need it later. You can find it in the job details page in Databricks.

    4. In the executor notebook you can use the following code:

       import json
       import time

       from databricksapi import Jobs

       def run_ks_job_and_return_output(params):
         # read the API url and token from the current notebook's context
         context = json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())
         url = context['extraContext']['api_url']
         token = context['extraContext']['api_token']

         jobs_instance = Jobs.Jobs(url, token)  # initialize a jobs instance
         runs_job_id = jobs_instance.runJob(****************, 'notebook',
                            params)  # **** is the job id

         # poll the completed runs every 30 seconds until our run shows up
         run_is_not_completed = True
         while run_is_not_completed:
           current_run = [run for run in jobs_instance.runsList('completed')['runs']
                          if run['run_id'] == runs_job_id['run_id']
                          and run['number_in_job'] == runs_job_id['number_in_job']]
           if len(current_run) == 0:
             time.sleep(30)
           else:
             run_is_not_completed = False
             current_run = current_run[0]
             print(f"Result state: {current_run['state']['result_state']}, "
                   f"you can check the resulting output at the following link: {current_run['run_page_url']}")
             note_output = jobs_instance.runsGetOutput(runs_job_id['run_id'])['notebook_output']
             return note_output

       run_ks_job_and_return_output({'parm1': 'george',
                                     'variable': "values1"})
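
    The function returns the job's notebook_output. A minimal follow-up sketch, assuming the job notebook exited with the JSON dump from step 2; the 'result' key holds the string passed to dbutils.notebook.exit:

       import json

       note_output = run_ks_job_and_return_output({'parm1': 'george',
                                                   'variable': "values1"})
       result = json.loads(note_output['result'])  # parse the exit value back into Python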
      

    If you want to run the job many times in parallel, you can do the following. (First, make sure you have increased the maximum concurrent runs in the job settings.)

    from multiprocessing.pool import ThreadPool

    pool = ThreadPool(1000)
    # snapshots_list is whatever collection drives your runs; each j is passed as a job parameter
    results = pool.map(lambda j: run_ks_job_and_return_output({'table': 'george',
                                                               'variable': "values1",
                                                               'j': j}),
                       [str(x) for x in range(2, len(snapshots_list))])
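
    A minimal alternative sketch using the standard library's concurrent.futures instead of ThreadPool; the parameter sets here are hypothetical, and the worker count should stay within the job's maximum concurrent runs:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    # hypothetical parameter sets; replace with your own
    param_sets = [{'table': 'george', 'variable': "values1", 'j': str(j)} for j in range(10)]

    with ThreadPoolExecutor(max_workers=10) as executor:  # stay within max concurrent runs
        futures = [executor.submit(run_ks_job_and_return_output, p) for p in param_sets]
        results = [f.result() for f in as_completed(futures)]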
    

    It is also possible to save the whole HTML output, but maybe you are not interested in that. In any case, I will answer that in another post on Stack Overflow.

    Hope it helps.