Tags: python, django, redis, python-rq, django-rq

Redis still fills up when result_ttl=0. Why?


Question: Why is Redis filling up if job results are discarded immediately?

I'm using Redis as a queue to create PDFs asynchronously and then save the result to my database. Since it's saved, I don't need to access the object at a later date, so I don't need to store the result in Redis after it's been processed.

To keep the result from staying in Redis, I've set result_ttl to 0:

parameter_dict = {          
    "order": serializer.object,
    "photo": base64_image,
    "result_ttl": 0
}
django_rq.enqueue(procces_template, **parameter_dict)
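Note that result_ttl here is consumed by rq's enqueue machinery rather than forwarded to procces_template itself. A simplified sketch of that separation (illustrative names only, not rq's actual internals):

```python
# Simplified sketch: an enqueue wrapper pops rq control options
# (result_ttl, timeout, ...) out of the kwargs, so only the remaining
# keyword arguments reach the job function.
RQ_CONTROL_KWARGS = ('result_ttl', 'ttl', 'timeout', 'depends_on')

def split_enqueue_kwargs(kwargs):
    # Snapshot the keys first so popping while iterating is safe.
    control = {k: kwargs.pop(k) for k in list(kwargs) if k in RQ_CONTROL_KWARGS}
    return control, kwargs

control, job_kwargs = split_enqueue_kwargs({
    "order": "some order",
    "photo": "base64...",
    "result_ttl": 0,
})
# control == {'result_ttl': 0}; job_kwargs holds only the function arguments
```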

The problem is that, although the worker log says the result is discarded immediately:

15:33:35 Job OK, result = John Doe's nail order to 568 Broadway
15:33:35 Result discarded immediately.
15:33:35
15:33:35 *** Listening on high, default, low...

Redis still fills up and throws:

ResponseError: command not allowed when used memory > 'maxmemory'

Is there another parameter I need to set in Redis / django-rq to keep Redis from filling up, given that the job results are already not being stored?
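That error means Redis has reached its configured maxmemory and the eviction policy (the default, noeviction, refuses write commands at the limit) is rejecting writes. One way to keep an eye on headroom is to compare used_memory against maxmemory from the INFO command; a small sketch, assuming the memory-section dict that redis-py's Redis.info() returns:

```python
def memory_headroom(info):
    """Given the memory section of Redis INFO (as a dict, like the one
    redis-py's Redis.info() returns), report usage as a fraction of
    maxmemory. A maxmemory of 0 means no limit is configured."""
    used = info['used_memory']
    maxmemory = info.get('maxmemory', 0)
    fraction = used / float(maxmemory) if maxmemory else 0.0
    return used, maxmemory, fraction

# Example with a hand-made info dict; against a live server you would
# use something like: info = redis_conn.info('memory')
used, limit, frac = memory_headroom({'used_memory': 90, 'maxmemory': 100})
# frac == 0.9, i.e. 90% of the limit is in use
```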


Update:

Following this post, I suspect the memory is filling up because of failed jobs left in Redis.

Using this code snippet:

def print_redis_failed_queue():
    q = django_rq.get_failed_queue()
    while True:
        job = q.dequeue()
        if not job:
            break
        print job

here is a Pastebin of a dump of the keys in Redis:

http://pastebin.com/Bc4bRyRR

It's too long to practically post here, but its size seems to support my theory. However, using:

def delete_redis_failed_queue():
    q = django_rq.get_failed_queue()
    count = 0
    while True:
        job = q.dequeue()
        if not job:
            print "{} Jobs deleted.".format(count)
            break
        job.delete()
        count += 1  

doesn't clear Redis like I expect. How can I get a more accurate dump of the keys in Redis? Am I clearing the jobs correctly?


Solution

  • It turns out Redis was filling up because of orphaned jobs, i.e. jobs that were not assigned to a particular queue.

    Although the cause of the orphaned jobs is unknown, the problem is solved with this snippet:

    import redis
    from rq.job import Job

    # A single connection for the cleanup pass.
    redis_conn = redis.Redis()

    # Delete every rq job hash, whether or not it still belongs to a queue.
    for i, key in enumerate(redis_conn.keys('rq:job:*')):
        job_number = key.split("rq:job:")[1]
        job = Job.fetch(job_number, connection=redis_conn)
        job.delete()
    

    In my particular situation, calling this snippet (actually the delete_orphaned_jobs() method below) after the completion of each job ensured that Redis would not fill up and that orphaned jobs would be taken care of. For more details on the issue, here's a link to the conversation in the open django-rq issue.
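    The orphan check used by delete_orphaned_jobs() below boils down to simple arithmetic: if there are more rq:job:* keys than jobs accounted for across all queues, the difference is orphans. As a pure-Python sketch of that idea (the helper name is mine, not part of the original class):

```python
def count_orphans(job_key_count, queue_counts):
    """Number of rq:job:* keys not accounted for by any queue.
    queue_counts maps queue name -> number of jobs in that queue."""
    return job_key_count - sum(queue_counts.values())

# 12 job hashes in Redis, but only 10 jobs visible across the queues:
orphans = count_orphans(12, {"default": 7, "high": 2, "low": 1, "failed": 0})
# orphans == 2
```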

    In the process of diagnosing this issue, I also created a utility class for inspecting and deleting jobs / orphaned jobs with ease:

    import os
    import logging

    import redis
    import django_rq
    from rq.queue import Queue
    from rq.job import Job

    logger = logging.getLogger(__name__)

    class RedisTools:
        '''
        A set of utility tools for interacting with a redis cache
        '''
    
        def __init__(self):
            self._queues = ["default", "high", "low", "failed"]
            self.get_redis_connection()
    
        def get_redis_connection(self):
            redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
            self.redis = redis.from_url(redis_url)
    
        def get_queues(self):
            return self._queues
    
        def get_queue_count(self, queue):
            return Queue(name=queue, connection=self.redis).count
    
        def msg_print_log(self, msg):
            print msg
            logger.info(msg)
    
        def get_key_count(self):
            return len(self.redis.keys('rq:job:*'))
    
        def get_queue_job_counts(self):
            queues = self.get_queues()
            queue_counts = [self.get_queue_count(queue) for queue in queues]
            return zip(queues, queue_counts)
    
        def has_orphanes(self):
            job_count = sum([count[1] for count in self.get_queue_job_counts()])
            return job_count < self.get_key_count()
    
        def print_failed_jobs(self):
            q = django_rq.get_failed_queue()
            while True:
                job = q.dequeue()
                if not job:
                    break
                print job
    
        def print_job_counts(self):
            for queue in self.get_queue_job_counts():
                print "{:.<20}{}".format(queue[0], queue[1])
            print "{:.<20}{}".format('Redis Keys:', self.get_key_count())
    
        def delete_failed_jobs(self):
            q = django_rq.get_failed_queue()
            count = 0
            while True:
                job = q.dequeue()
                if not job:
                    self.msg_print_log("{} Jobs deleted.".format(count))
                    break
                job.delete()
                count += 1
    
        def delete_orphaned_jobs(self):
            if not self.has_orphanes():
                return self.msg_print_log("No orphan jobs to delete.")
    
            for i, key in enumerate(self.redis.keys('rq:job:*')):
                job_number = key.split("rq:job:")[1]
                job = Job.fetch(job_number, connection=self.redis)
                job.delete()
                self.msg_print_log("[{}] Deleted job {}.".format(i, job_number))
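One caveat if code like the above is run under Python 3 with a newer redis-py: keys() returns bytes by default, so key.split("rq:job:")[1] raises a TypeError. A small helper (my own addition, assuming default client settings) covers both cases:

```python
def job_id_from_key(key):
    """Extract the rq job id from a raw Redis key such as 'rq:job:3a5e...'.
    redis-py may hand back bytes (its default) or str (when the client is
    built with decode_responses=True), so accept both."""
    if isinstance(key, bytes):
        key = key.decode('utf-8')
    return key.split('rq:job:', 1)[1]

job_id_from_key(b'rq:job:abc-123')  # 'abc-123'
```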