Tags: python, sorting, pyspark, dask, large-files

Get unique values of a column ordered by date in 800 million rows


Input: multiple CSV files with the same columns (800 million rows in total): [Time Stamp, User ID, Col1, Col2, Col3]

Resources available: 60 GB of RAM and a 24-core CPU

Input Output example

Problem: I want to group by User ID, sort by Time Stamp, and take the unique values of Col1, dropping duplicates while retaining the order based on the Time Stamp.

Solutions Tried :

  1. Tried using joblib to load the CSVs in parallel and pandas to sort and write back to CSV (got an error at the sorting step).
  2. Used Dask (new to Dask):

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

## Cannot use the full 60 GB as there are others on the server
cluster = LocalCluster(dashboard_address=f':{port}', n_workers=4, threads_per_worker=4, memory_limit='7GB')
client = Client(cluster)
ddf = dd.read_csv("/path/*.csv")
ddf = ddf.set_index("Time Stamp")
ddf.to_csv("/outdir/")

Questions :

  1. Assuming Dask will use disk to sort and write the multi-part output, will the order be preserved when I read the output back with read_csv?
  2. How do I achieve the second part of the problem in Dask? In pandas, I'd just apply a function per group and gather the results into a new dataframe:
def getUnique(user_group):  ## assuming the rows for each user are sorted by timestamp
    res = []
    for val in user_group["Col1"]:
        if val not in res:  ## keep only the first occurrence, preserving time order
            res.append(val)
    return res
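
For context, here's roughly how I'd apply it on a small pandas dataframe (toy data just to illustrate the expected output; the real input is 800 million rows across many CSVs):

import pandas as pd

df = pd.DataFrame({
    "Time Stamp": [1, 2, 4, 3],
    "User ID": ["u1", "u1", "u1", "u1"],
    "Col1": ["a", "a", "b", "c"],
})

result = (
    df.sort_values("Time Stamp")  ## order rows by time before grouping
      .groupby("User ID")
      .apply(getUnique)           ## Series of ordered, de-duplicated Col1 lists per user
)
print(result)  ## u1 -> ['a', 'c', 'b']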

Please point me in the right direction if there is a better alternative to Dask.


Solution

  • So, I think I would approach this with two passes. In the first pass, I would run through all the CSV files and build a data structure keyed by user_id and col1 that holds the "best" timestamp for each pair. In this case, "best" means the lowest.

    Note: the use of dictionaries here only serves to clarify what we are attempting to do; if performance or memory were an issue, I would first look to reimplement without them where possible.

    So, starting with CSV data like:

    [
        {"user_id": 1, "col1": "a", "timestamp": 1},
        {"user_id": 1, "col1": "a", "timestamp": 2},
        {"user_id": 1, "col1": "b", "timestamp": 4},
        {"user_id": 1, "col1": "c", "timestamp": 3},
    ]
    

    After processing all the csv files I hope to have an interim representation of:

    {
        1: {'a': 1, 'b': 4, 'c': 3}
    }
    

    Note that a representation like this could be created in parallel for each CSV and then re-distilled into a final interim representation via a "pass 1.5", if you wanted to do that; a sketch of that merge step follows.
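
    A minimal sketch of that merge, assuming each CSV has already been reduced to an interim dict of the shape shown above (the function name is purely illustrative):

    def merge_interim(per_file_dicts):
        ## Combine per-CSV interim dicts, keeping the lowest
        ## timestamp seen for each (user_id, col1) pair.
        merged = {}
        for part in per_file_dicts:
            for user_id, col1_to_ts in part.items():
                target = merged.setdefault(user_id, {})
                for col1, ts in col1_to_ts.items():
                    if col1 not in target or ts < target[col1]:
                        target[col1] = ts
        return merged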

    Now we can create a final representation based on the keys of this nested structure, sorted by the innermost value, giving us:

    [
        {'user_id': 1, 'col1': ['a', 'c', 'b']}
    ]
    

    Here is how I might first approach this task before tweaking things for performance.

    import csv
    
    all_csv_files = [
        "some.csv",
        "bunch.csv",
        "of.csv",
        "files.csv",
    ]
    
    data = {}
    for csv_file in all_csv_files:
        #with open(csv_file, "r", newline="") as file_in:
        #    rows = csv.DictReader(file_in)
        #    ## Note: DictReader yields every field as a string, so cast the
        #    ## timestamp (e.g. int(row["timestamp"])) before comparing below.
    
        ## ----------------------------
        ## demo data
        ## ----------------------------
        rows = [
            {"user_id": 1, "col1": "a", "timestamp": 1},
            {"user_id": 1, "col1": "a", "timestamp": 2},
            {"user_id": 1, "col1": "b", "timestamp": 4},
            {"user_id": 1, "col1": "c", "timestamp": 3},
        ]
        ## ----------------------------
    
        ## ----------------------------
        ## First pass to determine the "best" timestamp
        ## for a user_id/col1
        ## ----------------------------
        for row in rows:
            user_id = row['user_id']
            col1 = row['col1']
            ts_new = row['timestamp']
            ts_old = (
                data
                    .setdefault(user_id, {})
                    .setdefault(col1, ts_new)
            )
    
            if ts_new < ts_old:
                data[user_id][col1] = ts_new
        ## ----------------------------
    
    print(data)
    
    ## ----------------------------
    ## second pass to set order of col1 for a given user_id
    ## ----------------------------
    data_out = [
        {
            "user_id": outer_key,
            "col1": [
                inner_kvp[0]
                for inner_kvp
                in sorted(outer_value.items(), key=lambda v: v[1])
            ]
        }
        for outer_key, outer_value
        in data.items() 
    ]
    ## ----------------------------
    
    print(data_out)
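
    If the result needs to land back on disk, one simple option (the file name here is just an example) is a JSON-lines file, which keeps each user's ordered col1 list intact:

    import json

    with open("unique_col1_by_user.jsonl", "w") as file_out:
        for record in data_out:
            file_out.write(json.dumps(record) + "\n")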