I am trying to change an existing job's settings using the CLI, but when I invoke the reset_job
method I am getting this error:
Traceback (most recent call last):
File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 78, in <module>
dr.experiment(host,token)
File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 58, in experiment
jobs.reset_job(job_json)
File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/jobs/api.py", line 49, in reset_job
return self.client.client.perform_query('POST', '/jobs/reset', data=json, headers=headers,
File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/sdk/api_client.py", line 174, in perform_query
raise requests.exceptions.HTTPError(message, response=e.response)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://spg-sustainable1-qa.cloud.databricks.com/api/2.0/jobs/reset
Response from server:
{ 'error_code': 'MALFORMED_REQUEST',
'message': 'Invalid JSON given in the body of the request - expected a map'}
Here is the sample Python code I am using:
...
api_client = ApiClient(host=databricks_host, token=databricks_token)
jobs = JobsApi(api_client)
job_list = jobs.list_jobs()["jobs"]
job_name = "DP DataSync Job"
result_list = list(
    filter(lambda job: job['settings']['name'] == job_name, job_list)
)
job = result_list[0]
job_id = job["job_id"]
job["settings"]["schedule"]["pause_status"] = "UNPAUSED"
print(f"Resetting job with id: {job_id}")
job_json = json.dumps(job)
jobs.reset_job(job_json)
Here is the JSON that gets passed to reset_job:
{
"job_id": 217841321277199,
"creator_user_name": "...",
"settings": {
"name": "DP DataSync Job",
"new_cluster": {
"cluster_name": "",
"spark_version": "10.4.x-scala2.12",
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "us-east-1a",
"spot_bid_price_percent": 100,
"ebs_volume_count": 0
},
"node_type_id": "d3.4xlarge",
"custom_tags": {
"Owner": "[email protected]",
"AppID": "appidhere",
"Environment": ""
},
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"enable_elastic_disk": false,
"runtime_engine": "STANDARD",
"autoscale": {
"min_workers": 2,
"max_workers": 16
}
},
"libraries": [
{
"jar": "DataSync-1.0-all.jar"
}
],
"email_notifications": {
"on_start": [
"[email protected]"
],
"on_success": [
"[email protected]"
],
"on_failure": [
"[email protected]"
],
"no_alert_for_skipped_runs": false
},
"timeout_seconds": 0,
"schedule": {
"quartz_cron_expression": "35 0 21 * * ?",
"timezone_id": "America/New_York",
"pause_status": "UNPAUSED"
},
"spark_jar_task": {
"jar_uri": "",
"main_class_name": "com.company.s.dp.datasync",
"parameters": [
"Config.json"
],
"run_as_repl": true
},
"max_concurrent_runs": 1,
"format": "SINGLE_TASK"
},
"created_time": 1678272261985
}
Databricks CLI version: 0.17.4
The payload that you're using is only the Job Get response - you can't use it as-is for resetting the job. If you look into the Job Reset API, you will see that the payload consists of only two fields:
job_id - ID of the job to reset
new_settings - the settings to set for the job, while you are passing the whole Get response (including its settings field)
So the request body should look like this:
{
"job_id": 11223344,
"new_settings": {
"name": "A multitask job",
...
}
}
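For reference, the equivalent raw REST call is a plain POST with that object as the body - a minimal sketch using the requests library, reusing the databricks_host and databricks_token variables from your snippet (the new_settings value here is just a placeholder):
import requests

# Sketch: call the Jobs Reset endpoint directly.
# databricks_host must include the scheme, e.g. https://<workspace>.cloud.databricks.com
resp = requests.post(
    f"{databricks_host}/api/2.0/jobs/reset",
    headers={"Authorization": f"Bearer {databricks_token}"},
    # The body must be a JSON object (a map), not a string.
    json={"job_id": 11223344, "new_settings": {"name": "A multitask job"}},
)
resp.raise_for_status()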
You also don't need to call json.dumps
yourself - serialization is done by the API client (see the source code). Passing a pre-serialized string is exactly what produces the "expected a map" error: the body arrives as a JSON string instead of a JSON object.
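You can see the problem at the Python level: a dict serializes to a JSON object (a map), while an already-dumped string gets serialized again as a single JSON string. A quick illustration:
import json

payload = {"job_id": 123}
print(json.dumps(payload))              # {"job_id": 123} - a map
print(json.dumps(json.dumps(payload)))  # "{\"job_id\": 123}" - a string, hence MALFORMED_REQUEST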
So your code should be modified to the following:
orig_job = result_list[0]
job_id = orig_job["job_id"]
job = {"job_id": job_id, "new_settings": orig_job["settings"]}
job["new_settings"]["schedule"]["pause_status"] = "UNPAUSED"
jobs.reset_job(job)
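Putting it all together, a complete version could look like this (a sketch, assuming the same databricks_host, databricks_token, and job name as in your snippet):
from databricks_cli.sdk.api_client import ApiClient
from databricks_cli.jobs.api import JobsApi

api_client = ApiClient(host=databricks_host, token=databricks_token)
jobs = JobsApi(api_client)

job_name = "DP DataSync Job"
orig_job = next(
    job for job in jobs.list_jobs()["jobs"]
    if job["settings"]["name"] == job_name
)
job_id = orig_job["job_id"]

# Reset expects only job_id + new_settings, passed as a plain dict.
new_settings = orig_job["settings"]
new_settings["schedule"]["pause_status"] = "UNPAUSED"
jobs.reset_job({"job_id": job_id, "new_settings": new_settings})

# Sanity check: re-read the job and confirm the schedule was updated.
assert jobs.get_job(job_id)["settings"]["schedule"]["pause_status"] == "UNPAUSED"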