Tags: python, csv, batch-processing, concurrent.futures, timeoutexception

Parallelizing, Multiprocessing, CSV writer


I have a huge list of strings called term_list that I process one-by-one in a function called run_mappers(). One of its arguments is a csv_writer object. Inside the function I append each result to a list called from_mapper, then write the result to a CSV file using the csv_writer object. In my scouring for help, I read that the multiprocessing module is not recommended for CSV writing because it pickles its arguments, and csv_writer objects can't be pickled (I can't find the reference for this now among the billion tabs open on my desktop). I'm not sure multiprocessing is best suited for my task anyway.

def run_mappers(individual_string, other_args, csv_writer):
    # long processing code goes here, ending up with processed_result
    from_mapper.append(processed_result)  # from_mapper is a module-level list
    csv_writer.writerow(processed_result)
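
As an aside, the claim that csv_writer objects can't be pickled is easy to reproduce; this snippet raises a TypeError for me:

import csv
import io
import pickle

writer = csv.writer(io.StringIO())
pickle.dumps(writer)  # TypeError: cannot pickle '_csv.writer' object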

I want to speed up processing of this huge list, but am trying to control for memory usage by splitting the list into batches to process (term_list_batch). The batching itself is simple chunking, roughly like this (a hypothetical helper; the batch size of 100 is arbitrary):
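
def batches(seq, size=100):
    # yield successive size-sized chunks of seq
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

For each term_list_batch produced this way, I then try: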

import concurrent.futures
import gc
import time

def parallelize_mappers(term_list_batch, other_args, csv_writer):
    terms_left = len(term_list_batch)

    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        future_to_term = {executor.submit(run_mappers, term, other_args, csv_writer): term
                          for term in term_list_batch}
        try:
            for future in concurrent.futures.as_completed(future_to_term, timeout=180):  # timeout after 3 min
                term = future_to_term[future]
                try:
                    result = future.result()
                    # Process result if needed
                except Exception as exc:
                    print(f"Job {term} generated an exception: {exc}")
                finally:
                    terms_left -= 1
                    if terms_left % 10 == 0:
                        gc.collect()
                        time.sleep(2)
        except concurrent.futures.TimeoutError:
            print("Timeout occurred while processing futures")
            # future_to_term maps future -> term, so iterate the futures directly;
            # cancel() only affects futures that haven't started running yet
            for future in future_to_term:
                if not future.done():
                    future.cancel()

When I get a TimeoutError, my process just hangs, and I'm not sure what to do to keep moving forward through my huge term_list. I also don't want to terminate the program; I just want to keep moving through term_list, or process the next batch. If a thread fails or something, I just want to skip that term (or toss the whole thread) and continue processing term_list, so that as many results as possible get written to the file.

Amongst my many attempts to troubleshoot, I tried something like the version below, but am posting the one above as my best shot, since it crunched through a few hundred terms before stalling on me. Other attempts just died outright, hit RuntimeErrors, had threads deadlocking, etc.

For reference, another attempt is below:

import threading

def parallelize_mappers(term_list_batch, other_args, csv_writer):
    timeout = 120
    terminate_flag = threading.Event()

    # Create a thread for each term
    threads = []
    for term in term_list_batch:
        thread = threading.Thread(target=run_mappers, args=(term, other_args, csv_writer, terminate_flag))
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete or timeout
    for thread in threads:
        thread.join(timeout)

        # If the thread is still alive, it has timed out
        if thread.is_alive():
            print(f"Thread {thread.name} timed out. Terminating...")
            terminate_flag.set()  # Note: this signals every thread, not just the timed-out one
Then I added a while not terminate_flag.is_set() check to the run_mappers() function before executing the rest of the processing code. Roughly, the modified function looked like this (a sketch; the real processing body is omitted):
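
def run_mappers(individual_string, other_args, csv_writer, terminate_flag):
    while not terminate_flag.is_set():
        # long processing code goes here, ending up with processed_result
        from_mapper.append(processed_result)
        csv_writer.writerow(processed_result)
        return  # each term is processed once; the loop exists only to re-check the flag

But this is just unbearably slow. Thank you in advance.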

Mock code to reproduce/term_list to process below:

term_list = ['Dementia',
 'HER2-positive Breast Cancer',
 'Stroke',
 'Hemiplegia',
 'Type 1 Diabetes',
 'Hypospadias',
 'IBD',
 'Eating',
 'Gastric Cancer',
 'Lung Cancer',
 'Carcinoid',
 'Lymphoma',
 'Psoriasis',
 'Fallopian Tube Cancer',
 'Endstage Renal Disease',
 'Healthy',
 'HRV',
 'Recurrent Small Lymphocytic Lymphoma',
 'Gastric Cancer Stage III',
 'Amputations',
 'Asthma',
 'Lymphoma',
 'Neuroblastoma',
 'Breast Cancer',
 'Healthy',
 'Asthma',
 'Carcinoma, Breast',
 'Fractures',
 'Psoriatic Arthritis',
 'ALS',
 'HIV',
 'Carcinoma of Unknown Primary',
 'Asthma',
 'Obesity',
 'Anxiety',
 'Myeloma',
 'Obesity',
 'Asthma',
 'Nursing',
 'Denture, Partial, Removable',
 'Dental Prosthesis Retention',
 'Obesity',
 'Ventricular Tachycardia',
 'Panic Disorder',
 'Schizophrenia',
 'Pain',
 'Smallpox',
 'Trauma',
 'Proteinuria',
 'Head and Neck Cancer',
 'C14',
 'Delirium',
 'Paraplegia',
 'Sarcoma',
 'Favism',
 'Cerebral Palsy',
 'Pain',
 'Signs and Symptoms, Digestive',
 'Cancer',
 'Obesity',
 'FHD',
 'Asthma',
 'Bipolar Disorder',
 'Healthy',
 'Ayerza Syndrome',
 'Obesity',
 'Healthy',
 'Focal Dystonia',
 'Colonoscopy',
 'ART',
 'Interstitial Lung Disease',
 'Schistosoma Mansoni',
 'IBD',
 'AIDS',
 'COVID-19',
 'Vaccines',
 'Beliefs',
 'SAH',
 'Gastroenteritis Escherichia Coli',
 'Immunisation',
 'Body Weight',
 'Nonalcoholic Steatohepatitis',
 'Nonalcoholic Fatty Liver Disease',
 'Prostate Cancer',
 'Covid19',
 'Sarcoma',
 'Stroke',
 'Liver Diseases',
 'Stage IV Prostate Cancer',
 'Measles',
 'Caregiver Burden',
 'Adherence, Treatment',
 'Fracture of Distal End of Radius',
 'Upper Limb Fracture',
 'Smallpox',
 'Sepsis',
 'Gonorrhea',
 'Respiratory Syncytial Virus Infections',
 'HPV',
 'Actinic Keratosis']

Solution

  • The way I see it, you want to parallelize run_mappers() because this function might take a long time to finish. The CSV writing part does not need to run in parallel because it finishes relatively quickly.

    The first step is to redesign run_mappers() so that it does NOT take a CSV writer as a parameter. Instead, the function should return processed_result. It might raise an exception, in which case we ignore the result for that thread. To be useful, I write the errors out to err.txt:

    import csv
    import logging
    import random
    import time
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s | %(levelname)s | %(message)s",
    )
    
    term_list = [
        "Dementia",
        # ... omitted for brevity
        "Actinic Keratosis",
    ]
    
    
    def run_mappers(individual_string, other_args):
        # Simulate long processing time to get processed_result
        time.sleep(random.randint(1, 2))
        processed_result = [individual_string.strip(), other_args]
    
        # Simulate an exception
        if random.randint(1, 20) == 5:
            logging.error("%r -> failed", individual_string)
            raise ValueError(individual_string)
    
        logging.debug("%r -> %r", individual_string, processed_result)
        return processed_result
    
    
    def main():
        """Entry"""
        # run_mappers takes a long time, so this part is done in parallel
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(run_mappers, term, "other-args")
                for term in term_list
            ]
    
        # Writing to CSV does not need to be done in parallel because
        # it is relatively quick
        logging.info("Writing to CSV")
        # newline="" prevents blank lines in the CSV on Windows (per the csv module docs)
        with open("out.csv", "w", newline="") as stream, open("err.txt", "w") as err:
            writer = csv.writer(stream)
            for future in futures:
                if future.exception():
                    err.write(f"{future.exception()}\n")
                else:
                    writer.writerow(future.result())
        logging.info("Done CSV")
    
    
    if __name__ == "__main__":
        main()
    

    Output

    2024-03-02 09:49:00,335 | DEBUG | 'HER2-positive Breast Cancer' -> ['HER2-positive Breast Cancer', 'other-args']
    2024-03-02 09:49:05,174 | ERROR | 'Breast Cancer' -> failed
    2024-03-02 09:49:11,366 | DEBUG | 'HPV' -> ['HPV', 'other-args']
    ...
    2024-03-02 09:49:11,377 | DEBUG | 'Sepsis' -> ['Sepsis', 'other-args']
    2024-03-02 09:49:11,377 | INFO | Writing to CSV
    2024-03-02 09:49:11,378 | INFO | Done CSV
    

    Notes

    • Run this script and, if the results look alright, add your real run_mappers() code
    • I have no idea what other_args looks like, so I faked it
    • You might want to replace ThreadPoolExecutor with ProcessPoolExecutor and compare the timing to see which solution works more efficiently; a rough harness for that comparison is sketched below
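
    For instance, here is a minimal timing harness, assuming the run_mappers() and term_list defined above (the __main__ guard is required for ProcessPoolExecutor on platforms that spawn worker processes):

    import time
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

    def compare(executor_cls):
        start = time.perf_counter()
        # Submit inside the with-block; the executor waits for all
        # workers to finish before the block exits
        with executor_cls() as executor:
            futures = [
                executor.submit(run_mappers, term, "other-args")
                for term in term_list
            ]
        elapsed = time.perf_counter() - start
        failures = sum(1 for future in futures if future.exception())
        print(f"{executor_cls.__name__}: {elapsed:.1f}s, {failures} failed")

    if __name__ == "__main__":
        compare(ThreadPoolExecutor)
        compare(ProcessPoolExecutor)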