Tags: python, sqlalchemy, scrapy, scrapy-pipeline

Bulk insert scrapy pipeline using sqlalchemy


I am scraping a large amount of data from a website, and the problem is that inserting the rows one by one makes pushing everything to the database take far too long. I am looking for a smart way to do a bulk or batch insert so it won't take forever. I am using SQLAlchemy 1.4 ORM and the Scrapy framework.

models.py:

from sqlalchemy import Column, Date, String, Integer, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from . import settings

engine = create_engine(settings.DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
DeclarativeBase = declarative_base()


class Olx_Eg(DeclarativeBase):
    """
    Defines the property listing model
    """

    __tablename__ = "olx_egypt"
    _id = Column(Integer, primary_key=True)
    URL = Column("URL", String)
    Breadcrumb = Column("Breadcrumb", String)
    Price = Column("Price", String)
    Title = Column("Title", String)
    Type = Column("Type", String)
    Bedrooms = Column("Bedrooms", String)
    Bathrooms = Column("Bathrooms", String)
    Area = Column("Area", String)
    Location = Column("Location", String)
    Compound = Column("Compound", String)
    seller = Column("seller", String)
    Seller_member_since = Column("Seller_member_since", String)
    Seller_phone_number = Column("Seller_phone_number", String)
    Description = Column("Description", String)
    Amenities = Column("Amenities", String)
    Reference = Column("Reference", String)
    Listed_date = Column("Listed_date", String)
    Level = Column("Level", String)
    Payment_option = Column("Payment_option", String)
    Delivery_term = Column("Delivery_term", String)
    Furnished = Column("Furnished", String)
    Delivery_date = Column("Delivery_date", String)
    Down_payment = Column("Down_payment", String)
    Image_url = Column("Image_url", String)

Here is my scrapy pipeline right now:

from olx_egypt.models import Olx_Eg, session


class OlxEgPipeline:
    def __init__(self):
        """
        Initializes database connection and sessionmaker.
        Creates items table.
        """

    def process_item(self, item, spider):
        """
        Process the item and store to database.
        """
        # session = self.Session()
        instance = session.query(Olx_Eg).filter_by(Reference=item["Reference"]).first()
        if instance:
            return instance
        else:
            olx_item = Olx_Eg(**item)
            session.add(olx_item)

        try:
            session.commit()
        except:
            session.rollback()
            raise
        finally:
            session.close()

        return item

I tried creating a list, appending the items to it, and then pushing them to the database when the spider closes:

from olx_egypt.models import Olx_Eg, session

class ExampleScrapyPipeline:

    def __init__(self):

        self.items = []

    def process_item(self, item, spider):
        
        self.items.append(item)

        return item

    def close_spider(self, spider):
       

        try:
            session.bulk_insert_mappings(Olx_Eg, self.items)
            session.commit()

        except Exception as error:
            session.rollback()
            raise

        finally:
            session.close()

but it failed on the session.bulk_insert_mappings(Olx_Eg, self.items) line. Can anyone tell me how I can make a Scrapy pipeline do a bulk or batch insert?
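For reference, session.bulk_insert_mappings expects a list of plain dictionaries keyed by the mapped attribute names; one common cause of failures here is buffering objects that are not plain dicts. A minimal, untested sketch of the buffered pipeline with that conversion added (using the itemadapter package that ships with Scrapy) could look like this:

from itemadapter import ItemAdapter
from olx_egypt.models import Olx_Eg, session


class ExampleScrapyPipeline:
    def __init__(self):
        self.items = []

    def process_item(self, item, spider):
        # Convert the Scrapy item to a plain dict so it can be used as a
        # mapping by SQLAlchemy later.
        self.items.append(ItemAdapter(item).asdict())
        return item

    def close_spider(self, spider):
        try:
            # bulk_insert_mappings takes the mapped class plus a list of dicts.
            session.bulk_insert_mappings(Olx_Eg, self.items)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()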


Solution

  • I was actually working on something very similar and have built a pipeline to inject the data using pandas.to_sql. It needs fewer lines of code and is pretty fast, since I have activated method='multi'; if you're uploading to MSSQL you can also take advantage of fast_executemany=True, as described in this post: Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC.

    I have tried to keep it as general as possible so that it works with different driver names.
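    For context, fast_executemany is a create_engine() flag of SQLAlchemy's mssql+pyodbc dialect; a minimal sketch of enabling it (the connection string below is a placeholder, not taken from this pipeline) would be:

    from sqlalchemy import create_engine

    # Only relevant for the mssql+pyodbc dialect; other drivers do not accept
    # this keyword, so leave it out for PostgreSQL, MySQL, SQLite, etc.
    engine = create_engine(
        "mssql+pyodbc://user:password@host:1433/dbname?driver=ODBC+Driver+18+for+SQL+Server",
        fast_executemany=True,
    )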

    Here's an example:

    scraper.py

    import scrapy
    from scrapy_exercises.items import ScrapyExercisesItem
    from scrapy.crawler import CrawlerProcess
    
    class SQLTest(scrapy.Spider):
        name = 'SQL'
        start_urls = [f'https://quotes.toscrape.com/page/{i}/' for i in range(1, 11)]
    
        custom_settings = {
            "FEED": {"test" : {"format": "csv"}}
        }
    
        def start_requests(self):
            for url in self.start_urls:
                yield scrapy.Request(
                    url=url,
                    callback = self.parse
                )
    
        def parse(self, response):
            content = response.xpath("//div[@class='col-md-8']//div")
            for items in content:
                table = ScrapyExercisesItem()
                #table._name= items.xpath(".//span//@href").get()
                #table._keyword= items.xpath(".//div[@class = 'tags']//a[1]//text()").get()
                #yield table.returnTable()
                table['name'] = items.xpath(".//span//@href").get()
                table['keyword'] = items.xpath(".//div[@class = 'tags']//a[1]//text()").get()
                return table
    

    items.py

    import scrapy
    
    class ScrapyExercisesItem(scrapy.Item):
        name = scrapy.Field()
        keyword = scrapy.Field()
    

    pipelines.py

    from sqlalchemy import create_engine, String
    import pandas as pd
    import pyodbc
    import logging
    from itemadapter import is_item
    from itemadapter import ItemAdapter
    
    logger = logging.getLogger(__name__)
    
    class DataframeSQLPipelineInject:
    
        def __init__(self, user, passw, host, port, database, table, if_exists, drivername):
            self._user = user
            self._passw = passw
            self._host = host
            self._port = port
            self._database = database
            self.table = table
            self.if_exists = if_exists
            self.drivername = drivername
    
        
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                user = crawler.settings.get('DATABASE')['user'],
                passw = crawler.settings.get('DATABASE')['passw'],
                host = crawler.settings.get('DATABASE')['host'],
                port = crawler.settings.get('DATABASE')['port'],
                database = crawler.settings.get('DATABASE')['database'],
                table = crawler.settings.get('DATABASE')['table'],
                if_exists = crawler.settings.get('DATABASE')['if_exists'],
                drivername = crawler.settings.get('DATABASE')['drivername']
            )
    
        def open_spider(self, spider):
            self.engine = create_engine(
                f'{self.drivername}://' +          # change this to your required server
                self._user + ':' +
                self._passw + '@' +
                self._host + ':' +
                str(self._port) + '/' +
                self._database,                    # + '?driver=ODBC+Driver+18+for+SQL+Server' -- change this to your required driver
                echo=False,
                # connect_args={"timeout": 30},
                pool_pre_ping=True,
                # fast_executemany=True -- add this if using drivername mssql+pyodbc,
                # then remove if_exists=self.if_exists from the to_sql call below
            )
    
            self.conn = self.engine.connect()
    
        def close_spider(self, spider):
            self.conn.close()
    
        def process_item(self, item, spider):
            if is_item(item):
                table_df = pd.DataFrame([ItemAdapter(item).asdict()])
                print(table_df.dtypes)
                table_df.to_sql(
                    self.table,
                    con=self.engine,
                    method='multi',
                    dtype={'name': String(), 'keyword': String()},
                    chunksize=2000,
                    index=False,
                    if_exists=self.if_exists,
                )
            else:
                logger.error(f'You need a dict for item, you have type: {type(item)}')
            return item
    

    settings.py:

    DATABASE = {
        "user": "usr",
        "passw": "",
        "host": "localhost",
        "port": '5432',
        "database": "scraper",
        'table':'some_table',
        'if_exists':'append',
        'drivername':'postgresql'
    }
    
    # Obey robots.txt rules
    ROBOTSTXT_OBEY = False
    
    ITEM_PIPELINES = {
        'scrapy_exercises.pipelines.sql_import.DataframeSQLPipelineInject':50
        }
    

    You'll need to set if_exists to 'append', even if you want to create the table from scratch. Because Scrapy is single threaded, the pipeline will create the table on the first item and then append values after each reactor loop.
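    A minimal illustration of that behaviour (the table name and dataframe are placeholders):

    # With if_exists='append', pandas creates the table on the first call if it
    # does not exist yet and only inserts rows on every call after that;
    # if_exists='replace' would drop and recreate the table for every item.
    table_df.to_sql('some_table', con=engine, if_exists='append', index=False)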

    I hope this helps with your speed problem; I have not tested it with large amounts of data.

    It works on my end, check the image:

    [image: the target database table populated with the scraped rows]

    Update your items.py with this:

    class ScrapyExercisesItem(scrapy.Item):
        URL = scrapy.Field()
        Breadcrumb = scrapy.Field()
        Price = scrapy.Field()
        Title = scrapy.Field()
        Type = scrapy.Field()
        Bedrooms = scrapy.Field()
        Bathrooms = scrapy.Field()
        Area = scrapy.Field()
        Location = scrapy.Field()
        keyword = scrapy.Field()
        Compound = scrapy.Field()
        seller = scrapy.Field()
        Seller_member_since = scrapy.Field()
        Seller_phone_number = scrapy.Field()
        Description = scrapy.Field()
        Amenities = scrapy.Field()
        Reference = scrapy.Field()
        Listed_date = scrapy.Field()
        Level = scrapy.Field()
        Payment_option = scrapy.Field()
        Delivery_term = scrapy.Field()
        Furnished = scrapy.Field()
        Delivery_date = scrapy.Field()
        Down_payment = scrapy.Field()
        Image_url = scrapy.Field()
    

    And remove the following in your scraper:

    item = {}
    

    Replace it with:

    from your_path.items import ScrapyExercisesItem
    item = ScrapyExercisesItem()
    

    Then do not yield but return instead. It is working for me, so it should work for you.
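    For illustration only, that change inside the spider's parse method might look roughly like this; the your_path module name and the XPath selectors are placeholders, not taken from the original spider:

    from your_path.items import ScrapyExercisesItem

    # Inside your Spider subclass:
    def parse(self, response):
        item = ScrapyExercisesItem()
        # The selectors below are hypothetical; adjust them to the actual page.
        item['URL'] = response.url
        item['Title'] = response.xpath('//h1/text()').get()
        item['Price'] = response.xpath('//span[contains(@class, "price")]/text()').get()
        # ... fill the remaining fields the same way ...
        return item  # return rather than yield, as noted above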