I was working on a web scraping project, but processing the data was taking a lot of time, so I came up with an alternative approach: scrape the source code of each product first and then process the data separately.
What I did was store the source code of each product in its own tuple inside an array, and save that array to a text file for further processing at a later stage. I save the data in chunks of 10,000 products; each text file is about 10 GB.
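Roughly, each chunk file is written like this (a simplified sketch; getPageSource stands in for the actual scraping step, which is much longer):

    # simplified illustration of how one chunk of 10,000 products gets saved
    sourceDataList=[]
    for link in productLinks:
        pageSource=getPageSource(link)              # placeholder for the real scraping code
        sourceDataList.append([(pageSource,link)])  # source code and link kept together per product
    with open('test.txt','w',encoding='utf-8') as f:
        f.write(repr(sourceDataList))               # read back later with ast.literal_eval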
When I started processing the data with multiprocessing I kept running into BrokenPipeError: [Errno 32]. Initially I was processing the data on a Windows machine. I explored a bit and found that Linux manages memory better, and that this error occurs because memory is completely exhausted during processing.
Initially, I was storing the processed data in an array (not saving it at run time for each product). I read on Stack Overflow that I need to save the processed data as I go, since it was eating up all the memory, so I changed the code accordingly and switched from map to imap. It ran longer, but I still got the same error.
Here is my code. I am not posting the complete processing steps, as that would only increase the length of the code.
Note that each product produces a huge amount of array data when processed, with each individual array holding up to 18,000 elements.
I am using an octa-core processor with 16 GB of RAM and a 500 GB SSD.
Any help would be appreciated. Thanks!
import xml.etree.cElementTree as ET
from lxml import html
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
import pathos.multiprocessing as mp
import multiprocessing
import ast
global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]
def processData(data):
    vehicalData=[]
    oemData=[]
    appendIndex=0
    #getting product link from incoming data list (tuple)
    p=data[0][1]
    #getting html source code from incoming data list (tuple)
    #converting it to an html element
    source_code=html.fromstring(data[0][0])
    #processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''
    #saving data in respective arrays
    vehicalData.append([firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive])
    for q in dayQtyPrice:
        vehicalData[appendIndex].append(q)
    vehicalData[appendIndex].append(specString)
    vehicalData[appendIndex].append(subAssembltString)
    vehicalData[appendIndex].append(parentAssemblyString)
    vehicalData[appendIndex].append(otherProductString)
    vehicalData[appendIndex].append(description)
    vehicalData[appendIndex].append(placement)
    for dma in makeModelArray:
        vehicalData[appendIndex].append(dma)
    oemData.append([firstOem,name,productType,brand,mfgNumber,p])
    for o in oemArray:
        oemData[appendIndex].append(o)
    print('Done !',p,len(vehicalData[0]),len(oemData[0]))
    #returning both arrays
    return (vehicalData,oemData)
def main():
    productLinks=[]
    vehicalData=[]
    oemData=[]
    #opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        string=f.read()
    sourceDataList=ast.literal_eval(string)
    print('Number of products:',len(sourceDataList))
    #creating pool and initiating multiprocessing
    p = mp.Pool(4) # Pool tells how many at a time
    #opening and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, sourceDataList):
        v=d[0][0][:18000]
        o=d[1][0][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)
    p.terminate()
    p.join()
    #saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()
I am not familiar with the pathos.multiprocessing.Pool class, but let's assume it works more or less the same as the multiprocessing.pool.Pool class. The problem is that the data in test.txt is in such a format that it appears you must read the whole file in order to parse it with ast.literal_eval, and therefore there can be no storage savings with imap.
To use imap (or imap_unordered) efficiently, instead of storing in test.txt a representation (JSON?) of one big list, store multiple product representations separated by newlines that can be parsed individually, so that the file can be read and parsed line by line to yield individual products. You should have an approximate count of how many lines, and thus how many tasks, will be submitted to imap. The reason for this is that when you have a large number of tasks, it is more efficient to use something other than the default chunksize argument value of 1. I have included below a function to compute a chunksize value along the lines that the map function would use. Also, it seems that your worker function processData is using one more level of nested lists than necessary. I have also reverted to using the standard multiprocessing.pool.Pool class, since I know more or less how that works.
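For example (just a sketch; product_links and get_page_source are placeholders for whatever your scraping code already does), the scraper could write one product per line so that each line parses back to the [(source, link)] shape your processData indexing expects:

    # sketch: one literal-evaluable product per line instead of one big list
    with open('test.txt', 'w', encoding='utf-8') as f:
        for link in product_links:                  # placeholder for your collected links
            page_source = get_page_source(link)     # placeholder for the scraping step
            # repr() escapes newlines inside the strings, so each product stays on one physical line
            f.write(repr([(page_source, link)]) + '\n')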
Note: I don't see where the variables makeModelArray and oemArray used in processData are defined.
import xml.etree.cElementTree as ET
from lxml import html
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
#import pathos.multiprocessing as mp
import multiprocessing
import ast
global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]
def processData(data):
    #getting product link from incoming data list (tuple)
    p=data[0][1]
    #getting html source code from incoming data list (tuple)
    #converting it to an html element
    source_code=html.fromstring(data[0][0])
    #processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''
    #saving data in respective arrays
    vehicalData = [firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive]
    for q in dayQtyPrice:
        vehicalData.append(q)
    vehicalData.append(specString)
    vehicalData.append(subAssembltString)
    vehicalData.append(parentAssemblyString)
    vehicalData.append(otherProductString)
    vehicalData.append(description)
    vehicalData.append(placement)
    for dma in makeModelArray:
        vehicalData.append(dma)
    oemData = [firstOem,name,productType,brand,mfgNumber,p]
    for o in oemArray:
        oemData.append(o)
    #print('Done !',p,len(vehicalData),len(oemData))
    #returning both arrays
    return (vehicalData,oemData)
def generate_source_data_list():
    #opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        for line in f:
            # data for just one product:
            yield ast.literal_eval(line)

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize
def main():
    #creating pool and initiating multiprocessing
    # use pool size equal to number of cores you have:
    pool_size = multiprocessing.cpu_count()
    # Approximate number of elements generate_source_data_list() will yield:
    NUM_TASKS = 100_000 # replace with actual number
    p = multiprocessing.Pool(pool_size)
    chunksize = compute_chunksize(NUM_TASKS, pool_size)
    #opening and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, generate_source_data_list(), chunksize=chunksize):
        v = d[0][:18000]
        o = d[1][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)
    p.terminate()
    p.join()
    #saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()
You will still require a lot of storage for your final spreadsheet! Now if you were outputting two csv files, that would be a different story -- you could be writing those as you went along.
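A minimal sketch of that alternative, reusing p, chunksize and generate_source_data_list from the code above together with Python's standard csv module:

    import csv

    # sketch: stream each result row straight to disk instead of building xlsx workbooks in memory
    with open('vehical_data_file.csv', 'w', newline='', encoding='utf-8') as vf, \
         open('oem_data_file.csv', 'w', newline='', encoding='utf-8') as of:
        vehical_writer = csv.writer(vf)
        oem_writer = csv.writer(of)
        for d in p.imap(processData, generate_source_data_list(), chunksize=chunksize):
            vehical_writer.writerow(d[0][:18000])   # rows are written to disk as you iterate
            oem_writer.writerow(d[1][:18000])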