Tags: python, mongodb, python-2.7, scrapy, pipeline

How to save data to MongoDB via a pipeline when using multiple spiders in Scrapy?


There are two spiders that I use to get data from web pages, and I use CrawlerProcess() to run them at the same time. The spiders' code:

class GDSpider(Spider):
    name = "GenDis"
    allowed_domains = ["gold.jgi.doe.gov"]
    base_url = "https://gold.jgi.doe.gov/projects"
    stmp = []
    term = "man"
    for i in range(1, 1000):
        url = "https://gold.jgi.doe.gov/projects?page=" + str(i) + "&Project.Project+Name=" + term + "&count=25"
        stmp.append(url)

    start_urls = stmp

    def parse(self, response):
        sel = Selector(response)
        sites = sel.xpath('//tr[@class="odd"]|//tr[@class="even"]')

        for site in sites:
            item = GenDis()
            item['Id'] = site.xpath('td/a/text()').extract()
            item['Link'] = site.xpath('td/a/@href').extract()
            item['Name'] = map(unicode.strip, site.xpath('td[2]/text()').extract())
            item['Status'] = map(unicode.strip, site.xpath('td[3]/text()').extract())
            item['Add_Date'] = map(unicode.strip, site.xpath('td[4]/text()').extract())
            yield item



class EPGD_spider(Spider):
    name = "EPGD"
    allowed_domains = ["epgd.biosino.org"]
    term = "man"
    start_urls = ["http://epgd.biosino.org/EPGD/search/textsearch.jsp?textquery="+term+"&submit=Feeling+Lucky"]
    MONGODB_DB = name + "_" + term
    MONGODB_COLLECTION = name + "_" + term

    def parse(self, response):
        sel = Selector(response)
        sites = sel.xpath('//tr[@class="odd"]|//tr[@class="even"]')
        url_list = []
        base_url = "http://epgd.biosino.org/EPGD"

        for site in sites:
            item = EPGD()
            item['genID'] = map(unicode.strip, site.xpath('td[1]/a/text()').extract())
            item['genID_url'] = base_url + map(unicode.strip, site.xpath('td[1]/a/@href').extract())[0][2:]
            item['taxID'] = map(unicode.strip, site.xpath('td[2]/a/text()').extract())
            item['taxID_url'] = map(unicode.strip, site.xpath('td[2]/a/@href').extract())
            item['familyID'] = map(unicode.strip, site.xpath('td[3]/a/text()').extract())
            item['familyID_url'] = base_url + map(unicode.strip, site.xpath('td[3]/a/@href').extract())[0][2:]
            item['chromosome'] = map(unicode.strip, site.xpath('td[4]/text()').extract())
            item['symbol'] = map(unicode.strip, site.xpath('td[5]/text()').extract())
            item['description'] = map(unicode.strip, site.xpath('td[6]/text()').extract())
            yield item

        sel_tmp = Selector(response)
        link = sel_tmp.xpath('//span[@id="quickPage"]')

        for site in link:
            url_list.append(site.xpath('a/@href').extract())

        for i in range(len(url_list[0])):
            if cmp(url_list[0][i], "#") == 0:
                if i + 1 < len(url_list[0]):
                    print url_list[0][i+1]
                    actual_url = "http://epgd.biosino.org/EPGD/search/" + url_list[0][i+1]
                    yield Request(actual_url, callback=self.parse)
                    break
                else:
                    print "The index is out of range!"

process = CrawlerProcess()
process.crawl(EPGD_spider)
process.crawl(GDSpider)
process.start() # the script will block here until all crawling jobs are finished

I want to save the data to a MongoDB database. Here is my pipeline code:

class EPGD_pipeline(object):
    def __init__(self):
        connection = pymongo.MongoClient(
            settings['MONGODB_SERVER'],
            settings['MONGODB_PORT']
        )
        db = connection[settings['MONGODB_DB']]
        self.collection = db[settings['MONGODB_COLLECTION']]

    def process_item(self, item, spider):
        valid = True
        for field, value in item.items():
            if not value:
                valid = False
                raise DropItem("Missing {0}!".format(field))
        if valid:
            self.collection.insert(dict(item))
            log.msg("Item written to MongoDB database {}, collection {}, at host {}, port {}".format(
                settings['MONGODB_DB'],
                settings['MONGODB_COLLECTION'],
                settings['MONGODB_SERVER'],
                settings['MONGODB_PORT']))
        return item

It works correctly when I run one spider at a time. But when I run them at the same time, it seems that the pipeline no longer works: neither the databases nor the collections get created. I've read the CrawlerProcess() part of the Scrapy documentation many times, but it doesn't mention pipelines. So can anybody tell me what's wrong with my code?


Solution

  • This should fix the problem:

    from scrapy.utils.project import get_project_settings
    process = CrawlerProcess(get_project_settings())
    process.crawl(EPGD_spider)
    process.crawl(GDSpider)
    process.start()
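
    Passing get_project_settings() matters because a bare CrawlerProcess() only knows Scrapy's defaults, not your project's settings.py, so ITEM_PIPELINES and the MONGODB_* settings are never loaded and the pipeline is never enabled. For that to work, settings.py of course has to define them; a minimal sketch (module path and values here are placeholders, adjust them to your project) could look like:

    # settings.py -- illustrative values, adjust to your project
    ITEM_PIPELINES = {
        'yourproject.pipelines.EPGD_pipeline': 300,   # assumed module path
    }

    MONGODB_SERVER = "localhost"
    MONGODB_PORT = 27017
    MONGODB_DB = "EPGD_man"
    MONGODB_COLLECTION = "EPGD_man"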
    

    You will also likely need to refactor your spider code to open a separate connection per spider (this example uses "Bonus Tip 2" below):

    # In your pipeline
    
    class EPGD_pipeline(object):
        def __init__(self):
            self.collections = {
                spider_name: self.setup_db_connection(dj_mongo_database_url.parse(url))
                for spider_name, url in settings['MONGODB_PIPELINE_SETTINGS'].iteritems()
            }
    
        def process_item(self, item, spider):
            collection = self.collections[spider.name]
            ...
    
    
    # In settings.py
    
    MONGODB_PIPELINE_SETTINGS = {
        "GenDis": "mongodb://myhost:29297/test_db/collection",
        "EPGD": "mongodb://myhost:29297/test_db/collection2",
    }
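
    If you'd rather not pull in django-mongo-database-url, here is a minimal sketch of the same per-spider idea using plain pymongo and ordinary per-spider settings (the setting layout, class name, and module paths below are assumptions, not the question's original code):

    # settings.py -- assumed layout: one db/collection pair per spider name
    MONGODB_SERVER = "localhost"
    MONGODB_PORT = 27017
    MONGODB_PIPELINE_SETTINGS = {
        "GenDis": {"db": "test_db", "collection": "collection"},
        "EPGD": {"db": "test_db", "collection": "collection2"},
    }

    # pipelines.py
    import pymongo
    from scrapy.exceptions import DropItem


    class MultiSpiderMongoPipeline(object):

        @classmethod
        def from_crawler(cls, crawler):
            return cls(crawler.settings)

        def __init__(self, settings):
            client = pymongo.MongoClient(settings['MONGODB_SERVER'],
                                         settings['MONGODB_PORT'])
            # one collection handle per spider name
            self.collections = {
                name: client[conf['db']][conf['collection']]
                for name, conf in settings['MONGODB_PIPELINE_SETTINGS'].items()
            }

        def process_item(self, item, spider):
            for field, value in item.items():
                if not value:
                    raise DropItem("Missing {0}!".format(field))
            # route the item to the collection belonging to this spider
            # (insert is the pymongo 2.x call; use insert_one on pymongo >= 3)
            self.collections[spider.name].insert(dict(item))
            return item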
    

    Bonus Tip 1: Use txmongo instead of pymongo, otherwise you may get very bad performance, because synchronous pymongo calls block Scrapy's Twisted reactor (see also here).
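
    A rough sketch of what a txmongo-based pipeline could look like (the setting names, class name, and the insert_one call are assumptions based on recent txmongo versions, so treat this as a starting point rather than a drop-in replacement):

    # pipelines.py -- non-blocking writes via txmongo (sketch)
    import txmongo
    from twisted.internet import defer


    class TxMongoPipeline(object):

        @classmethod
        def from_crawler(cls, crawler):
            s = crawler.settings
            return cls(s.get('MONGODB_URI', 'mongodb://localhost:27017'),
                       s.get('MONGODB_DB', 'test'),
                       s.get('MONGODB_COLLECTION', 'items'))

        def __init__(self, uri, db, collection):
            # ConnectionPool connects lazily, so constructing it does not block
            self.connection = txmongo.connection.ConnectionPool(uri)
            self.collection = self.connection[db][collection]

        @defer.inlineCallbacks
        def close_spider(self, spider):
            yield self.connection.disconnect()

        @defer.inlineCallbacks
        def process_item(self, item, spider):
            # returning a Deferred lets Scrapy wait for the write
            # without blocking the reactor
            yield self.collection.insert_one(dict(item))
            defer.returnValue(item)

    This works because Scrapy accepts a Deferred returned from process_item, so the write happens asynchronously while the crawl keeps going.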

    Bonus Tip 2: All those settings get difficult to manage. Consider using something like django-mongo-database-url to "pack" them all into a single URL and keep them more manageable (it would be cleaner if the collection were also in the URL).

    Bonus Tip 3: You likely do way too many writes/transactions. If the use case allows, save results to .jl file(s) and use mongoimport to bulk-import them when the crawl finishes. Here's how to do it in more detail.

    Assuming a project named tutorial and a spider named example that creates 100 items, you create an extension in tutorial/extensions.py:

    import logging
    import subprocess
    
    from scrapy import signals
    from scrapy.exceptions import NotConfigured
    
    logger = logging.getLogger(__name__)
    
    
    class MyBulkExtension(object):
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(crawler)
    
        def __init__(self, crawler):
            settings = crawler.settings
    
            self._feed_uri = settings.get('FEED_URI', None)
            if self._feed_uri is None:
                raise NotConfigured('Missing FEED_URI')
            self._db = settings.get('BULK_MONGO_DB', None)
            if self._db is None:
                raise NotConfigured('Missing BULK_MONGO_DB')
            self._collection = settings.get('BULK_MONGO_COLLECTION', None)
            if self._collection is None:
                raise NotConfigured('Missing BULK_MONGO_COLLECTION')
    
            crawler.signals.connect(self._closed, signal=signals.spider_closed)
    
        def _closed(self, spider, reason, signal, sender):
            logger.info("writing file %s to db %s, collection %s" %
                        (self._feed_uri, self._db, self._collection))
            command = ("mongoimport --db %s --collection %s --drop --file %s" %
                       (self._db, self._collection, self._feed_uri))
    
            p = subprocess.Popen(command.split())
            p.communicate()
    
            logger.info('Import done')
    

    On your tutorial/settings.py, you activate the extension and set the two settings:

    EXTENSIONS = {
        'tutorial.extensions.MyBulkExtension': 500
    }
    
    BULK_MONGO_DB = "test"
    BULK_MONGO_COLLECTION = "foobar"
    

    You can then run your crawl like this:

    $ scrapy crawl -L INFO example -o foobar.jl
    ...
    [tutorial.extensions] INFO: writing file foobar.jl to db test, collection foobar
    connected to: 127.0.0.1
    dropping: test.foobar
    check 9 100
    imported 100 objects
    [tutorial.extensions] INFO: Import done
    ...