python django celery celerybeat flower

How to inherit a django class method from views.py into tasks.py to schedule jobs with Celery?


I am trying to schedule jobs in Django with Celery. The problem is that I cannot make my @shared_task function in tasks.py call the class method defined in views.py. I installed Celery together with Flower and Beat, and the configuration works well when I write a standalone function in tasks.py. Do you have any suggestions on how to solve this? Thanks for your help.

My views.py:

from rest_framework import generics
from .serializers import TickerSerializer
from ticker.models import Ticker
import requests
import logging
from itertools import chain
from importlib import reload
import sys


logger = logging.getLogger(__name__)

class TickerList(generics.ListAPIView):
    serializer_class = TickerSerializer
    queryset = Ticker.objects.all()
    
class TickerRetrieve(generics.RetrieveAPIView):
    serializer_class = TickerSerializer
    queryset = Ticker.objects.all()
    lookup_field = 'id'   
    
    def get_object(request, **args):
        url = 'https://api.kraken.com/0/public/Ticker?pair=1INCHEUR,1INCHUSD'
        response = requests.get(url)
        data = response.json()
        Ticker.objects.all().delete()
        for key in data['result']:
            if isinstance(data['result'][key], int):
                continue
            crypto_data =data['result'][key]
            ticker_data = Ticker(crypto = (key),
                                a_0 = (crypto_data.get('a')[0]),
                                a_1 = (crypto_data.get('a')[1]),
                                )

            ticker_data.save()

My tasks.py:

from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
import sys
from celery import shared_task
from ticker.views import TickerRetrieve
from django.core.management import call_command

a = TickerRetrieve()
    
@shared_task(name="updatemy_ticker")
def updatemy_ticker_task(ticker_data):
    return a.get_object(ticker_data)

Flower and the local server show the scheduled tasks running without errors; however, I cannot make tasks.py trigger the class method in views.py. I went through several examples here and read the documentation, but none had a similar example.


Solution

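  • The solution was simply to call a.get_object() without any arguments in tasks.py. The method is declared as get_object(request, **args), so the instance itself is bound to the first parameter and the method accepts no other positional arguments. The scheduled task now runs like clockwork.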

    tasks.py

    from __future__ import absolute_import, unicode_literals
    from celery.utils.log import get_task_logger
    import sys
    from celery import shared_task
    from ticker.views import TickerRetrieve
    from django.core.management import call_command
    
    
    a = TickerRetrieve()
        
    @shared_task(name="updatemy_ticker")
    def updatemy_ticker_task():
        return a.get_object()
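
Why dropping the argument helps can be seen without Django or Celery. The sketch below is only an illustration: TickerRetrieveStandIn is a hypothetical stand-in class that copies the method signature from views.py, and the return value "refreshed" is made up. It assumes nothing about the real view beyond that signature.

```python
class TickerRetrieveStandIn:
    """Hypothetical stand-in for the view; no Django required."""

    # Same signature as in views.py: the first parameter receives the
    # instance (it plays the role of `self`), and **args only collects
    # keyword arguments, never positional ones.
    def get_object(request, **args):
        return "refreshed"  # placeholder for the real fetch-and-save work


a = TickerRetrieveStandIn()

# Original task body: the extra positional argument has nowhere to go.
try:
    a.get_object("ticker_data")
    outcome_with_argument = "ok"
except TypeError as exc:
    outcome_with_argument = f"TypeError: {exc}"

# Fixed task body: no arguments, so the call succeeds.
outcome_without_argument = a.get_object()

print(outcome_with_argument)
print(outcome_without_argument)
```

With this signature, the call that passes a positional argument raises a TypeError, while the call without arguments returns normally. That matches the behavior described in the solution.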