I wrote JavaScript code for a web crawler that scrapes data from a list of websites (read from a CSV file) using a single browser instance (code below). Now I want to modify the code so that every website in the list is crawled in parallel, at the same time, in two browser instances. For example, the website www.a.com in the list should be crawled simultaneously in two browser instances, and the same goes for the rest of the websites. If anyone can help me, I would be very thankful.
(async () => {
  require("dotenv").config();
  if (!process.env.PROXY_SPKI_FINGERPRINT) {
    throw new Error("PROXY_SPKI_FINGERPRINT is not defined in environment.");
  }
  const fs = require("fs");
  const fsPromises = fs.promises;
  const pptr = require("puppeteer");
  const browser = await pptr.launch({
    args: [
      "--proxy-server=https://127.0.0.1:8000",
      "--ignore-certificate-errors-spki-list=" + process.env.PROXY_SPKI_FINGERPRINT,
      "--disable-web-security",
    ],
    // headless: false,
  });
  // Close the browser however the run ends (e.g. unreadable CSV path),
  // otherwise the launched Chromium process is left behind.
  try {
    const sites = (await fsPromises.readFile(process.argv[2])) // sites list in csv file
      .toString()
      .split("\n")
      // The site is the second CSV column; trim() removes the "\r" that
      // CRLF line endings leave on the last column.
      .map(line => (line.split(",")[1] || "").trim())
      .filter(s => s);
    // Sites are crawled one after another, each in a new tab of the shared browser.
    for (const [i, site] of sites.entries()) {
      console.log(`[${i}] ${site}`);
      try {
        await fsPromises.appendFile("data.txt", JSON.stringify(await crawl(browser, site)) + "\n");
      } catch (e) {
        console.error(e);
      }
    }
  } finally {
    await browser.close();
  }

  /**
   * Opens `site` in a new tab of `browser` and collects:
   *  - for every script response whose content-type mentions "javascript",
   *    its URL plus the snippets found by grepMagicWords();
   *  - `J$.FLOWS` and `document.URL` evaluated inside the page.
   *
   * @param browser - Puppeteer browser to open the tab in.
   * @param {string} site - address without scheme; "http://" is prepended.
   * @returns {Promise<{url: string, grepResult: Array, flows: *}>}
   * @throws {Error} if reading `J$.FLOWS` does not finish within 5 s, or if
   *   the in-page evaluation fails (e.g. `J$` is not defined).
   */
  async function crawl(browser, site) {
    const page = await browser.newPage();
    try {
      const grepResult = [];
      page.on("request", request => {
        // Request interception is enabled below, so every request has to be
        // continued explicitly. continue() returns a promise; report its
        // failures instead of leaving an unhandled rejection.
        request.continue().catch(e => console.error(e));
      });
      page.on("response", async response => {
        try {
          const contentType = response.headers()["content-type"];
          if (response.request().resourceType() === "script" &&
              contentType &&
              contentType.includes("javascript")) {
            const js = await response.text();
            grepResult.push([response.request().url(), grepMagicWords(js)]);
          }
        } catch (e) {
          // Best effort: some response bodies cannot be read (e.g. redirects);
          // such scripts are simply skipped.
        }
      });
      await page.setRequestInterception(true);
      try {
        await page.goto("http://" + site, {waitUntil: "load", timeout: 60000});
        // Wait an extra 10 s after "load" before reading results.
        await new Promise(resolve => { setTimeout(resolve, 10000); });
      } catch (e) { console.error(e); }

      // NOTE(review): J$ is presumably a global injected into page scripts by
      // the proxy on 127.0.0.1:8000 — confirm.
      let timer;
      const timeout = new Promise((_, reject) => {
        timer = setTimeout(() => {
          reject(new Error(`Timed out reading J$.FLOWS from ${site}`));
        }, 5000);
      });
      try {
        const [flows, url] = await Promise.race([
          page.evaluate(() => [J$.FLOWS, document.URL]),
          timeout,
        ]);
        return {url, grepResult, flows};
      } finally {
        // Cancel the pending timeout so it does not keep the process alive.
        clearTimeout(timer);
      }
    } finally {
      await page.close();
    }

    // Returns the text around every quoted 'getItem'/'setItem' (either quote
    // style) in `js`: from 100 chars before the match start to 100 chars after
    // it (substring() clamps a negative start to 0).
    function grepMagicWords(js) {
      const re = /(?:'|")(?:g|s)etItem(?:'|")/g;
      const result = [];
      let match;
      while ((match = re.exec(js)) !== null) {
        result.push(js.substring(match.index - 100, match.index + 100));
      }
      return result;
    }
  }
})().catch(err => {
  // Report startup/shutdown failures instead of an unhandled rejection.
  console.error(err);
  process.exitCode = 1;
});
You can launch multiple browsers and run them in parallel, but you will have to restructure your app slightly for that. Create a wrapper for `crawl` that runs it with a new browser instance; I created `crawlNewInstance`, which does that for you. You also need to run the `crawlNewInstance()` calls in parallel.
(This gives every site its own browser. To crawl each site in two browsers at the same time, as asked, call `crawlNewInstance(site)` twice per site.)
Check out this code:
const sites = (await fsPromises.readFile(process.argv[2])) // sites list in csv file
  .toString()
  .split("\n")
  // Second CSV column; trim() removes the "\r" left by CRLF line endings.
  .map(line => (line.split(",")[1] || "").trim())
  .filter(s => s);
// Start one crawler (each in its own browser) per site, all at once.
// Each crawler logs its own error, so one failing site does not reject
// Promise.all below and abort waiting for the others.
const crawlerProms = sites.map(async (site, index) => {
  try {
    console.log(`[${index}] ${site}`);
    await fsPromises.appendFile("data.txt", JSON.stringify(await crawlNewInstance(site)) + "\n");
  } catch (e) {
    console.error(e);
  }
});
// await all the crawlers!.
await Promise.all(crawlerProms);
/**
 * Launches a dedicated browser, crawls `site` with it and closes it again.
 *
 * @param {string} site - passed unchanged to crawl().
 * @returns {Promise<*>} whatever crawl() resolves to.
 * @throws whatever pptr.launch() or crawl() throws; the browser is closed first.
 */
async function crawlNewInstance(site) {
  const browser = await pptr.launch({
    args: [
      "--proxy-server=https://127.0.0.1:8000",
      "--ignore-certificate-errors-spki-list=" + process.env.PROXY_SPKI_FINGERPRINT,
      "--disable-web-security",
    ],
    // headless: false,
  });
  try {
    return await crawl(browser, site);
  } finally {
    // Close even when crawl() throws; otherwise every failed site leaks a browser process.
    await browser.close();
  }
}
The above basically answers the question. But if you want to go further (I was on a roll and had nothing else to do :)):
If you have plenty of pages that you want to crawl in parallel while, for example, limiting the number of parallel requests, you could use a queue:
var { EventEmitter} = require('events')
/**
 * Runs queued async jobs in FIFO order with at most `limit` running at once.
 *
 * Events:
 *  - 'empty': emitted when the last running job finishes and nothing is waiting.
 *  - 'error': emitted with the error of a failed job (only if a listener exists).
 */
class AsyncQueue extends EventEmitter {
  // maximum number of jobs running at the same time
  limit = 2;
  // jobs waiting to be started, oldest first
  enqueued = [];
  // number of jobs currently running
  running = 0;

  /**
   * @param {number} [limit=2] - maximum number of concurrently running jobs.
   */
  constructor(limit = 2) {
    super();
    this.limit = limit;
  }

  /** True only when no job is waiting AND no job is still running. */
  isEmpty() {
    return this.enqueued.length === 0 && this.running === 0;
  }

  // make sure to only pass `async` functions (or functions returning a promise) to this queue!
  enqueue(fn) {
    this.enqueued.push(fn);
    // Starts the job right away if a slot is free; otherwise it waits its turn.
    this.next();
  }

  // Called when a job has settled: frees its slot and tries to start the next job.
  done() {
    this.running--;
    console.log('job done! remaining:', this.running + this.enqueued.length);
    this.next();
  }

  // Starts the oldest waiting job if a slot is free; emits 'empty' when all work is finished.
  async next() {
    if (this.enqueued.length === 0) {
      // Nothing left to start. Only report 'empty' once the running jobs are
      // finished too, so listeners do not resume while jobs are in flight.
      if (this.running === 0) {
        this.emit('empty');
      }
      return;
    }
    if (this.running >= this.limit) {
      console.log('queueu full.. waiting!');
      return;
    }
    this.running++;
    console.log('running job! remaining slots:', this.limit - this.running);
    // first in, first out! so take first element in array.
    const job = this.enqueued.shift();
    try {
      await job();
    } catch (err) {
      console.log('Job failed!. ', err);
      // EventEmitter throws when 'error' is emitted without a listener; that
      // would reject next() and skip done(), permanently losing a slot.
      if (this.listenerCount('error') > 0) {
        this.emit('error', err);
      }
    } finally {
      // Always free the slot, whether the job succeeded or failed.
      this.done();
    }
  }
}
The queue could be utilised with this code:
// create queue
const limit = 3;
const queue = new AsyncQueue(limit);
// listen for any errors..
queue.on('error', err => {
  console.error('error occured in queue.', err);
});
for (const site of sites) {
  // enqueue all crawler jobs.
  // pass an async function which does whatever you want. In this case it crawls
  // a web page!.
  queue.enqueue(async () => {
    await fsPromises.appendFile("data.txt", JSON.stringify(await crawlNewInstance(site)) + "\n");
  });
}
// Resolves once nothing is waiting and nothing is running.
// isEmpty must be *called*: the bare property is a function and therefore
// always truthy, which made this resolve immediately. `running` is checked
// too, so this works whether or not isEmpty() counts running jobs.
const waitForQueue = async () => {
  if (queue.isEmpty() && queue.running === 0) return;
  return new Promise(res => {
    queue.once('empty', res);
  });
};
await waitForQueue();
console.log('crawlers done!.');
It is also possible to reuse your browser instances, so that a new browser does not have to be started for every crawl. This can be done with this `BrowserPool` helper class:
var pptr = require('puppeteer')
// Launches one Puppeteer browser that sends its traffic through the local proxy
// at 127.0.0.1:8000 and accepts that proxy's certificate by the SPKI
// fingerprint taken from the environment.
async function launchPuppeteer() {
  const args = [
    "--proxy-server=https://127.0.0.1:8000",
    `--ignore-certificate-errors-spki-list=${process.env.PROXY_SPKI_FINGERPRINT}`,
    "--disable-web-security",
  ];
  // Add `headless: false` to the options to watch the browser while debugging.
  return pptr.launch({ args });
}
// Manages reusable browser instances.
// Browsers are launched lazily: get() hands out an idle browser if there is
// one and launches a new one otherwise. Every browser the pool launches is
// remembered, so shutDown() also closes browsers that were never handed back
// (e.g. because a crawl failed before handback()).
class BrowserPool {
  // idle browsers waiting to be reused, oldest first
  browsers = [];
  // every browser this pool has launched, whether idle or checked out
  #launched = new Set();

  // Returns an idle browser, or launches a new one if none is available.
  async get() {
    if (this.browsers.length > 0) {
      return this.browsers.shift();
    }
    const browser = await launchPuppeteer();
    this.#launched.add(browser);
    return browser;
  }

  // used for putting a browser back in pool!.
  handback(browser) {
    this.browsers.push(browser);
  }

  // Closes every launched or handed-back browser (each exactly once) and empties the pool.
  async shutDown() {
    const all = new Set([...this.#launched, ...this.browsers]);
    this.#launched.clear();
    this.browsers = [];
    await Promise.all([...all].map(browser => browser.close()));
  }
}
You can then remove `crawlNewInstance()` and adjust the code so that it finally looks like this:
const sites = (await fsPromises.readFile(process.argv[2])) // sites list in csv file
  .toString()
  .split("\n")
  // Second CSV column; trim() removes the "\r" left by CRLF line endings.
  .map(line => (line.split(",")[1] || "").trim())
  .filter(s => s);
// create browserpool
const pool = new BrowserPool();
// create queue: at most `limit` crawls (and therefore browsers) at a time
const limit = 3;
const queue = new AsyncQueue(limit);
// listen to errors:
queue.on('error', err => {
  console.error('error in the queue detected!', err);
});
// enqueue your jobs
for (const site of sites) {
  // enqueue an async function which takes a browser from pool
  queue.enqueue(async () => {
    let browser;
    try {
      // get the browser and crawl a page!.
      browser = await pool.get();
      const result = await crawl(browser, site);
      await fsPromises.appendFile("data.txt", JSON.stringify(result) + "\n");
    } catch (err) {
      console.error(err);
    } finally {
      // Return the browser to the pool even when the crawl failed, so it is
      // reused by other crawlers and closed by shutDown().
      if (browser !== undefined) {
        pool.handback(browser);
      }
    }
  });
}
// Resolves once nothing is waiting and nothing is running. isEmpty must be
// called (the bare property is always truthy); `running` is checked too, so
// this works whether or not isEmpty() counts running jobs.
const waitForQueue = async () => {
  if (queue.isEmpty() && queue.running === 0) return;
  return new Promise(res => {
    queue.once('empty', res);
  });
};
// wait for the queue to finish :)
await waitForQueue();
// in the very end, shut down all browser:
await pool.shutDown();
console.log('done!.');
Have fun and feel free to leave a comment.
Fixes to `AsyncQueue` and `waitForQueue()`:
`queue.isEmpty()` was used wrongly: I forgot to put the parentheses. All fixes are marked with `@fix` in the code.
var { EventEmitter} = require('events')
/**
 * Runs queued async jobs in FIFO order with at most `limit` running at once.
 *
 * Events:
 *  - 'empty': emitted when the last running job finishes and nothing is waiting.
 *  - 'error': emitted with the error of a failed job (only if a listener exists).
 */
class AsyncQueue extends EventEmitter {
  // maximum number of jobs running at the same time
  limit = 2;
  // jobs waiting to be started, oldest first
  queue = [];
  // number of jobs currently running
  running = 0;

  /**
   * @param {number} [limit=2] - maximum number of concurrently running jobs.
   *   Without the default, `new AsyncQueue()` left `limit` undefined and the
   *   `running >= limit` check never stopped any job from starting.
   */
  constructor(limit = 2) {
    super();
    this.limit = limit;
  }

  //@fix
  // Also check if there are still running jobs.
  isEmpty() {
    return this.queue.length === 0 && this.running === 0;
  }

  // make sure to only pass `async` function to this queue!
  enqueue(fn) {
    // add to queue
    this.queue.push(fn);
    // start a job. If max instances are already running it does nothing.
    // otherwise it runs a new job!.
    this.next();
  }

  // if a job is done try starting a new one!.
  onDone() {
    this.running--;
    //@fix
    // Remaining jobs calculation is now correct
    console.log('job done! remaining:', this.running + this.queue.length);
    this.next();
  }

  // Starts the oldest waiting job if a slot is free; emits 'empty' when all work is finished.
  async next() {
    //@fix
    // Emit if it is empty anyways.
    if (this.isEmpty()) {
      this.emit('empty');
    }
    //@fix
    // Stop going when no jobs left.
    if (this.queue.length === 0) {
      return;
    }
    // if no jobs are available OR limit is reached do nothing
    if (this.running >= this.limit) {
      console.log('queueu full.. waiting!');
      return;
    }
    this.running++;
    console.log('running job! remaining slots:', this.limit - this.running);
    // first in, first out! so take first element in array.
    const job = this.queue.shift();
    try {
      await job();
    } catch (err) {
      console.log('Job failed!. ', err);
      // EventEmitter throws when 'error' is emitted without a listener; that
      // would reject next() and skip onDone(), permanently losing a slot.
      if (this.listenerCount('error') > 0) {
        this.emit('error', err);
      }
    } finally {
      // job is done (or failed)! onDone() frees the slot and starts the next job.
      this.onDone();
    }
  }
}
Fixed `waitForQueue()`:
// Resolves immediately if queue.isEmpty() is true; otherwise resolves on the
// queue's next 'empty' event. Uses the `queue` binding in the enclosing scope.
const waitForQueue = async () => {
  //@fix
  // call isEmpty, it was used as property before
  if (queue.isEmpty()) return;
  return new Promise((res) => {
    queue.once('empty', res);
  });
};
// Random integer in the closed range [min, max]; the range holds
// (max - min + 1) possible values.
function randomInt(min, max) {
  const span = max - min + 1;
  return min + Math.floor(Math.random() * span);
}
// Demo: ten jobs that each wait a random 1-6 s, with at most five running at once.
const queue = new AsyncQueue(5);
for (let i = 0; i < 10; i++) {
  const delayMs = randomInt(1, 6) * 1000;
  queue.enqueue(() => new Promise((resolve) => {
    setTimeout(() => {
      console.log('item', i, 'done');
      resolve();
    }, delayMs);
  }));
}
waitForQueue().then(() => console.log('we are done.'));
Output:
running job! remaining slots: 4
running job! remaining slots: 3
running job! remaining slots: 2
running job! remaining slots: 1
running job! remaining slots: 0
queueu full.. waiting!
queueu full.. waiting!
queueu full.. waiting!
queueu full.. waiting!
queueu full.. waiting!
item 3 done
job done! remaining: 9
running job! remaining slots: 0
item 1 done
job done! remaining: 8
running job! remaining slots: 0
item 4 done
job done! remaining: 7
running job! remaining slots: 0
item 0 done
job done! remaining: 6
running job! remaining slots: 0
item 2 done
job done! remaining: 5
running job! remaining slots: 0
item 5 done
job done! remaining: 4
item 6 done
job done! remaining: 3
item 8 done
job done! remaining: 2
item 7 done
job done! remaining: 1
item 9 done
job done! remaining: 0
we are done.