Celery disallowing the usage of dictionary data type


I have defined a shared_task with celery as follows:

@shared_task(bind=True)
def create_parameters_for_task_creation(self, project_type, dataset_instance_ids, filter_string, sampling_mode, sampling_parameters, variable_parameters, project_id) -> None:
    """Function to create the parameters for the task creation process. The function is passed arguments from the frontend which decide how the sentences have to be filtered and sampled.

    Args:
        project_type (str): Describes the type of project passed by the user
        dataset_instance_ids (int): ID of the dataset that has been provided for the annotation task 
        filter_string (str): _description_
        sampling_mode (str): Method of sampling
        sampling_parameters (dict): Parameters for sampling
        variable_parameters (dict): _description_
        project_id (int): ID of the project object created in this iteration

    """

As soon as I start the Celery worker with celery -A shoonya_backend.celery worker -l info, I get the following error:

Unrecoverable error: TypeError("cannot pickle 'dict_keys' object")

I believe Celery is not allowing me to pass a dictionary, which is odd because my settings allow the JSON content type.

CELERY_BROKER_URL = 'redis://localhost:6380'
result_backend = 'redis://localhost:6380'
accept_content = 'json'
result_serializer = 'json'
task_serializer = 'json'
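
One thing worth checking in the block above, offered as a sketch rather than a diagnosis: celery.py loads its configuration with namespace='CELERY', so Celery looks for uppercase names carrying a CELERY_ prefix, and Django's settings object only exposes uppercase names in the first place. Assuming these lines live in the Django settings module, the lowercase entries would not be picked up. The same values in prefixed form might look like this (the URLs are copied from above; which result backend is actually intended is left open, since celery.py also passes result_backend='django-db'):

```python
# settings.py (sketch): names use the CELERY_ prefix that namespace='CELERY' expects
CELERY_BROKER_URL = 'redis://localhost:6380'
CELERY_RESULT_BACKEND = 'redis://localhost:6380'
CELERY_ACCEPT_CONTENT = ['json']  # a list of content types rather than a bare string
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
```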

This is what my celery.py file looks like.

from __future__ import absolute_import, unicode_literals

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'shoonya_backend.settings')

# Define celery app and settings
celery_app = Celery('shoonya_backend',
                    result_backend = 'django-db',
                    accept_content = ['application/json'],
                    result_serializer = 'json',
                    task_serializer = 'json')

celery_app.config_from_object(settings, namespace='CELERY')
celery_app.autodiscover_tasks()

@celery_app.task(bind=True)
def debug_task(self):
    ''' First task for task handling testing and to apply migrations to the celery results db '''
    print(f'Request: {self.request!r}') 

What should be done?


Solution

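  • I solved it by changing the celery.py file: config_from_object now receives the string 'django.conf:settings' instead of the imported settings object, and the from django.conf import settings import is gone. With a string, the worker does not have to serialize the configuration object to its child processes.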

    from __future__ import absolute_import, unicode_literals
    
    import os
    from celery import Celery
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'shoonya_backend.settings')
    
    # Define celery app and settings
    celery_app = Celery('shoonya_backend',
                        result_backend = 'django-db',
                        accept_content = ['application/json'],
                        result_serializer = 'json',
                        task_serializer = 'json')
    
    celery_app.config_from_object('django.conf:settings', namespace='CELERY')
    celery_app.autodiscover_tasks()
    
    @celery_app.task(bind=True)
    def debug_task(self):
        ''' First task for task handling testing and to apply migrations to the celery results db '''
        print(f'Request: {self.request!r}')
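
The error message itself suggests that the dictionary task arguments were not the problem: a plain dict pickles fine, while a dict_keys view does not. A minimal standard-library sketch of that difference, independent of Celery (the params values are hypothetical and only there for illustration):

```python
import pickle

# Hypothetical stand-in for a dict argument such as sampling_parameters.
params = {"sampling_mode": "random", "count": 10}

# A plain dict round-trips through pickle without trouble.
restored = pickle.loads(pickle.dumps(params))

# A dict_keys view is the kind of object pickle rejects.
try:
    pickle.dumps(params.keys())
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(restored == params)
print(error_message)
```

This is consistent with the fix above: the object that failed to pickle most likely came from the settings object handed to config_from_object, not from anything passed to the task.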