Tags: python, pandas, pymongo

OOM when reading data to Pandas from MongoDB using pymongo client


I have (900k, 300) records (900k documents, 300 fields each) in a MongoDB collection. When I try to read the data into pandas, memory consumption increases dramatically until the process is killed. I should mention that the data fits in memory (~1.5 GB) when I read it from a CSV file.

My machine has 32 GB of RAM and 16 CPUs, running CentOS 7.

My simple code:

import pandas as pd
from pymongo import MongoClient

client = MongoClient(host, port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))  # materializes every document in memory at once

My multiprocessing code:

import concurrent.futures
import multiprocessing


def read_mongo_parallel(skipses):
    # skipses: (skip, limit, db_name, collection_name, host, port)
    print("Starting process")
    client = MongoClient(skipses[4], skipses[5])
    db = client[skipses[2]]
    collection = db[skipses[3]]
    print("range of {} to {}".format(skipses[0], skipses[0] + skipses[1]))

    cursor = collection.find().skip(skipses[0]).limit(skipses[1])

    return list(cursor)


all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    for rows in executor.map(read_mongo_parallel, skipesess):
        all_lists.extend(rows)


df = pd.DataFrame(all_lists)

Memory grows with both methods and kills the kernel.

What am I doing wrong?


Solution

  • I have found a solution with multiprocessing, and it is the fastest:

    import multiprocessing as mp
    from time import time

    import pandas as pd
    from pymongo import MongoClient


    def chunks(collection_size, n_cores=mp.cpu_count()):
        """Yield (start, end) index ranges that split the collection between the workers."""
        batch_size = round(collection_size / n_cores)
        rest = collection_size % batch_size
        cumulative = 0
        for i in range(n_cores):
            cumulative += batch_size
            if i == n_cores - 1:
                # the last chunk absorbs the remainder
                yield (batch_size * i, cumulative + rest)
            else:
                yield (batch_size * i, cumulative)
    
    
    def parallel_read(skipses, host=HOST, port=PORT):
        """Read one (start, end) slice of the collection into a DataFrame."""
        print('Starting process on range of {} to {}'.format(skipses[0], skipses[1]))
        client = MongoClient(host, port)
        db = client[DB_NAME]
        collection = db[COLLECTION_NAME]

        # exclude _id and slice the cursor to this worker's range
        cursor = collection.find({}, {'_id': False})
        _df = pd.DataFrame(list(cursor[skipses[0]:skipses[1]]))
        return _df
    
    
    
    def read_mongo(colc_size, _workers=mp.cpu_count()):
        """Read the whole collection in parallel and concatenate the per-worker frames."""
        pool = mp.Pool(processes=_workers)
        results = [pool.apply_async(parallel_read, args=(chunk,)) for chunk in chunks(colc_size, n_cores=_workers)]
        output = [p.get() for p in results]
        temp_df = pd.concat(output)
        return temp_df


    time_0 = time()
    df = read_mongo(get_collection_size())
    print("Reading database with {} processes took {}".format(mp.cpu_count(), time() - time_0))
    
    
    
    Starting process on range of 0 to 53866
    Starting process on range of 323196 to 377062
    Starting process on range of 430928 to 484794
    Starting process on range of 538660 to 592526
    Starting process on range of 377062 to 430928
    Starting process on range of 700258 to 754124
    Starting process on range of 53866 to 107732
    Starting process on range of 484794 to 538660
    Starting process on range of 592526 to 646392
    Starting process on range of 646392 to 700258
    Starting process on range of 215464 to 269330
    Starting process on range of 754124 to 807990
    Starting process on range of 807990 to 915714
    Starting process on range of 107732 to 161598
    Starting process on range of 161598 to 215464
    Starting process on range of 269330 to 323196
    

    Reading database with 16 processes took 142.64860558509827
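
    get_collection_size() is not shown above; a minimal sketch, assuming it simply returns the document count of the same collection (HOST, PORT, DB_NAME and COLLECTION_NAME as in parallel_read):

    def get_collection_size(host=HOST, port=PORT):
        """Hypothetical helper: total number of documents, used to size the chunks."""
        client = MongoClient(host, port)
        return client[DB_NAME][COLLECTION_NAME].count_documents({})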

    For comparison, with one of the examples above (no multiprocessing):

    def iterator2dataframes(iterator, chunk_size: int):
      """Turn an iterator into multiple small pandas.DataFrame
    
      This is a balance between memory and efficiency
      """
      records = []
      frames = []
      for i, record in enumerate(iterator):
        records.append(record)
        if i % chunk_size == chunk_size - 1:
          frames.append(pd.DataFrame(records))
          records = []
      if records:
        frames.append(pd.DataFrame(records))
      return pd.concat(frames)
    
    time_0 = time()
    cursor = collection.find()
    chunk_size = 1000
    df = iterator2dataframes(cursor, chunk_size)
    print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))
    

    Reading database with chunksize = 1000 took 372.1170778274536

    time_0 = time()
    cursor = collection.find()
    chunk_size = 10000
    df = iterator2dataframes(cursor, chunk_size)
    print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))
    

    Reading database with chunksize = 10000 took 367.02637577056885
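
    For a like-for-like comparison with the multiprocessing version, the single-process run could also use the projection that drops _id; a minimal sketch:

    time_0 = time()
    cursor = collection.find({}, {'_id': False})
    df = iterator2dataframes(cursor, 10000)
    print("Reading database without _id took {}".format(time() - time_0))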