I am scraping a large amount of data from a website, and inserting the rows into the database one at a time is taking too long. I am looking for a smart way to bulk (batch) insert the data so that pushing it to the database doesn't take forever. I am using the SQLAlchemy 1.4 ORM and the Scrapy framework.
models:
from sqlalchemy import Column, Date, String, Integer, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from . import settings
# Engine built from the connection URL held in the project settings module.
# NOTE(review): the attribute is spelled DATABSE_URL (no second "A") — confirm
# it matches the name actually defined in settings.
engine = create_engine(settings.DATABSE_URL)
# Session factory bound to that engine.
Session = sessionmaker(bind=engine)
# One module-level session instance, created at import time.
session = Session()
# Declarative base class that the ORM model below inherits from.
DeclarativeBase = declarative_base()
class Olx_Eg(DeclarativeBase):
    """ORM model for one property listing, mapped to the ``olx_egypt`` table.

    ``_id`` is the integer primary key. Every other attribute is a
    ``String`` column whose database name is passed explicitly and is
    identical to the attribute name.
    """

    __tablename__ = "olx_egypt"

    _id = Column(Integer, primary_key=True)

    URL = Column("URL", String)
    Breadcrumb = Column("Breadcrumb", String)
    Price = Column("Price", String)
    Title = Column("Title", String)
    Type = Column("Type", String)

    Bedrooms = Column("Bedrooms", String)
    Bathrooms = Column("Bathrooms", String)
    Area = Column("Area", String)
    Location = Column("Location", String)
    Compound = Column("Compound", String)

    seller = Column("seller", String)
    Seller_member_since = Column("Seller_member_since", String)
    Seller_phone_number = Column("Seller_phone_number", String)

    Description = Column("Description", String)
    Amenities = Column("Amenities", String)
    Reference = Column("Reference", String)
    Listed_date = Column("Listed_date", String)
    Level = Column("Level", String)

    Payment_option = Column("Payment_option", String)
    Delivery_term = Column("Delivery_term", String)
    Furnished = Column("Furnished", String)
    Delivery_date = Column("Delivery_date", String)
    Down_payment = Column("Down_payment", String)

    Image_url = Column("Image_url", String)
Here is my scrapy pipeline right now:
from olx_egypt.models import Olx_Eg, session
class OlxEgPipeline:
    """Scrapy item pipeline that stores each listing through the shared session.

    An item whose ``Reference`` already exists in the table is passed on
    unchanged without being written a second time.
    """

    def __init__(self):
        """Nothing to set up here; ``session`` is imported from ``olx_egypt.models``."""

    def process_item(self, item, spider):
        """Insert ``item`` unless a row with the same ``Reference`` exists.

        Args:
            item: Scraped item. It must provide a ``Reference`` key, and its
                keys are passed to ``Olx_Eg`` as keyword arguments.
            spider: The spider that produced the item (unused).

        Returns:
            The item itself, for both new and already-stored listings, so
            later pipeline stages keep receiving an item rather than an
            ORM object.

        Raises:
            Exception: Whatever the lookup or commit raised, re-raised after
                the session has been rolled back.
        """
        try:
            existing = (
                session.query(Olx_Eg)
                .filter_by(Reference=item["Reference"])
                .first()
            )
            if existing is None:
                session.add(Olx_Eg(**item))
                session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            # Runs on every path, including the duplicate case, so the
            # session is always released.
            session.close()
        return item
I tried creating a list, appending the items to it, and then pushing the whole list to the database when the spider closes:
from olx_egypt.models import Olx_Eg, session
class ExampleScrapyPipeline:
    """Scrapy item pipeline that buffers items and bulk-inserts them in batches."""

    def __init__(self, batch_size=1000):
        """Start with an empty buffer.

        Args:
            batch_size: Number of buffered rows that triggers a bulk insert.
        """
        self.batch_size = batch_size
        self.items = []

    def process_item(self, item, spider):
        """Buffer ``item`` and write the buffer out once it is full.

        Args:
            item: Scraped item (any mapping accepted by ``dict()``).
            spider: The spider that produced the item (unused).

        Returns:
            The unchanged item, for the next pipeline stage.
        """
        # bulk_insert_mappings() takes plain dictionaries, so store a dict
        # copy rather than the item object itself.
        self.items.append(dict(item))
        if len(self.items) >= self.batch_size:
            self._flush()
        return item

    def close_spider(self, spider):
        """Write any remaining rows, then close the session."""
        try:
            self._flush()
        finally:
            session.close()

    def _flush(self):
        """Bulk-insert the buffered rows in one transaction and empty the buffer."""
        if not self.items:
            return
        try:
            session.bulk_insert_mappings(Olx_Eg, self.items)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            # Emptied on failure as well, so one bad batch is not re-sent
            # with every following item; the error itself still propagates.
            self.items.clear()
but it failed on this line: session.bulk_insert_mappings(Olx_Eg, self.items)
Can anyone tell me how I can make a Scrapy pipeline do bulk or batch inserts?
I was working on something very similar and built a pipeline that inserts the data using pandas' DataFrame.to_sql.
It needs fewer lines of code and it's pretty fast because I have enabled method='multi'.
If you're uploading to MSSQL, you can also take advantage of fast_executemany=True,
as described in this post: Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC.
I have tried to make it as general as possible so that it works with different driver names.
Here is an example:
scraper.py
import scrapy
from scrapy_exercises.items import ScrapyExercisesItem
from scrapy.crawler import CrawlerProcess
class SQLTest(scrapy.Spider):
    """Demo spider that requests pages 1-10 of quotes.toscrape.com."""

    name = 'SQL'
    start_urls = [
        f'https://quotes.toscrape.com/page/{page}/' for page in range(1, 11)
    ]
    custom_settings = {
        "FEED": {"test": {"format": "csv"}},
    }

    def start_requests(self):
        """Yield one request per entry in ``start_urls``, handled by ``parse``."""
        for page_url in self.start_urls:
            yield scrapy.Request(url=page_url, callback=self.parse)

    def parse(self, response):
        """Build a ``ScrapyExercisesItem`` from the matched ``<div>`` nodes.

        NOTE(review): as written, ``return`` ends the method at the first
        matched node, so at most one item is produced per response, and
        ``None`` is returned when nothing matches. Confirm that is intended.
        """
        for node in response.xpath("//div[@class='col-md-8']//div"):
            row = ScrapyExercisesItem()
            row['name'] = node.xpath(".//span//@href").get()
            row['keyword'] = node.xpath(".//div[@class = 'tags']//a[1]//text()").get()
            return row
items.py
import scrapy
class ScrapyExercisesItem(scrapy.Item):
    """Scrapy item with a ``name`` field and a ``keyword`` field."""

    name = scrapy.Field()
    keyword = scrapy.Field()
pipelines.py
from sqlalchemy import create_engine, String
import pandas as pd
import pyodbc
import logging
from itemadapter import is_item
from itemadapter import ItemAdapter
logger = logging.getLogger(__name__)
class DataframeSQLPipelineInject:
    """Scrapy item pipeline that writes items to SQL via ``DataFrame.to_sql``.

    Connection details are read from the ``DATABASE`` setting (see
    ``from_crawler``). Each item is written as its own one-row DataFrame,
    so ``method='multi'`` and ``chunksize`` do not batch rows across items.
    """

    def __init__(self, user, passw, host, port, database, table, if_exists, drivername):
        """Store the connection parameters; nothing is opened yet.

        Args:
            user: Database user name.
            passw: Database password.
            host: Database host.
            port: Database port (converted with ``str()`` when the URL is built).
            database: Database name.
            table: Target table name passed to ``to_sql``.
            if_exists: ``if_exists`` argument passed to ``to_sql``.
            drivername: SQLAlchemy driver name used as the URL scheme.
        """
        self._user = user
        self._passw = passw
        self._host = host
        self._port = port
        self._database = database
        self.table = table
        self.if_exists = if_exists
        self.drivername = drivername

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the crawler's ``DATABASE`` setting."""
        # Look the setting up once instead of once per key.
        db = crawler.settings.get('DATABASE')
        return cls(
            user=db['user'],
            passw=db['passw'],
            host=db['host'],
            port=db['port'],
            database=db['database'],
            table=db['table'],
            if_exists=db['if_exists'],
            drivername=db['drivername'],
        )

    def open_spider(self, spider):
        """Create the engine and open a connection when the spider starts."""
        # Change the scheme/driver to match your server. For mssql+pyodbc the
        # original author suggests appending
        # '?driver=ODBC+Driver+18+for+SQL+Server' to the URL and passing
        # fast_executemany=True to create_engine().
        url = (
            f'{self.drivername}://{self._user}:{self._passw}'
            f'@{self._host}:{str(self._port)}/{self._database}'
        )
        self.engine = create_engine(
            url,
            echo=False,
            # connect_args={"timeout": 30},
            pool_pre_ping=True,
        )
        self.conn = self.engine.connect()

    def close_spider(self, spider):
        """Close the connection opened in ``open_spider``."""
        self.conn.close()

    def process_item(self, item, spider):
        """Append ``item`` to the target table as one row.

        Args:
            item: Scraped item; anything ``itemadapter.is_item`` accepts.
            spider: The spider that produced the item (unused).

        Returns:
            The unchanged item, so later pipeline stages still receive it.
            Unsupported objects are logged as errors and also passed on.
        """
        if not is_item(item):
            logger.error('You need a dict for item, you have type: %s', type(item))
            return item

        table_df = pd.DataFrame([ItemAdapter(item).asdict()])
        logger.debug('Column dtypes:\n%s', table_df.dtypes)
        table_df.to_sql(
            self.table,
            con=self.engine,
            method='multi',
            dtype={'name': String(), 'keyword': String()},
            chunksize=2000,
            index=False,
            if_exists=self.if_exists,
        )
        return item
settings.py:
# Connection parameters; the pipeline reads them by key from this setting.
DATABASE = dict(
    user="usr",
    passw="",
    host="localhost",
    port='5432',
    database="scraper",
    table='some_table',
    if_exists='append',
    drivername='postgresql',
)

# False: robots.txt rules are NOT enforced for this crawl.
ROBOTSTXT_OBEY = False

# Pipeline class path mapped to its order value.
ITEM_PIPELINES = {
    'scrapy_exercises.pipelines.sql_import.DataframeSQLPipelineInject': 50,
}
You'll need to set if_exists to 'append', even if you want the table to be created.
Because Scrapy is single-threaded, the pipeline creates the table on the first write and then appends the values on each subsequent write.
I hope this helps with your speed problem, although I have not tested it with large amounts of data.
It works on my end, check the image:
Update your items.py with this:
class ScrapyExercisesItem(scrapy.Item):
    """Scrapy item declaring one field per scraped listing attribute."""

    URL = scrapy.Field()
    Breadcrumb = scrapy.Field()
    Price = scrapy.Field()
    Title = scrapy.Field()
    Type = scrapy.Field()

    Bedrooms = scrapy.Field()
    Bathrooms = scrapy.Field()
    Area = scrapy.Field()
    Location = scrapy.Field()
    keyword = scrapy.Field()
    Compound = scrapy.Field()

    seller = scrapy.Field()
    Seller_member_since = scrapy.Field()
    Seller_phone_number = scrapy.Field()

    Description = scrapy.Field()
    Amenities = scrapy.Field()
    Reference = scrapy.Field()
    Listed_date = scrapy.Field()
    Level = scrapy.Field()

    Payment_option = scrapy.Field()
    Delivery_term = scrapy.Field()
    Furnished = scrapy.Field()
    Delivery_date = scrapy.Field()
    Down_payment = scrapy.Field()

    Image_url = scrapy.Field()
And remove the following in your scraper:
item = {}
replace it with:
# "your_path" is a placeholder — presumably the package that contains
# items.py; adjust it to your project.
from your_path.items import ScrapyExercisesItem
# Instantiate the declared Item class in place of the plain dict.
item = ScrapyExercisesItem()
Then return the item instead of yielding it. This works for me, so it should work for you too.