python, multiprocessing, python-multiprocessing, python-multithreading

Python Pool.apply_async() is returning None type objects


I have a 4GB+ file in which each line is a deeply nested JSON string. Here is an example of what the file looks like:

{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
100,000+ lines like above

I need to do the following:

  • convert each line in the file to a JSON object
  • flatten each JSON object so that all the key-value pairs are on the same level while keeping only the keys I need (~100 keys out of the 700+ in each JSON object); see the example after this list
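For reference, after flattening, a record built from the example line above would look roughly like this (nested keys joined with an underscore, which is also the form the filter in flatten_dict() below compares against COLS_TO_KEEP):

    {"core_id": "1", "core_field1_subfield1": 1, "core_field1_subfield2_subsubfield1": 1, ..., "core_field700": 700}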

My plan is to divide these 100K+ lines into multiple chunks, use multiprocessing to flatten the JSON objects in each chunk, and combine the results back into a dataframe. Because I am new to multiprocessing, I read a few posts including this and this (and a few more), and I tried to write my code based on the first post like this:

import json
import multiprocessing
import time
from multiprocessing import Pool, Process, Queue

import pandas as pd


# list of ~100 columns I want to keep
COLS_TO_KEEP = {'id', 'field1', 'subfield1', 'subsubfield2', 'field8', ..., 'field680'}


def chunk_the_list(list_of_items, chunk_size):
    for i in range(0, len(list_of_items), chunk_size):
        yield list_of_items[i : i + chunk_size]


def do_work(list_of_dicts):
    results = []
    for d in list_of_dicts:
        results.append(flatten_dict(d))


def flatten_dict(d, parent_key="", sep="_"):
    # This function recursively dives into each JSON object and flattens them
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            if new_key in COLS_TO_KEEP:
                items.append((new_key, v))
    return dict(items)


def main():
    raw_data_file_path_name = 'path/to/file.txt' # file is 4GB+
    file = open(raw_data_file_path_name, encoding="utf-8").readlines()
    listify = [json.loads(line) for line in file]
    size_of_each_chunk = int(len(listify) / (multiprocessing.cpu_count() - 2))
    chunked_data = chunk_the_list(listify, size_of_each_chunk)

    p = Pool(processes=multiprocessing.cpu_count() - 2)
    results = [p.apply_async(do_work, args=(chunk,)) for chunk in chunked_data]

    results = [item.get() for item in results] # This list has None type objects when I check using debugger
    results = sum(results, []) # because the above line returns None values, this step errors out 

    # I want to create a Pandas dataframe using the accumulated JSON/dictionary objects from results and write it out as a parquet or csv file at the end like this https://stackoverflow.com/a/20638258/1330974
    df = pd.DataFrame(results)
    df.to_parquet('df.parquet.gzip', compression='gzip')

if __name__ == "__main__":
    start_time = time.time()
    main()

Am I doing something wrong to get None type objects back in the results set? Thank you in advance for suggestions and answers!


Solution

  • You need to add a return statement to your do_work() function. p.apply_async(do_work, ...) returns an AsyncResult, and calling .get() on it gives you whatever do_work() returned; since your do_work() has no return statement, it implicitly returns None, which is why your results list is full of None objects.

    def do_work(list_of_dicts):
        results = []
        for d in list_of_dicts:
            results.append(flatten_dict(d))
        return results
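
  • With that return in place, each item.get() yields the list of flattened dicts produced by its chunk, and sum(results, []) (or a nested list comprehension) will concatenate them. As a side note, here is a minimal sketch of the same pipeline using Pool.map instead of apply_async; it assumes the flatten_dict() and COLS_TO_KEEP definitions from the question and a hypothetical path/to/file.txt, and it lets the pool collect the per-chunk results for you:

    import json
    import multiprocessing
    from multiprocessing import Pool

    import pandas as pd

    # flatten_dict and COLS_TO_KEEP are the same as in the question above

    def do_work(list_of_dicts):
        return [flatten_dict(d) for d in list_of_dicts]

    def main():
        with open('path/to/file.txt', encoding='utf-8') as f:  # hypothetical path
            records = [json.loads(line) for line in f]

        n_workers = max(1, multiprocessing.cpu_count() - 2)
        chunk_size = max(1, len(records) // n_workers)
        chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]

        with Pool(processes=n_workers) as p:
            per_chunk = p.map(do_work, chunks)  # one list of flattened dicts per chunk

        flat = [row for chunk in per_chunk for row in chunk]  # concatenate the chunks
        df = pd.DataFrame(flat)
        df.to_parquet('df.parquet.gzip', compression='gzip')

    if __name__ == "__main__":
        main()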