Tags: python, json, databricks, databricks-cli, databricks-rest-api

"Invalid JSON given in the body of the request - expected a map" when using reset_job method


I am trying to change an existing job's settings using the CLI, but when I invoke the reset_job method I get this error:

Traceback (most recent call last):
  File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 78, in <module>
    dr.experiment(host,token)
  File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 58, in experiment
    jobs.reset_job(job_json)
  File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/jobs/api.py", line 49, in reset_job
    return self.client.client.perform_query('POST', '/jobs/reset', data=json, headers=headers,
  File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/sdk/api_client.py", line 174, in perform_query
    raise requests.exceptions.HTTPError(message, response=e.response)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://spg-sustainable1-qa.cloud.databricks.com/api/2.0/jobs/reset
 Response from server: 
 { 'error_code': 'MALFORMED_REQUEST',
  'message': 'Invalid JSON given in the body of the request - expected a map'}

Here is the sample python code I am using:

...
api_client = ApiClient(host=databricks_host, token=databricks_token)
jobs = JobsApi(api_client)

job_list = jobs.list_jobs()["jobs"]

job_name = "DP DataSync Job"
result_list = list(
    filter(lambda job: job['settings']['name'] == job_name, job_list)
)

job = result_list[0]
job_id = job["job_id"]
job["settings"]["schedule"]["pause_status"] = "UNPAUSED"

print(f"Resetting job with id: {job_id}")

job_json = json.dumps(job)

jobs.reset_job(job_json)

Here is the json that gets passed to reset_job:

{
    "job_id": 217841321277199,
    "creator_user_name": "...",
    "settings": {
        "name": "DP DataSync Job",
        "new_cluster": {
            "cluster_name": "",
            "spark_version": "10.4.x-scala2.12",
            "aws_attributes": {
                "first_on_demand": 1,
                "availability": "SPOT_WITH_FALLBACK",
                "zone_id": "us-east-1a",
                "spot_bid_price_percent": 100,
                "ebs_volume_count": 0
            },
            "node_type_id": "d3.4xlarge",
            "custom_tags": {
                "Owner": "[email protected]",
                "AppID": "appidhere",
                "Environment": ""
            },
            "spark_env_vars": {
                "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
            },
            "enable_elastic_disk": false,
            "runtime_engine": "STANDARD",
            "autoscale": {
                "min_workers": 2,
                "max_workers": 16
            }
        },
        "libraries": [
            {
                "jar": "DataSync-1.0-all.jar"
            }
        ],
        "email_notifications": {
            "on_start": [
                "[email protected]"
            ],
            "on_success": [
                "[email protected]"
            ],
            "on_failure": [
                "[email protected]"
            ],
            "no_alert_for_skipped_runs": false
        },
        "timeout_seconds": 0,
        "schedule": {
            "quartz_cron_expression": "35 0 21 * * ?",
            "timezone_id": "America/New_York",
            "pause_status": "UNPAUSED"
        },
        "spark_jar_task": {
            "jar_uri": "",
            "main_class_name": "com.company.s.dp.datasync",
            "parameters": [
                "Config.json"
            ],
            "run_as_repl": true
        },
        "max_concurrent_runs": 1,
        "format": "SINGLE_TASK"
    },
    "created_time": 1678272261985
}

Databricks CLI version: 17.4


Solution

  • The payload that you're using is the Jobs Get response - you can't use it as-is for resetting the job. If you look at the Jobs Reset API, you will see that the payload consists of only two fields:

    • job_id - ID of the job to reset
    • new_settings - the new settings to apply to the job (your payload instead carries them under a settings key, alongside extra fields like creator_user_name and created_time)
    {
        "job_id": 11223344,
        "new_settings": {
            "name": "A multitask job",
            ...
        }
    }
    

    You also don't need to call json.dumps yourself - the API client does that for you (see the source code). Passing an already-serialized string means it gets encoded a second time, so the server receives a JSON string instead of a JSON object - hence "expected a map".
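    The double-encoding effect can be reproduced with nothing but the standard library, independent of Databricks:

    ```python
    import json

    payload = {"job_id": 11223344, "new_settings": {"name": "A multitask job"}}

    # Correct: serialize once - the decoded request body is a map (dict).
    once = json.dumps(payload)
    assert isinstance(json.loads(once), dict)

    # The bug: reset_job was given a string, and the client serialized it again.
    twice = json.dumps(json.dumps(payload))
    decoded = json.loads(twice)
    # The server now sees a JSON string, not a map -> MALFORMED_REQUEST
    assert isinstance(decoded, str)
    ```

    So pass the plain dict to reset_job and let the client handle serialization.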

    So your code should be modified to the following:

    job = result_list[0]
    job_id = job["job_id"]
    new_job = {"job_id": job_id, "new_settings": job["settings"]}
    new_job["new_settings"]["schedule"]["pause_status"] = "UNPAUSED"

    jobs.reset_job(new_job)
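
    If you do this in more than one place, the Get-to-Reset conversion can live in a small pure helper. This is a sketch, not part of databricks-cli; `build_reset_payload` is a hypothetical name, and the trimmed-down job dict below only mimics the shape of the Jobs List/Get response:

    ```python
    def build_reset_payload(job, pause_status=None):
        """Convert one entry of a Jobs List/Get response into the shape
        the Jobs Reset API expects: only job_id and new_settings."""
        payload = {"job_id": job["job_id"], "new_settings": job["settings"]}
        if pause_status is not None:
            payload["new_settings"]["schedule"]["pause_status"] = pause_status
        return payload

    # Trimmed-down example of a job entry as returned by list_jobs():
    job = {
        "job_id": 217841321277199,
        "creator_user_name": "someone",
        "settings": {
            "name": "DP DataSync Job",
            "schedule": {
                "quartz_cron_expression": "35 0 21 * * ?",
                "timezone_id": "America/New_York",
                "pause_status": "PAUSED",
            },
        },
        "created_time": 1678272261985,
    }

    payload = build_reset_payload(job, pause_status="UNPAUSED")
    # payload now contains only the two fields the Reset API accepts;
    # extra fields like creator_user_name and created_time are dropped.
    ```

    The resulting dict can then be passed straight to jobs.reset_job.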