Tags: python, python-2.7, error-handling, multiprocessing, truncated

Truncated file header while using multiprocessing


When I run the following function:

def book_processing(pair, pool_length):
    p = Pool(len(pool_length)*3)
    temp_parameters = partial(book_call_mprocess, pair)
    p.map_async(temp_parameters, pool_length).get(999999)
    p.close()
    p.join()
    return exchange_books

I get the following error:

Traceback (most recent call last):
  File "test_code.py", line 214, in <module>
    current_books = book_call.book_processing(cp, book_list)
  File "/home/user/Desktop/book_call.py", line 155, in book_processing
    p.map_async(temp_parameters, pool_length).get(999999)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
zipfile.BadZipfile: Truncated file header

I suspect that some resource from the previous loop iteration is not being closed, but I am not sure how to close it (I am still learning the multiprocessing library). The error only occurs when my code repeats this section relatively quickly (within the same minute). It does not happen often, but it is unmistakable when it does.

Edit (adding the book_call code):

def book_call_mprocess(currency_pair, ex_list):

    polo_error = 0
    live_error = 0
    kraken_error = 0
    gdax_error = 0

    ex_list = set([ex_list])

    ex_Polo = 'Polo'
    ex_Live = 'Live'
    ex_GDAX = 'GDAX'
    ex_Kraken = 'Kraken'

    cp_polo = 'BTC_ETH'
    cp_kraken = 'XETHXXBT'
    cp_live = 'ETH/BTC'
    cp_GDAX = 'ETH-BTC'

    # Instances
    polo_instance = poloapi.poloniex(polo_key, polo_secret)
    fookraken = krakenapi.API(kraken_key, kraken_secret)
    publicClient = GDAX.PublicClient()

    flag = False
    while not flag:
        flag = False
        err = False

        # Polo Book

        try:
            if ex_Polo in ex_list:
                polo_books = polo_instance.returnOrderBook(cp_polo)
                exchange_books['Polo'] = polo_books
        except:
            err = True
            polo_error = 1

        # Livecoin

        try:
            if ex_Live in ex_list:
                method = "/exchange/order_book"
                live_books = OrderedDict([('currencyPair', cp_live)])
                encoded_data = urllib.urlencode(live_books)
                sign = hmac.new(live_secret, msg=encoded_data, digestmod=hashlib.sha256).hexdigest().upper()
                headers = {"Api-key": live_key, "Sign": sign}
                conn = httplib.HTTPSConnection(server)
                conn.request("GET", method + '?' + encoded_data, '', headers)
                response = conn.getresponse()
                live_books = json.load(response)
                conn.close()
                exchange_books['Live'] = live_books
        except:
            err = True
            live_error = 1

        # Kraken

        try:
            if ex_Kraken in ex_list:
                kraken_books = fookraken.query_public('Depth', {'pair': cp_kraken})
                exchange_books['Kraken'] = kraken_books
        except:
            err = True
            kraken_error = 1

        # GDAX books

        try:
            if ex_GDAX in ex_list:
                gdax_books = publicClient.getProductOrderBook(level=2, product=cp_GDAX)
                exchange_books['GDAX'] = gdax_books
        except:
            err = True
            gdax_error = 1

        flag = True
        if err:
            flag = False
            err = False
            error_list = ['Polo', polo_error, 'Live', live_error, 'Kraken', kraken_error, 'GDAX', gdax_error]
            print_to_excel('excel/error_handler.xlsx', 'Book Call Errors', error_list)
            print "Holding..."
            time.sleep(30)
        return exchange_books


def print_to_excel(workbook, worksheet, data_list):
    ts = str(datetime.datetime.now()).split('.')[0]
    data_list = [ts] + data_list
    wb = load_workbook(workbook)
    if worksheet == 'active':
        ws = wb.active
    else:
        ws = wb[worksheet]
    ws.append(data_list)
    wb.save(workbook)

Solution

  • The problem lies in the function print_to_excel.

    More specifically, it is in this line:

    wb = load_workbook(workbook)
    

    If two processes run this function at the same time, you'll hit the following race condition:

    • Process 1 wants to open error_handler.xlsx. Since it doesn't exist, it creates an empty file.
    • Process 2 wants to open error_handler.xlsx. It does exist, so the process tries to read it, but it is still empty. Since the xlsx format is just a zip file containing a bunch of XML files, the process expects a valid ZIP header, doesn't find one, and raises zipfile.BadZipfile: Truncated file header.
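
    A closely related variant of this race can be reproduced with the standard library alone. The sketch below is only an illustration, not a trace of the original program: it assumes a hypothetical archive path and a made-up member name (sheet.xml), and it plays both "processes" in one function. A reader opens a valid zip, a writer then truncates the same path (as saving over an existing file does), and the reader's next read finds no header.

```python
import os
import tempfile
import zipfile


def truncated_read_fails(path):
    """Return True if reading a member fails once the file is truncated."""
    # Writer 1: create a valid archive (an .xlsx is a zip of XML parts).
    with zipfile.ZipFile(path, 'w') as zf:
        zf.writestr('sheet.xml', '<row/>' * 100)

    # Reader: opening the archive parses the directory at the end of the file.
    reader = zipfile.ZipFile(path, 'r')
    try:
        # Writer 2: reopening the same path for writing truncates it in place.
        open(path, 'wb').close()
        try:
            reader.read('sheet.xml')  # the member's header is gone now
        except zipfile.BadZipFile:    # spelled BadZipfile on Python 2.7
            return True
        return False
    finally:
        reader.close()
```

    Calling truncated_read_fails(os.path.join(tempfile.mkdtemp(), 'demo.zip')) exercises the whole sequence in a scratch directory.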

    What looks strange, though, is your error message: I would have expected to see print_to_excel and load_workbook in the call stack.

    Anyway, since you confirmed that the problem really is in the XLSX handling, you can either

    • generate a new filename via tempfile for every process
    • use locking to ensure that only one process runs print_to_excel at a time