I have a 4GB+ file in which each line is a deeply nested JSON string. The file looks like this:
{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
100,000+ lines like above
I need to do the following:
My plan is to divide these 100K+ lines into multiple chunks, use multiprocessing to flatten the JSON objects in each chunk, and combine the results back into a dataframe. Because I am new to multiprocessing, I read a few posts including this and this (and a few more). I tried to write my code based on the first post like this:
    import json
    import multiprocessing
    import time
    from multiprocessing import Pool, Process, Queue

    import pandas as pd

    # list of ~100 columns I want to keep
    COLS_TO_KEEP = {'id', 'field1', 'subfield1', 'subsubfield2', 'field8', ..., 'field680'}


    def chunk_the_list(list_of_items, chunk_size):
        for i in range(0, len(list_of_items), chunk_size):
            yield list_of_items[i : i + chunk_size]


    def do_work(list_of_dicts):
        results = []
        for d in list_of_dicts:
            results.append(flatten_dict(d))


    def flatten_dict(d, parent_key="", sep="_"):
        # This function recursively dives into each JSON object and flattens them
        items = []
        for k, v in d.items():
            new_key = parent_key + sep + k if parent_key else k
            if isinstance(v, dict):
                items.extend(flatten_dict(v, new_key, sep=sep).items())
            else:
                if new_key in COLS_TO_KEEP:
                    items.append((new_key, v))
        return dict(items)


    def main():
        raw_data_file_path_name = 'path/to/file.txt'  # file is 4GB+
        file = open(raw_data_file_path_name, encoding="utf-8").readlines()
        listify = [json.loads(line) for line in file]
        size_of_each_chunk = int(len(listify) / (multiprocessing.cpu_count() - 2))
        chunked_data = chunk_the_list(listify, size_of_each_chunk)
        p = Pool(processes=multiprocessing.cpu_count() - 2)
        results = [p.apply_async(do_work, args=(chunk,)) for chunk in chunked_data]
        results = [item.get() for item in results]  # This list has None type objects when I check using debugger
        results = sum(results, [])  # because the above line returns None values, this step errors out
        # I want to create a Pandas dataframe using the accumulated JSON/dictionary objects
        # from results and write it out as a parquet or csv file at the end, like this:
        # https://stackoverflow.com/a/20638258/1330974
        df = pd.DataFrame(results)
        df.to_parquet('df.parquet.gzip', compression='gzip')


    if __name__ == "__main__":
        start_time = time.time()
        main()
Am I doing something wrong to get None type objects back in the results set? Thank you in advance for suggestions and answers!
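You need to add a return statement to your do_work() function. Without one, the function implicitly returns None, and that is the value item.get() hands back for every chunk.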
    def do_work(list_of_dicts):
        results = []
        for d in list_of_dicts:
            results.append(flatten_dict(d))
        return results
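To see the fix end to end, here is a minimal sketch that runs the same chunk-and-flatten pattern on a handful of in-memory records rather than the real 4GB file. The sample records, the two-entry COLS_TO_KEEP, and the helper name flatten_in_parallel are assumptions made up for illustration; they are not from the question. Note that flatten_dict compares the full flattened key (for example core_id), so the sketch lists columns with their parent prefix.

```python
from multiprocessing import Pool

# Hypothetical, shortened column list for illustration only.
# flatten_dict checks the full flattened key, so nested fields
# are listed together with their parent prefix.
COLS_TO_KEEP = {"core_id", "core_field1_subfield1"}


def flatten_dict(d, parent_key="", sep="_"):
    """Recursively flatten a nested dict, keeping only keys in COLS_TO_KEEP."""
    items = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.update(flatten_dict(v, new_key, sep=sep))
        elif new_key in COLS_TO_KEEP:
            items[new_key] = v
    return items


def chunk_the_list(list_of_items, chunk_size):
    """Yield successive slices of at most chunk_size items."""
    for i in range(0, len(list_of_items), chunk_size):
        yield list_of_items[i : i + chunk_size]


def do_work(list_of_dicts):
    # Returning the list is what lets AsyncResult.get() deliver it
    # to the parent process instead of None.
    return [flatten_dict(d) for d in list_of_dicts]


def flatten_in_parallel(records, processes=2, chunk_size=2):
    """Hypothetical helper: flatten records chunk by chunk in a worker pool."""
    with Pool(processes=processes) as pool:
        pending = [
            pool.apply_async(do_work, args=(chunk,))
            for chunk in chunk_the_list(records, chunk_size)
        ]
        # Collect results while the pool is still alive.
        per_chunk = [p.get() for p in pending]
    # Concatenate the per-chunk lists into one flat list of rows.
    return [row for chunk in per_chunk for row in chunk]


if __name__ == "__main__":
    sample = [
        {"core": {"id": str(i), "field1": {"subfield1": i, "subfield2": {"subsubfield1": i}}}}
        for i in range(5)
    ]
    rows = flatten_in_parallel(sample)
    print(rows)
```

The resulting list of flat dicts can be passed to pd.DataFrame(...) as in the question. The list comprehension at the end does the same job as sum(results, []) but avoids repeatedly copying the accumulated list, which may matter with 100K+ rows.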