I have written a program to crawl a single website and scrape certain data. I would like to speed up its execution by using ProcessingPoolExecutor
. However, I am having trouble understanding how I can convert from single threaded to concurrent.
Specifically, when creating a job (via ProcessPoolExecutor.submit()
), can I pass a class/object and args instead of a function and args?
And, if so, how do return data from those jobs to the queue for tracking visited pages AND a structure for holding scraped content?
I have been using this as a jumping off point, as well as reviewing the Queue and concurrent.futures docs (with, frankly, the latter going a bit over my head). I've also Googled/Youtubed/SO'ed around quite a bit to no avail.
from queue import Queue, Empty
from concurrent.futures import ProcessPoolExecutor
class Scraper:
"""
Scrapes a single url
"""
def __init__(self, url):
self.url = url # url of page to scrape
self.internal_urls = None
self.content = None
self.scrape()
def scrape(self):
"""
Method(s) to request a page, scrape links from that page
to other pages, and finally scrape actual content from the current page
"""
# assume that code in this method would yield urls linked in current page
self.internal_urls = set(scraped_urls)
# and that code in this method would scrape a bit of actual content
self.content = {'content1': content1, 'content2': content2, 'etc': etc}
class CrawlManager:
"""
Manages a multiprocess crawl and scrape of a single site
"""
def __init__(self, seed_url):
self.seed_url = seed_url
self.pool = ProcessPoolExecutor(max_workers=10)
self.processed_urls = set([])
self.queued_urls = Queue()
self.queued_urls.put(self.seed_url)
self.data = {}
def crawl(self):
while True:
try:
# get a url from the queue
target_url = self.queued_urls.get(timeout=60)
# check that the url hasn't already been processed
if target_url not in self.processed_urls:
# add url to the processed list
self.processed_urls.add(target_url)
print(f'Processing url {target_url}')
# passing an object to the
# ProcessPoolExecutor... can this be done?
job = self.pool.submit(Scraper, target_url)
"""
How do I 1) return the data from each
Scraper instance into self.data?
and 2) put scraped links to self.queued_urls?
"""
except Empty:
print("All done.")
except Exception as e:
print(e)
if __name__ == '__main__':
crawler = CrawlManager('www.mywebsite.com')
crawler.crawl()
For anyone who comes across this page, I was able figure this out for myself.
Per @brad-solomon's advice, I switched from ProcessPoolExecutor
to ThreadPoolExecutor
to manage the concurrent aspects of this script (see his comment for further details).
W.r.t. the original question, the key was to utilize the add_done_callback
method of the ThreadPoolExecutor
in conjunction with a modification to Scraper.scrape
and a new method CrawlManager.proc_scraper_results
as in the following:
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
class Scraper:
"""
Scrapes a single url
"""
def __init__(self, url):
self.url = url # url of page to scrape
self.internal_urls = None
self.content = None
self.scrape()
def scrape(self):
"""
Method(s) to request a page, scrape links from that page
to other pages, and finally scrape actual content from the current page
"""
# assume that code in this method would yield urls linked in current page
self.internal_urls = set(scraped_urls)
# and that code in this method would scrape a bit of actual content
self.content = {'content1': content1, 'content2': content2, 'etc': etc}
# these three items will be passed to the callback
# function with in a future object
return self.internal_urls, self.url, self.content
class CrawlManager:
"""
Manages a multiprocess crawl and scrape of a single website
"""
def __init__(self, seed_url):
self.seed_url = seed_url
self.pool = ThreadPoolExecutor(max_workers=10)
self.processed_urls = set([])
self.queued_urls = Queue()
self.queued_urls.put(self.seed_url)
self.data = {}
def proc_scraper_results(self, future):
# get the items of interest from the future object
internal_urls, url, content = future._result[0], future._result[1], future._result[2]
# assign scraped data/content
self.data[url] = content
# also add scraped links to queue if they
# aren't already queued or already processed
for link_url in internal_urls:
if link_url not in self.to_crawl.queue and link_url not in self.processed_urls:
self.to_crawl.put(link_url)
def crawl(self):
while True:
try:
# get a url from the queue
target_url = self.queued_urls.get(timeout=60)
# check that the url hasn't already been processed
if target_url not in self.processed_urls:
# add url to the processed list
self.processed_urls.add(target_url)
print(f'Processing url {target_url}')
# add a job to the ThreadPoolExecutor (note, unlike original question, we pass a method, not an object)
job = self.pool.submit(Scraper(target_url).scrape)
# to add_done_callback we send another function, this one from CrawlManager
# when this function is itself called, it will be pass a `future` object
job.add_done_callback(self.proc_scraper_results)
except Empty:
print("All done.")
except Exception as e:
print(e)
if __name__ == '__main__':
crawler = CrawlManager('www.mywebsite.com')
crawler.crawl()
The result of this is a very significant reduction in duration of this program.