Tags: performance, parallel-processing, low-latency, large-data-volumes, process-pool

Large dataset, ProcessPoolExecutor issues


PROBLEM - ProcessPoolExecutor hasn't increased speed. Confirmed by tqdm

I've learned enough about Python to copy and/or write a program that works. Each file takes ~40 seconds to load -> filter -> write. I have ~6,800 files to work through and want a better version that uses all my processing power (6 cores). I tried to write that version (below). It runs, but it is slightly slower than my original function:

from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer

decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers

def load_decode_filter(file):
    '''
     Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(
                decoder,ins.read()):
            input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
            mask = [obj for obj in input_list if ((PHI_MAX > obj[
                12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
            output_message.extend(mask)
        return output_message

def main(files_in):
    '''
    attempt to initiate all cores in loading and filtering bufr files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(range(len(files_in)), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)

I was hoping to at least cut processing time per file.


Update, Closing:
First of all, I'd like to thank everyone who commented, as well as the answerer (I'm too new to upvote). It seems the only way to meaningfully increase efficiency would be to never decode in the first place and take what I want from the in-situ BUFR data. That is simply beyond my current ability (this is my first exposure to code of any kind).


I am currently running my initial version (f.bufr in, f.bufr_.txt out) whenever I am able, and I'll move processed files to a subdirectory after each "run". The silver lining is that I've learned enough doing this that I'll be able to write a program to combine all the text output into one file. Thanks again.


Solution

  • Q :
    " PROBLEM - ProcessPoolExecutor hasn't increased speed. Confirmed by tqdm "

    A :
    No, with all respect, your main problem is not the efficiency of the ProcessPoolExecutor() instance. Your main problem is a choice of performance / efficiency ( almost ) anti-patterns, which Python, and even more so Python sub-processes on Windows, will punish by making you wait some 75 hours ( ~6,800 files x ~40 s each ) to collect all the results. That holds only if the processing pipeline does indeed do what you expect it to do, which I cannot judge, but I guess it does not, for the reasons listed below.

    SUSPECT #1 :
    best avoid 75 hours of producing nonsensical outputs :

    Your code does not match the documented call signature of the standard Python 3 concurrent.futures.Executor.submit( fn, *args, **kwargs ) method.

    Instead of passing a reference to a function, main(), being the calling side, first runs the full METOP work package for each of the 6,800 files itself, in a pure-[SERIAL] fashion, which produces an expensively collected, large list of messages. That list is then ( contrary to the documented requirement to pass a callable, which for a process pool must be picklable, i.e. a module-level function rather than a lambda ) serialised, sent and deserialised, at further RAM / CPU / time expense, to one of the Executor-managed worker processes. The worker cannot do anything reasonable with a list in place of a function: it tries to call the list with the file name as its argument, which raises a TypeError that surfaces on future.result(). Ouch...

    def main( files_in ):
        '''                                                                 __doc__
        attempt to initiate all cores in loading and filtering bufr files
        '''
        with ProcessPoolExecutor( max_workers = 6
                                  )  as executor: #---------------------------# eXe CONTEXT-mgr
            
            with tqdm( range( len( files_in ) ),
                       desc     = 'files loaded',
                       position = 0
                       ) as progress: #---------------------------------------# pro CONTEXT-in-CONTEXT-mgr
                
                futures = []
                for file in files_in: #---------------------------------------#     LUXURY of top-level iterator, commanding  6800x times a pool of workers
                    future = executor.submit( load_decode_filter( file ), #---#     ??? WHY CALC THE RESULT BEFORE passing it to the .submit() ???
                                                                  file    #---#         std PARA
                                                                  )
                    future.add_done_callback( lambda p: progress.update() )   #     LUXURY of tqdm() for showing 75-hours of work ???
                    futures.append( future ) #--------------------------------#     LUXURY of no performance gain
                
                results = []
                for future in futures:
                    result = future.result()
                    results.append( result ) #--------------------------------#     LUXURY of adverse performance gain
        
        with open( DIRECTORY + 'bufrout.json', 'w',
                   encoding = 'utf-8'
                   ) as f_o: #------------------------------------------------# f_o CONTEXT-mgr
            dump( results, f_o )
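
The corrected call shape can be sketched as follows. This is a minimal illustration, not the asker's pipeline: load_decode_filter here is a hypothetical stand-in that only upper-cases the file name, and process_all and its executor_cls parameter are names invented for this sketch (the parameter exists only so the same code can be tried with a thread pool).

```python
from concurrent.futures import ProcessPoolExecutor


def load_decode_filter(file):
    """Hypothetical stand-in for the real decoder: returns a small list per file."""
    return [file.upper()]


def process_all(files_in, worker=load_decode_filter, max_workers=6,
                executor_cls=ProcessPoolExecutor):
    """Submit one task per file and collect the results in submission order."""
    with executor_cls(max_workers=max_workers) as executor:
        # Pass the function object and its argument separately:
        # submit(fn, *args) makes a worker call fn(*args).
        # Writing submit(worker(file), file) would run worker() right here,
        # serially, in the calling process.
        futures = [executor.submit(worker, file) for file in files_in]
        return [future.result() for future in futures]
```

On Windows the call to process_all( files ) has to stay under the `if __name__ == '__main__':` guard, as in the question, and the worker has to be a module-level function so that it can be pickled.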
    
    

    SUSPECT #2 :
    better avoid any & all performance-degrading syntax constructs,
    if performance is the real goal to be achieved :

    Avoid the sin of typing low-hanging-fruit SLOCs that look "sexy" but are paid for with immense add-on overhead costs.

    Design the process flow so that End-to-End processing times improve through latency-masking where possible ( file-I/O being a classical case ) and by avoiding any reducible steps at all ( creating named variables, sometimes never used, is a similar sin ).

    Given that you run on Windows, your ( though hidden ) sub-process instantiation costs are the highest of all cases: Windows spawns a fresh Python interpreter process for each worker, which re-imports the main module and rebuilds its module-level data structures. If that causes your physical RAM to get "over-crowded", the O/S will start ( for the rest of those 75 hours ... ) a nasty war of thrashing virtual-memory-managed file-I/O transfers ( ~ 10,000x bigger latency ) from RAM to disk and from disk to RAM. That will effectively damage any other CPU-from/to-RAM I/O operations, and we may straight forget any dreams about increasing performance.

    The pybufrkit documentation promises one more chance, a 10% ~ 30% performance boost, if your "filter" can be compiled using pybufrkit templates :

    "(...) BUFR Template Compilation
    The main purpose of Template Compilation is performance. However since bit operations are the most time consuming part in the overall processing. The performance gain somewhat is limited. Depending on the total number of descriptors to be processed for a message, template compilation provides 10 - 30% performance boost. Read the Docs "

    As-was, entropy-reduced code :

    def load_decode_filter( file ):
        '''
        Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
        '''
        output_message = []
        with open( file, 'rb' ) as ins: #----------------------------------- # ins CONTEXT-mgr
            for idx,         \
                bufr_message  \
                in             \
                enumerate( generate_bufr_message( decoder,                   #     LUXURY of enumerate for no real use
                                                  ins.read() # <-------------# ins.
                                                  )
                           ):
                input_list = FlatJsonRenderer().render( bufr_message )[3][2] #     LUXURY of JSON-(re)-decorations
                mask = [ obj for obj in input_list                           #
                                     if ( (    PHI_MAX > obj[12] >    PHI_MIN )
                                        & ( LAMBDA_MAX > obj[13] > LAMBDA_MIN )
                                          )
                         ]
                output_message.extend( mask )
            return output_message
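
The filter above builds one list per message and then extends a second list with it. A minimal sketch of a streaming alternative follows, assuming each rendered message is an iterable of rows indexed like the obj[12] / obj[13] rows above. The names in_box, iter_matches and write_matches, the box tuple and the one-JSON-document-per-line output format are hypothetical choices made for this sketch, not part of pybufrkit.

```python
import json


def in_box(obj, phi_min, phi_max, lambda_min, lambda_max):
    """True when obj[12] (phi) and obj[13] (lambda) lie strictly inside the box."""
    # `and` short-circuits, unlike the bitwise `&` used on the two comparisons above.
    return phi_min < obj[12] < phi_max and lambda_min < obj[13] < lambda_max


def iter_matches(rendered_messages, box):
    """Lazily yield matching rows; no intermediate list is ever built."""
    for rows in rendered_messages:
        for obj in rows:
            if in_box(obj, *box):
                yield obj


def write_matches(rendered_messages, box, out):
    """Write each match as one JSON line to the open text file `out`; return the count."""
    count = 0
    for obj in iter_matches(rendered_messages, box):
        out.write(json.dumps(obj) + '\n')
        count += 1
    return count
```

In the real pipeline, rendered_messages would be a generator over FlatJsonRenderer().render( bufr_message )[3][2] for each decoded message, so only one message's rows are held in memory at a time.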
    

    Performance tips, if you manage to use neither the pybufrkit native compiled templates nor the native-scripted CLI tasking of pybufrkit, and resort to a Win/Py3 flow of processing :

    • given the costs of full copies of the main Python interpreter process, which are paid anyway, your workers shall "know" the list of all files, so this embarrassingly independent file-by-file process will do best to :

    • gc.collect(); gc.disable() before spawning any pool of workers

    • spawn only as many max_workers worker processes as there are physical CPU-RAM memory-I/O channels on your host hardware ( the tasks are memory-bound, not CPU-bound )

    • split, on the main() side, the list of files to process into max_workers-many, balanced-length, non-overlapping tuples of ( from_fileIDX, to_fileIDX )

    • executor.submit() a reference to a block-processing function, with a single tuple of ( from_, to_ ), and arrange all the rest inside that block-processing function, including the latency-masked file-I/O storage of results ( which can be merged later, using O/S text / binary file merging )

    • prefer latency-masking flows. Syntax-sugared iterators may be nice in school-book examples, but here they are ( un-maskable ) performance killers: collecting a huge list of [ obj for obj in ... if ... ] never improves a stream-like ( maskable latency ) process flow, because the huge list must first be collected, only to be (re)-iterated next so as to write its items one by one into a disk file. Better iterate / filter / conditionally execute the file-I/O ops in one, single stream of steps ( reducing RAM, avoiding add-on overheads, and all that with maskable latencies )
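
The block-splitting tips above can be sketched roughly as follows. The helper names balanced_ranges, process_block and run_blocks, and the executor_cls parameter, are hypothetical. As a simplification, the sketch passes the file list along with each ( from_, to_ ) tuple instead of relying on workers re-reading it at module level, and the per-file worker is expected to store its own output.

```python
from concurrent.futures import ProcessPoolExecutor


def balanced_ranges(n_items, n_blocks):
    """Split range(n_items) into at most n_blocks contiguous (from_, to_) pairs
    whose lengths differ by at most one; to_ is exclusive."""
    if n_items <= 0:
        return []
    n_blocks = max(1, min(n_blocks, n_items))
    base, extra = divmod(n_items, n_blocks)
    ranges, start = [], 0
    for i in range(n_blocks):
        stop = start + base + (1 if i < extra else 0)
        ranges.append((start, stop))
        start = stop
    return ranges


def process_block(files, bounds, worker):
    """Run `worker` on files[from_:to_]; return how many files were handled."""
    from_, to_ = bounds
    for path in files[from_:to_]:
        worker(path)  # the worker is assumed to write its own output file
    return to_ - from_


def run_blocks(files, worker, max_workers=6, executor_cls=ProcessPoolExecutor):
    """Submit one task per block rather than one task per file."""
    ranges = balanced_ranges(len(files), max_workers)
    with executor_cls(max_workers=max_workers) as executor:
        futures = [executor.submit(process_block, files, bounds, worker)
                   for bounds in ranges]
        return sum(future.result() for future in futures)
```

With 6,800 files and 6 workers this yields two blocks of 1,134 files and four of 1,133, so only six tasks cross the process boundary. With a process pool, worker must be a module-level function so that it can be pickled.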

    For more details you may like to read this and code from this and there directed examples.