I have defined a shared_task with celery as follows:
@shared_task(bind=True)
def create_parameters_for_task_creation(
    self,
    project_type,
    dataset_instance_ids,
    filter_string,
    sampling_mode,
    sampling_parameters,
    variable_parameters,
    project_id,
) -> None:
    """Create the parameters for the task creation process.

    The function is passed arguments from the frontend which decide how the
    sentences have to be filtered and sampled.

    Args:
        self: The bound Celery task instance. Because the decorator uses
            ``bind=True``, Celery supplies it as the first positional
            argument; callers still pass only the seven arguments below.
        project_type (str): Describes the type of project passed by the user
        dataset_instance_ids (int): ID of the dataset that has been provided
            for the annotation task. NOTE(review): the plural name suggests
            more than one ID may be accepted -- confirm against the caller.
        filter_string (str): Presumably the filter applied to the sentences;
            TODO(review): the expected format is not documented here.
        sampling_mode (str): Method of sampling
        sampling_parameters (dict): Parameters for sampling
        variable_parameters (dict): TODO(review): purpose not documented here.
        project_id (int): ID of the project object created in this iteration
    """
As soon as I try to run the Celery workers with `celery -A shoonya_backend.celery worker -l info`, I get the following error:
Unrecoverable error: TypeError("cannot pickle 'dict_keys' object")
I believe that Celery is not allowing me to pass the dictionary data type, which is strange because my settings specify JSON as the accepted content type and serializer:
# Celery-related entries from the project's settings.
# NOTE(review): celery.py loads these with namespace='CELERY'; per the Celery
# docs that namespace means only upper-case, CELERY_-prefixed names are read,
# so the lowercase names below are presumably not picked up -- confirm.
CELERY_BROKER_URL = 'redis://localhost:6380'
result_backend = 'redis://localhost:6380'
# NOTE(review): celery.py passes accept_content as a list
# (['application/json']); here it is a bare string -- confirm the intended form.
accept_content = 'json'
result_serializer = 'json'
task_serializer = 'json'
This is what my celery.py file looks like:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# Set DJANGO_SETTINGS_MODULE to the project's settings module unless the
# environment already defines it.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'shoonya_backend.settings')
# Define celery app and settings
celery_app = Celery('shoonya_backend',
result_backend = 'django-db',
accept_content = ['application/json'],
result_serializer = 'json',
task_serializer = 'json')
# NOTE(review): the imported settings object itself is passed here. This is
# the call that the working version of this file replaces with the string
# 'django.conf:settings'.
celery_app.config_from_object(settings, namespace='CELERY')
# Let Celery discover task modules automatically.
celery_app.autodiscover_tasks()
@celery_app.task(bind=True)
def debug_task(self):
    """Print the request of the bound task.

    First task for task handling testing and to apply migrations to the
    celery results db.
    """
    message = f'Request: {self.request!r}'
    print(message)
What should be done?
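I solved it by changing the celery.py file: instead of importing the Django settings object and passing it to `config_from_object`, I pass the string `'django.conf:settings'`, so the worker no longer has to pickle the settings object.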
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Default the Django settings module for anything that starts Celery.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'shoonya_backend.settings')

# Define celery app and settings
celery_app = Celery(
    'shoonya_backend',
    result_backend='django-db',
    accept_content=['application/json'],
    result_serializer='json',
    task_serializer='json',
)

# The settings are referenced by dotted-path string rather than by object, so
# the configuration object does not have to be serialized for the worker.
celery_app.config_from_object('django.conf:settings', namespace='CELERY')

# Let Celery discover task modules automatically.
celery_app.autodiscover_tasks()
@celery_app.task(bind=True)
def debug_task(self):
    """Print the request of the bound task.

    First task for task handling testing and to apply migrations to the
    celery results db.
    """
    message = f'Request: {self.request!r}'
    print(message)