I have a MongoDB collection with 900k documents of 300 fields each (a 900k x 300 dataset).
When I try to read the data into pandas, memory consumption increases dramatically until the process is killed.
I have to mention that the data fits in memory (~1.5 GB) when I read it from a CSV file.
My machine has 32 GB of RAM and 16 CPUs, running CentOS 7.
My simple code:
from pymongo import MongoClient
import pandas as pd

client = MongoClient(host, port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))
My multiprocessing code:
import concurrent.futures
import multiprocessing

def read_mongo_parallel(skipses):
    # skipses: [skip, limit, db_name, collection_name, host, port]
    print("Starting process")
    client = MongoClient(skipses[4], skipses[5])
    db = client[skipses[2]]
    collection = db[skipses[3]]
    print("range of {} to {}".format(skipses[0], skipses[0] + skipses[1]))
    cursor = collection.find().skip(skipses[0]).limit(skipses[1])
    return list(cursor)

all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    # skipesess is a precomputed list of [skip, limit, db_name, collection_name, host, port] entries
    for rows in executor.map(read_mongo_parallel, skipesess):
        all_lists.extend(rows)

df = pd.DataFrame(all_lists)
Memory usage increases with both methods until the kernel is killed.
What am I doing wrong?
I have found a solution with multiprocessing, and it is the fastest.
import multiprocessing as mp
import pandas as pd
from pymongo import MongoClient
from time import time

# HOST, PORT, DB_NAME and COLLECTION_NAME are connection constants defined elsewhere

def chunks(collection_size, n_cores=mp.cpu_count()):
    """Yield (start, end) index tuples, one chunk per core."""
    batch_size = round(collection_size / n_cores)
    rest = collection_size % batch_size
    cumulative = 0
    for i in range(n_cores):
        cumulative += batch_size
        if i == n_cores - 1:
            # the last chunk also covers the remainder
            yield (batch_size * i, cumulative + rest)
        else:
            yield (batch_size * i, cumulative)

def parallel_read(skipses, host=HOST, port=PORT):
    print('Starting process on range of {} to {}'.format(skipses[0], skipses[1]))
    client = MongoClient(host, port)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]
    cursor = collection.find({}, {'_id': False})
    # slicing the cursor applies skip/limit, so each worker reads only its own range
    _df = pd.DataFrame(list(cursor[skipses[0]:skipses[1]]))
    return _df

def read_mongo(colc_size, _workers=mp.cpu_count()):
    pool = mp.Pool(processes=_workers)
    results = [pool.apply_async(parallel_read, args=(chunk,))
               for chunk in chunks(colc_size, n_cores=_workers)]
    output = [p.get() for p in results]
    pool.close()
    pool.join()
    return pd.concat(output)

time_0 = time()
df = read_mongo(get_collection_size())
print("Reading database with {} processes took {}".format(mp.cpu_count(), time() - time_0))
Starting process on range of 0 to 53866
Starting process on range of 323196 to 377062
Starting process on range of 430928 to 484794
Starting process on range of 538660 to 592526
Starting process on range of 377062 to 430928
Starting process on range of 700258 to 754124
Starting process on range of 53866 to 107732
Starting process on range of 484794 to 538660
Starting process on range of 592526 to 646392
Starting process on range of 646392 to 700258
Starting process on range of 215464 to 269330
Starting process on range of 754124 to 807990
Starting process on range of 807990 to 915714
Starting process on range of 107732 to 161598
Starting process on range of 161598 to 215464
Starting process on range of 269330 to 323196
Reading database with 16 processes took 142.64860558509827
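get_collection_size() is not defined above; a minimal sketch of what it is assumed to do (return the number of documents, so chunks() can split the range), reusing the same HOST/PORT/DB_NAME/COLLECTION_NAME constants:

def get_collection_size(host=HOST, port=PORT):
    # assumed helper: count the documents so chunks() knows the total range to split
    client = MongoClient(host, port)
    return client[DB_NAME][COLLECTION_NAME].count_documents({})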
For comparison, with one of the examples above (no multiprocessing):
def iterator2dataframes(iterator, chunk_size: int):
    """Turn an iterator into multiple small pandas.DataFrame

    This is a balance between memory and efficiency
    """
    records = []
    frames = []
    for i, record in enumerate(iterator):
        records.append(record)
        if i % chunk_size == chunk_size - 1:
            frames.append(pd.DataFrame(records))
            records = []
    if records:
        frames.append(pd.DataFrame(records))
    return pd.concat(frames)
time_0 = time()
cursor = collection.find()
chunk_size = 1000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))
Reading database with chunksize = 1000 took 372.1170778274536
time_0 = time()
cursor = collection.find()
chunk_size = 10000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))
Reading database with chunksize = 10000 took 367.02637577056885
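The same chunked read can be made a bit lighter on memory by excluding _id (as the multiprocessing version does) or by projecting only the fields you actually need. A minimal sketch, assuming the same collection handle; field_a and field_b are placeholder names:

time_0 = time()
# drop _id and keep only the needed fields (placeholder names)
cursor = collection.find({}, {'_id': False, 'field_a': True, 'field_b': True})
df = iterator2dataframes(cursor, 10000)
print("Reading database with a projection took {}".format(time() - time_0))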