Tags: python, json, pandas, data-analysis, dask

Need help removing None rows from a Dask bag for multiple large JSON files


Hi, I am trying to flatten and parse only 7 of the many key:value pairs in 30 large nested .json.gz files (4 GB each before unzipping). I started with the standard pandas-and-json combination, but reading 30 large JSON files one row at a time did not seem like the best idea, and after some searching I found Dask, which made this very fast. With the function below I was able to swiftly flatten out the 7 fields I care about for each JSON object.

[Image: Jupyter notebook code along with its result]

The goal is to grab all JSON objects that have the key screen with the value CAMERA. The problem is that not all JSON objects in these files have the key screen. To get around the KeyError for rows without a screen key, I do something I know is not recommended: a try/except block. While this works, it creates a bunch of None rows, and I have found no fast way to remove them. For clarity, the None rows appear wherever there was nothing to flatten because that particular JSON object had no screen key. I want to find either a) a way to filter and flatten the JSON objects based on the key, not just the value (see the sketch right after this paragraph), or b) a swift dropna() equivalent for Dask bags so I can drop all these None rows in one command.
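
A minimal sketch of option (a), assuming each element of the bag is a plain dict produced by json.loads (the path is the one from the code further down): Dask bags have a filter method, so records lacking the key can be dropped before flattening, and flatten then never raises a KeyError.

import dask.bag as db
import json

b = db.read_text("etl/mar2018/mongo-feedback-2018-03-05.json.gz").map(json.loads)

# Keep only the records that actually contain a 'screen' key,
# so the later flatten step never hits a KeyError.
b_with_screen = b.filter(lambda record: 'screen' in record)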

The end goal is to do this for all files for March 2018 and compile them into one dataframe for further analysis, as sketched below.
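
A minimal sketch of reading all the files at once, assuming the March 2018 files sit under etl/mar2018/ and match the glob below (a hypothetical pattern): db.read_text accepts a glob and builds one bag over every matching file.

import dask.bag as db
import json

# Hypothetical glob; adjust to the real file naming scheme.
b_all = db.read_text("etl/mar2018/*.json.gz").map(json.loads)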

I have tried other commands and methods that don't involve Dask, such as the ijson module with its prefix/value approach (see the commented-out part of the following code). None of the alternatives seem as swift and efficient as the dask.bag method, given the size of my files. An ideal solution would be a way to get rid of the None rows.

import dask.bag as db
import json
import gzip
import pandas as pd
import ijson as ij
path="etl/mar2018/mongo-feedback-2018-03-05.json.gz"
# pd.read_json(path)
# b=ij.parse(path)
# print(b)

# def parse_json(json_filename):
#     with gzip.open(path,'rt', encoding='utf-8') as input_file:
# #     with open(json_filename, 'rb') 
#         # load json iteratively
#         parser = ij.parse(input_file)
# #     return ij.items(parser)
#         for prefix, event, value in parser:
#             if prefix=='screen':
#                 print(value)
# #             print(prefix)
# #             print('prefix={}, event={}, value={}'.format(prefix, event, value))


# # if __name__ == '__main__':
# parse_json(path)
# b.head()
b = db.read_text(path).map(json.loads)
# b.to_dataframe()

# b=b.filter(lambda record: record['screen'])

# # for i in b:
# #     print(i)
# # 
def flatten(record):
    # Pull out the 7 fields of interest; records missing one of these
    # keys raise KeyError and are mapped to None.
    try:
        return {
            'userId': record['userId'],
            'expression': record['expression'],
            'platform': record['platform'],
            'country': record['country'],
            'date': record['date']['$date'],
            'cameraImageType': record['metadataMap']['cameraImageType'],
            'screen': record['screen'],
        }
    except KeyError:
        return None

df = b.map(flatten)
df.take(5)


# Failed attempts at dropping the None rows:
# df.remove(None)
def filter1(record):
    # (failed attempt) this keeps only the None records,
    # which is the opposite of what is needed
    if record is None:
        return record

# df.to_textfiles()
# .to_dataframe()
# p=df.map(filter1)
# df1=df-p
# df.compute()
# df.notnull().take(10)
# df.dropna(how='all')
# df.head(50)
# p=filter(None,df)
# list(b.filter('screen'))
# b.count().compute()
# p=df.to_csv()
# d=df.dropna()
# d.head()
# b.filter(lambda record: record['screen']=='CAMERA').take(10)

(None, None, {'userId': 'foo', 'expression': 'bar', 'platform': 'IOS', 'country': 'GT', 'date': '2018-03-04T22:58:18.000Z', 'cameraImageType': 'typed', 'screen': 'CAMERA'}, None, None)


Solution

  • If I understand correctly, your bag contains some elements with real data as dictionaries and some that are just None, where the keys could not be found; you would like to keep only the real data.

    I would use the filter method:

    b2 = b.map(flatten).filter(bool)
    

    (or, to be more explicit, replace bool with lambda x: x is not None; this is a function that specifies whether a given element should be kept or not)

    Then you can write this to files, or turn it into a dataframe and perform further operations on it, as in the sketch below.
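
    A minimal sketch of those further operations, assuming the column names produced by flatten() above; to_dataframe works here because every remaining element is a dict with the same keys.

    b2 = b.map(flatten).filter(bool)

    # Turn the cleaned bag into a dask DataFrame and keep only CAMERA rows.
    df = b2.to_dataframe()
    camera = df[df['screen'] == 'CAMERA']
    result = camera.compute()  # materialize as a pandas DataFrame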