Tags: python, python-3.x, python-multiprocessing, pathos

BrokenPipeError: [Errno 32] Python Multiprocessing


I was working on a web scraping project, but processing the data was taking a long time. As an alternative, I decided to scrape and store the source code of each product page first and then process the data separately.

I stored the source code of each product in its own tuple inside an array and saved that array to a text file for processing at a later stage. I save the data in chunks of 10,000 products. Each text file is about 10GB.

When I started to process the data using multiprocessing, I kept running into BrokenPipeError: [Errno 32]. Initially I was processing the data on a Windows machine. After some reading, I found that Linux is said to be better at managing memory and that this error is caused by memory being completely used up during processing.

Initially, I was storing the processed data in an array (not saving the data at run time for each product). I read on Stack Overflow that I needed to save the processed data as I went, because it was eating up all the memory. I changed the code accordingly and switched from map to imap. It ran longer, but I still got the same error.

Here is my code. I am not posting the complete processing steps, as that would only make the code longer.

Note that processing produces a huge amount of array data for each product; each individual array has up to 18,000 elements.

I am using an octa-core processor with 16GB of RAM and a 500GB SSD.

Any help would be appreciated. Thanks!

import xml.etree.ElementTree as ET  # cElementTree was removed in Python 3.9
from lxml import html  
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
import pathos.multiprocessing as mp
import multiprocessing
import ast

global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]


def processData(data):

    vehicalData=[]
    oemData=[]
    appendIndex=0

    #getting product link from incoming data list (tuple)
    p=data[0][1]
    #getting html source code from incoming data list (tuple)
    #converting it to html element
    source_code=html.fromstring(data[0][0])

    #processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''

    #saving data in respective arrays
    vehicalData.append([firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive])
    for q in dayQtyPrice:
        vehicalData[appendIndex].append(q)
    vehicalData[appendIndex].append(specString)
    vehicalData[appendIndex].append(subAssembltString)
    vehicalData[appendIndex].append(parentAssemblyString)
    vehicalData[appendIndex].append(otherProductString)
    vehicalData[appendIndex].append(description)
    vehicalData[appendIndex].append(placement)
    for dma in makeModelArray:
        vehicalData[appendIndex].append(dma)        
    oemData.append([firstOem,name,productType,brand,mfgNumber,p])   
    for o in oemArray:
        oemData[appendIndex].append(o)

    print('Done !',p,len(vehicalData[0]),len(oemData[0]))

    #returning both arrays
    return (vehicalData,oemData)

def main():
    productLinks=[]
    vehicalData=[]
    oemData=[]
    
    #opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        string=f.read()

    sourceDataList=ast.literal_eval(string)
    print('Number of products:',len(sourceDataList))

    #creating pool and initiating multiprocessing
    p = mp.Pool(4)  # Pool tells how many at a time

    #opening and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, sourceDataList):
        v=d[0][0][:18000]
        o=d[1][0][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)

    p.terminate()
    p.join()

    #saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()

Solution

  • I am not familiar with the pathos.multiprocessing.Pool class, but let's assume it works more or less the same as the multiprocessing.pool.Pool class. The problem is that the data in test.txt is in a format that apparently forces you to read the whole file into memory before you can parse it with ast.literal_eval, so switching to imap cannot save any memory.

    To use imap (or imap_unordered) efficiently, do not store a representation (JSON?) of one big list in test.txt. Instead, store one product representation per line, each of which can be parsed on its own, so that the file can be read and parsed line by line to yield individual products. You should also have an approximate count of the lines, and thus of the tasks that will be submitted to imap. The reason is that with a large number of tasks it is more efficient to use something other than the default chunksize value of 1. I have included below a function that computes a chunksize value along the lines of what the map function would use. Also, your worker function processData seems to use one more level of nested lists than necessary. Finally, I have reverted to the standard multiprocessing.pool.Pool class, since I know more or less how that works.
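
    The one-product-per-line layout described above could be produced and read back roughly as follows. This is only a sketch: the helper names write_products and read_products are hypothetical, and it assumes each product is a plain Python literal (for example a list holding one (html_source, link) tuple), so that repr() and ast.literal_eval round-trip it.

```python
import ast


def write_products(path, products):
    """Write one product per line so the file can later be parsed lazily."""
    with open(path, "w", encoding="utf-8") as f:
        for product in products:
            # repr() escapes newlines embedded in strings, so every product
            # ends up on exactly one physical line of the file.
            f.write(repr(product) + "\n")


def read_products(path):
    """Yield products one at a time instead of loading the whole file."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            if line.strip():  # skip blank lines
                yield ast.literal_eval(line)
```

    Because read_products is a generator, only one line of the file needs to be parsed at a time, which is what lets imap pull tasks lazily.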

    Note: I don't see where the variables makeModelArray and oemArray used in processData are defined.

    import xml.etree.ElementTree as ET  # cElementTree was removed in Python 3.9
    from lxml import html
    import openpyxl
    from openpyxl import Workbook
    from lxml import etree
    from lxml.etree import tostring
    #import pathos.multiprocessing as mp
    import multiprocessing
    import ast
    
    global sourceDataList
    sourceDataList=[]
    global trackIndex
    trackIndex=1
    global failList
    failList=[]
    
    
    def processData(data):
    
        #getting product link from incoming data list (tuple)
        p=data[0][1]
        #getting html source code from incoming data list (tuple)
        #converting it to html element
        source_code=html.fromstring(data[0][0])
    
        #processing data
        try:
            firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
            firstOem=firstOem[0].text_content().strip()
        except:
            firstOem=''
        try:
            name=source_code.xpath("//div[@id='right_title']/h1")
            name=name[0].text_content().strip()
        except:
            name=''
    
        #saving data in respective arrays
        vehicalData = [firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive]
        for q in dayQtyPrice:
            vehicalData.append(q)
        vehicalData.append(specString)
        vehicalData.append(subAssembltString)
        vehicalData.append(parentAssemblyString)
        vehicalData.append(otherProductString)
        vehicalData.append(description)
        vehicalData.append(placement)
        for dma in makeModelArray:
            vehicalData.append(dma)
        oemData = [firstOem,name,productType,brand,mfgNumber,p]
        for o in oemArray:
            oemData.append(o)
    
        #print('Done !',p,len(vehicalData),len(oemData))
    
        #returning both arrays
        return (vehicalData,oemData)
    
    def generate_source_data_list():
        #opening text file for processing list data
        with open('test.txt', encoding='utf-8') as f:
            for line in f:
                # data for just one product:
                yield ast.literal_eval(line)
    
    def compute_chunksize(iterable_size, pool_size):
        chunksize, remainder = divmod(iterable_size, 4 * pool_size)
        if remainder:
            chunksize += 1
        return chunksize
    
    
    def main():
        #creating pool and initiating multiprocessing
        # use pool size equal to number of cores you have:
        pool_size = multiprocessing.cpu_count()
        # Approximate number of elements generate_source_data_list() will yield:
        NUM_TASKS = 100_000 # replace with actual number
        p = multiprocessing.Pool(pool_size)
        chunksize = compute_chunksize(NUM_TASKS, pool_size)
    
        #opening and saving data at run time
        vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
        vehicalOutSheet=vehicalOutBook.active
        oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
        oemOutSheet=oemOutBook.active
        for d in p.imap(processData, generate_source_data_list(), chunksize=chunksize):
            v = d[0][:18000]
            o = d[1][:18000]
            vehicalOutSheet.append(v)
            oemOutSheet.append(o)
    
        p.terminate()
        p.join()
    
        #saving data
        vehicalOutBook.save('vehical_data_file.xlsx')
        oemOutBook.save('oem_data_file.xlsx')
    
    if __name__=='__main__':
        main()
    

    You will still need a lot of memory for your final spreadsheets, since each openpyxl workbook is held in memory until it is saved. If you were writing two CSV files instead, that would be a different story: you could write those as you went along.
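
    A minimal sketch of that CSV approach, assuming the results arrive as (vehicle_row, oem_row) pairs of flat lists, as returned by the revised processData. The function name write_rows_incrementally and the output file names are hypothetical.

```python
import csv


def write_rows_incrementally(results, vehicle_path, oem_path):
    """Write each (vehicle_row, oem_row) pair to disk as soon as it arrives.

    `results` can be any iterable of pairs, for example the iterator
    returned by Pool.imap. Returns the number of pairs written.
    """
    count = 0
    with open(vehicle_path, "w", newline="", encoding="utf-8") as vf, \
            open(oem_path, "w", newline="", encoding="utf-8") as of:
        vehicle_writer = csv.writer(vf)
        oem_writer = csv.writer(of)
        for vehicle_row, oem_row in results:
            # Each row is written out immediately, so the rows do not
            # accumulate in an in-memory workbook.
            vehicle_writer.writerow(vehicle_row)
            oem_writer.writerow(oem_row)
            count += 1
    return count
```

    In main() above, this could replace the openpyxl loop with something like write_rows_incrementally(p.imap(processData, generate_source_data_list(), chunksize=chunksize), 'vehical_data.csv', 'oem_data.csv').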