Search code examples
python django celery django-celery

Celery-Progress - progress_recorder.set_progress() returns TypeError: sequence item 1: expected a bytes-like object, NoneType found


I'm working on a Django project and trying to use Celery-Progress to create a progress bar.

But when I call the progress_recorder.set_progress method inside the following function (which runs in a Celery worker):

def increase_task_progress():
    """Advance the shared step counter by one and report it to the recorder.

    Reads and updates the module-level ``progress_recorder_current`` and
    forwards it, together with ``progress_recorder_total``, to
    ``progress_recorder.set_progress``.
    """
    # Only the name that is rebound needs a ``global`` declaration.
    global progress_recorder_current
    progress_recorder_current += 1
    # <<< this is the call that raises the TypeError
    progress_recorder.set_progress(progress_recorder_current, progress_recorder_total)

It raises the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/projects/report/excel.py", line 193, in generate_data_sheets
    increase_task_progress()
  File "/projects/report/excel.py", line 199, in increase_task_progress
    progress_recorder.set_progress(progress_recorder_current, progress_recorder_total)
  File "/projects/report/my_venv/lib/python3.7/site-packages/celery_progress/backend.py", line 38, in set_progress
    'percent': percent,
  File "/projects/report/my_venv/lib/python3.7/site-packages/celery/app/task.py", line 930, in update_state
    self.backend.store_result(task_id, meta, state, **kwargs)
  File "/projects/report/my_venv/lib/python3.7/site-packages/celery/backends/base.py", line 342, in store_result
    request=request, **kwargs)
  File "/projects/report/my_venv/lib/python3.7/site-packages/celery/backends/base.py", line 714, in _store_result
    self.set(self.get_key_for_task(task_id), self.encode(meta))
  File "/projects/report/my_venv/lib/python3.7/site-packages/celery/backends/base.py", line 590, in get_key_for_task
    self.task_keyprefix, key_t(task_id), key_t(key),
TypeError: sequence item 1: expected a bytes-like object, NoneType found

This is my code leading up to the increase_task_progress function.

views.py

    from django.http import HttpResponse
    from . import tasks


    def myView(request):
        """Queue report generation and reply with the Celery task id.

        Reads ``report_id`` from the POST data (defaulting to 0), enqueues
        ``tasks.generate_report`` and returns the queued task's id as the
        response body.
        """
        report_id = request.POST.get('report_id', 0)
        result = tasks.generate_report.delay(report_id)
        # The id lives on the result object returned by ``delay``.
        return HttpResponse(content=result.task_id)

tasks.py

from . import excel
from celery                     import shared_task

@shared_task(bind=True)
def generate_report(self, report_id):
    """Celery task: build the XLS report for ``report_id``.

    ``bind=True`` makes ``self`` the task instance, which is handed to
    ``excel.generate_xls_report`` as ``task_obj``.

    Returns:
        Whatever ``excel.generate_xls_report`` returns.
    """
    return excel.generate_xls_report(
        report_id=report_id,
        task_obj=self,
    )

excel.py

from celery_progress.backend    import ProgressRecorder
from xlsxwriter.workbook        import Workbook
from .                          import database
import threading, io

# Module-level state shared by the functions below and by the threads that
# generate_xls_report starts.
workbook = progress_recorder = None  # both are assigned in generate_xls_report

# Progress counter: value reported so far, and the total it is measured against.
progress_recorder_current, progress_recorder_total = 0, 4

def generate_xls_report(report_id, task_obj):
    """Build the report workbook, filling its data sheets in two threads.

    Args:
        report_id: Identifier of the requested report (not used by the code
            shown here).
        task_obj: The bound Celery task; it is wrapped in a
            ``ProgressRecorder`` that ``increase_task_progress`` reports to.

    Returns:
        The closed ``Workbook``.

    NOTE(review): the bytes are written to the local ``output`` buffer, which
    is rewound but not returned — confirm that callers really want the
    ``Workbook`` object.
    """
    global progress_recorder, progress_recorder_current, workbook
    progress_recorder = ProgressRecorder(task=task_obj)
    # Module state outlives a single call, so restart the count for every
    # report instead of continuing from the previous run's value.
    progress_recorder_current = 0
    output = io.BytesIO()
    workbook = Workbook(output, {'constant_memory': True})

    db_conn = database.get_db_conn()
    started_threads = []
    try:
        for table_name in ("product", "sales"):
            thread = threading.Thread(target=generate_data_sheets, args=(table_name, db_conn))
            thread.start()
            started_threads.append(thread)
    finally:
        # Wait for whatever was started before releasing the connection the
        # threads share, even if starting a later thread failed.
        for thread in started_threads:
            thread.join()
        db_conn.close()

    workbook.close()
    output.seek(0)

    print("Program finished!")

    return workbook

def generate_data_sheets(table_name, db_conn):
    """Fill the data sheet for ``table_name``, then record one progress step.

    ``db_conn`` is accepted for the sheet-generation code, which is elided
    from this listing.
    """
    # Generate the excel data sheet code HERE...

    increase_task_progress()
    done_message = "Table {} done".format(table_name)
    print(done_message)

# Serialises counter updates coming from the report threads.
_progress_lock = threading.Lock()


def increase_task_progress():
    """Record one more finished step and publish the new progress.

    This runs in the threads started by ``generate_xls_report``, so the
    increment and the report happen under a lock: every call publishes its
    own distinct count, in order.

    NOTE(review): ``set_progress`` still fails when called from those
    threads — the traceback in this question ends in ``get_key_for_task``
    receiving ``None`` as the task id. Presumably the task's request context
    is not visible outside the thread that runs the task; confirm against
    the Celery documentation.
    """
    global progress_recorder_current
    with _progress_lock:
        progress_recorder_current += 1
        progress_recorder.set_progress(progress_recorder_current, progress_recorder_total)

Please, what's wrong with my code?

Thank you


Solution

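  • I managed to solve this issue by creating a class called "ReportGenerator" and using Python's threading.Lock(). The task id is read once from task_obj.request.id in the constructor and passed explicitly to update_state, because inside the worker threads the task id came through as None (see the traceback in the question).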

    See the working code below:

    import threading
    from decimal import Decimal


    class ReportGenerator:
        """Builds the XLSX report and publishes its progress to Celery.

        The task id is read once in ``__init__`` and passed explicitly to every
        ``update_state`` call. The progress counter is guarded by a lock because
        ``generate_data_sheet`` is meant to be called from several threads.
        """

        def __init__(self, start_datetime, end_datetime, task_obj, total_steps=6):
            """Capture the task and its id and start the progress count at zero.

            Args:
                start_datetime: Presumably the start of the report period; it
                    is only stored by the code shown here.
                end_datetime: Presumably the end of the report period; it is
                    only stored by the code shown here.
                task_obj: The bound Celery task used to publish progress.
                total_steps: Number of steps that correspond to 100 percent.
            """
            self.start_datetime = start_datetime
            self.end_datetime = end_datetime
            self.lock = threading.Lock()
            self.task_obj = task_obj
            # Read the id here, while the task's request is available, and
            # reuse the stored value in every later update_state call.
            self.task_id = self.task_obj.request.id
            self.progress_recorder_current = 0
            self.progress_recorder_total = total_steps

        def xlsx_generator(self):
            """Create the workbook and fill it using worker threads (elided).

            ``workbook`` is expected to be created by the elided first steps.
            """
            # Xlsx report first steps here

            # Start and join the threads that call the "generate_data_sheet()" method

            return workbook

        def generate_data_sheet(self):
            """Write one data sheet (elided) and record one progress step."""
            # Put data in xlsx rows...

            self.increase_task_progress()

        def increase_task_progress(self):
            """Count one finished step and push the new percentage to Celery."""
            # ``with`` releases the lock on every exit path, including an error
            # raised while the percentage is being computed.
            with self.lock:
                self.progress_recorder_current += 1

                percent = (Decimal(self.progress_recorder_current) / Decimal(self.progress_recorder_total)) * Decimal(100)
                percent = float(round(percent, 2))

                self.task_obj.update_state(
                    task_id=self.task_id,  # I had to pass the task_id to update_state
                    state="PROGRESS",
                    meta={
                        'current': self.progress_recorder_current,
                        'total': self.progress_recorder_total,
                        'percent': percent,
                    },
                )