from google.cloud import tasks_v2
import json
GCP_PROJECT='test'
GCP_LOCATION='europe-west6'
def enqueue_task(queue_name, payload, process_url):
client = tasks_v2.CloudTasksClient()
parent = client.queue_path(GCP_PROJECT, GCP_LOCATION, queue_name)
task = {
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': process_url
}
}
if payload is None:
return False
payload = json.dumps(payload)
converted_payload = payload.encode()
task['app_engine_http_request']['body'] = converted_payload
return client.create_task(parent, task)
I keep getting the following error when I try to create a Google Cloud Task. The Google App Engine Runtime used is python38. It was working fine but suddenly after deployment using gcp CLI it does not work now.
Traceback (most recent call last):
File "/layers/google.python.pip/pip/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/layers/google.python.pip/pip/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/layers/google.python.pip/pip/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/layers/google.python.pip/pip/flask/_compat.py", line 39, in reraise
raise value
File "/layers/google.python.pip/pip/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/layers/google.python.pip/pip/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/srv/main.py", line 153, in sync_request
queue_response = queues.enqueue_task(
File "/srv/queues.py", line 28, in enqueue_task
return client.create_task(parent, task)
TypeError: create_task() takes from 1 to 2 positional arguments but 3 were given
The same thing has just happened to me (on live server :/). Turns out, the Cloud Tasks library has changed significantly with version 2.0.0. You can read what you need to do to upgrade here.
Your problem is in this line:
client.create_task(parent, task)
The updated library requires to either use a dictionary as a positional argument or use keyword arguments. So this should fix it:
client.create_task(parent=parent, task=task)
EDIT: Looking at your code now that I've made this work for myself, you will also have to change the following:
# Before
parent = client.queue_path(GCP_PROJECT, GCP_LOCATION, queue_name)
# After
parent = client.queue_path(project=GCP_PROJECT, location=GCP_LOCATION, queue=queue_name)
and
# Before
'http_method': 'POST',
# After
'http_method': tasks_v2.HttpMethod.POST,