Tags: python, scrapy, rabbitmq, pika

How to requeue messages when the spider errors, using Scrapy and RabbitMQ (pika)


I'm trying to use pika and Scrapy together: the consumer reads from a message queue and runs the spider. I have a consumer.py and a Scrapy spider spider.py.

The spider runs inside the consumer with the argument sent from the producer. I use used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) to acknowledge and delete the message.

I expected the message to be deleted when the spider finishes the job, and requeued if there's an error. When the spider runs properly everything looks fine: the message is deleted and the job gets done. However, if an error happens while the spider is running, the message still gets deleted even though the job is not done, so the message is lost.

I watched the RabbitMQ management UI and saw the message count drop to 0 while the spider was still running (the console hadn't shown that the job was done yet).

I wonder whether this is because Scrapy is asynchronous: run_spider(message=decodebody) returns before the crawl is finished, so the next line, used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag), doesn't wait for the spider to be done.

How can I fix this? I want to acknowledge (and so delete) the message only after the spider has finished the job properly.
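
For context, my understanding is that CrawlerRunner.crawl() only schedules the crawl and returns a Twisted Deferred right away instead of blocking until the spider finishes, roughly like this (MySpider is my spider class):

from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

crawler = CrawlerRunner(get_project_settings())
d = crawler.crawl(MySpider, message='test')        # returns a twisted Deferred immediately, does not block
d.addCallback(lambda _: print('spider finished'))  # fires only when the crawl is actually done

Here is my consumer code: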

import pika
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings
from spider import MySpider  # the spider class lives in spider.py

setup()  # for CrawlerRunner (e.g. crochet.setup(), so the crawl can run outside a Twisted script)
settings = get_project_settings()

def get_message(used_channel, basic_deliver, properties, body):
    decodebody = bytes.decode(body)

    try:
        run_spider(message=decodebody)
        # this ack runs as soon as run_spider() returns, not when the spider finishes
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)

    except Exception:
        # requeue the message on failure (basic_reject requeues by default)
        used_channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)


def run_spider(message):
    crawler = CrawlerRunner(settings)
    # crawl() schedules the spider on the Twisted reactor and returns immediately
    crawler.crawl(MySpider, message=message)


while(True):
    try: 
        # blocking connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
        channel = connection.channel()
        # declare the exchange; these settings must match the producer's
        channel.exchange_declare(
            exchange=rabbit_exchange,
            exchange_type='direct',  
            durable=True,            
            auto_delete=False        
        )
        # declare the queue; these settings must match the producer's
        channel.queue_declare(
            queue=rabbit_queue, 
            durable=True, 
            exclusive=False,
            auto_delete=False
        )
        # bind the queue to the exchange
        channel.queue_bind(
            exchange=rabbit_exchange,
            queue=rabbit_queue,
            routing_key=routing_key
        )

        channel.basic_qos(prefetch_count=1)  # deliver one unacknowledged message at a time
        channel.basic_consume(
            queue=rabbit_queue,
            on_message_callback=get_message,
            auto_ack=False
        )

        logger.info(' [*] Waiting for messages. To exit press CTRL+C')
        # start consuming messages
        channel.start_consuming()
    
    except pika.exceptions.ConnectionClosed as err:
        print('ConnectionClosed error:', err)
        continue
    # Do not recover on channel errors
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError as err:    
        print("Connection was closed, retrying...", err)
        continue




Solution

  • I found an example of handling multithreading with MQ for the pika library, where .is_alive is used to check whether the worker thread is done, so I followed that idea. Scrapy runs its crawls asynchronously, so I return the crawler from run_spider and check crawler._active (the set of crawls still in progress) before deleting the message.

    Source code for scrapy.crawler

    import time

    def run_spider(news_info):
        # run the spider with CrawlerRunner
        crawler = CrawlerRunner(settings)
        # crawl() only schedules the spider and returns immediately
        crawler.crawl(UrlSpider, news_info=news_info)

        return crawler

    crawler = run_spider(news_info=decodebody)

    # crawler._active is the set of crawls still running;
    # wait until it is empty before acknowledging the message
    while len(crawler._active) > 0:
        time.sleep(1)

    used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
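
    A note on this workaround: crawler._active is a private attribute. If the setup() call in the question comes from crochet, a possible alternative (a sketch under that assumption, not tested against the project above) is to let crochet block on the Deferred that crawl() returns; then a failure of the crawl's Deferred is re-raised in the consumer callback and triggers the requeue in the except block:

    from crochet import setup, wait_for

    setup()

    @wait_for(timeout=600)  # illustrative timeout; blocks the caller until the crawl Deferred fires
    def run_spider(message):
        crawler = CrawlerRunner(settings)
        # crawl() returns a Deferred that fires when the spider finishes (or fails)
        return crawler.crawl(MySpider, message=message)

    # in get_message(): run_spider() now blocks until the spider is done, so
    # basic_ack() only runs on success and basic_reject() requeues on failure

    Either way the consumer callback blocks for the whole crawl, so long crawls may need a generous heartbeat setting on the pika connection.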