I'm trying to use pika and Scrapy together so that a RabbitMQ consumer calls the spider. I have a consumer.py and a Scrapy spider spider.py.
The spider runs inside the consumer with the argument sent from the producer, and I use used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) to delete the message.
I expected the message to be deleted when the spider finishes the job, and requeued if an error occurs. When the spider runs properly, everything looks fine: the message is deleted and the job is done. However, if an error happens while the spider is running, the message still gets deleted even though the job isn't done, so the message is lost.
Watching the RabbitMQ management UI, I saw the message count drop to 0 while the spider was still running (the console hadn't shown the job as done yet).
I wonder whether this is because Scrapy is asynchronous: run_spider(message=decodebody) returns while the crawl is still in progress, so the next line used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) doesn't wait until the spider is done (see the sketch after my consumer code below).
How can I fix this? I would like to delete the message only after the spider has properly finished the job. Here is my consumer.py:
import logging

import pika
from crochet import setup  # assumption: setup() comes from crochet, so CrawlerRunner can run outside a Twisted script
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

from spider import MySpider  # the spider defined in spider.py

# rabbit_host, rabbit_exchange, rabbit_queue and routing_key are defined elsewhere in my config

setup()  # for CrawlerRunner
settings = get_project_settings()
logger = logging.getLogger(__name__)


def get_message(used_channel, basic_deliver, properties, body):
    decodebody = bytes.decode(body)

    try:
        run_spider(message=decodebody)
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    except:
        used_channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)


def run_spider(message):
    crawler = CrawlerRunner(settings)
    crawler.crawl(MySpider, message=message)


while True:
    try:
        # blocking connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
        channel = connection.channel()

        # declare exchange, the settings must match the producer
        channel.exchange_declare(
            exchange=rabbit_exchange,
            exchange_type='direct',
            durable=True,
            auto_delete=False
        )

        # declare queue, the settings must match the producer
        channel.queue_declare(
            queue=rabbit_queue,
            durable=True,
            exclusive=False,
            auto_delete=False
        )

        # bind the queue to the exchange
        channel.queue_bind(
            exchange=rabbit_exchange,
            queue=rabbit_queue,
            routing_key=routing_key
        )

        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=rabbit_queue,
            on_message_callback=get_message,
            auto_ack=False
        )

        logger.info(' [*] Waiting for messages. To exit press CTRL+C')

        # start consuming
        channel.start_consuming()

    except pika.exceptions.ConnectionClosed as err:
        print('ConnectionClosed error:', err)
        continue
    # Do not recover on channel errors
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError as err:
        print("Connection was closed, retrying...", err)
        continue
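To illustrate what I suspect, here is a minimal sketch of my own (not the consumer above). As far as I understand, CrawlerRunner.crawl() returns a Twisted Deferred that fires only when the crawl has finished, while run_spider() itself returns right away:

def run_spider(message):
    crawler = CrawlerRunner(settings)
    # crawl() schedules the crawl and returns a Deferred immediately
    d = crawler.crawl(MySpider, message=message)
    # this callback fires later, once the spider has actually finished
    d.addCallback(lambda _: logger.info('spider finished'))
    # run_spider() returns here, so get_message() goes on to basic_ack()
    # long before the crawl is done

If that's right, it would explain why the message disappears from the queue while the spider is still running.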
I found someone handling multithreading with RabbitMQ and the pika library who uses .is_alive to check whether a worker thread is done, so I followed the same idea. Since the Scrapy crawl runs in the background, I added return crawler to run_spider and check crawler._active (see the source code for scrapy.crawler) before deleting the message.
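The pattern I borrowed looks roughly like this (a plain threading sketch of my own; some_job is just a placeholder):

import threading
import time

worker = threading.Thread(target=some_job)  # some_job is a placeholder
worker.start()
# poll until the worker thread has finished
while worker.is_alive():
    time.sleep(1)
# only acknowledge the message once the work is really done

My adapted version for Scrapy: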
def run_spider(news_info):
    # run spider with CrawlerRunner
    crawler = CrawlerRunner(settings)
    # run the spider script
    crawler.crawl(UrlSpider, news_info=news_info)
    return crawler


crawler = run_spider(news_info=decodebody)

# wait until the crawler is done
while len(crawler._active) > 0:
    time.sleep(1)

used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
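For context, this is roughly how the whole callback looks with the polling in place (my own sketch, reusing the names from the consumer above):

import time

def get_message(used_channel, basic_deliver, properties, body):
    decodebody = bytes.decode(body)
    try:
        crawler = run_spider(news_info=decodebody)
        # wait until the crawler has no active crawls left
        while len(crawler._active) > 0:
            time.sleep(1)
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    except:
        used_channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)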