Tags: python, pandas, parallel-processing, text-segmentation

How to implement parallel processing on a huge dataframe


I have one huge dataframe, "all_in_one":

all_in_one.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8271066 entries, 0 to 8271065
Data columns (total 3 columns):
label    int64
text     object
type     int64
dtypes: int64(2), object(1)
memory usage: 189.3+ MB

all_in_one.sample(2)

[Image in the original post: output of all_in_one.sample(2)]

I need to run word segmentation on the "text" column of this dataframe.

import jieba
import re

def jieba_cut(text):
    # Keep only tokens that start with a word character (drops punctuation and whitespace).
    text_cut = list(filter(lambda x: re.match(r"\w", x),
                            jieba.cut(text)))
    return text_cut
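
To make the filter concrete, here is a minimal sketch of what it keeps and drops. The helper name keep_word_tokens and the sample tokens are made up for illustration, and the tokens are hard-coded rather than produced by jieba, so this runs without jieba installed:

```python
import re

# Precompiled once, instead of being looked up again for every token.
WORD_START = re.compile(r"\w")


def keep_word_tokens(tokens):
    """Keep tokens whose first character is a word character (letter, digit, CJK, underscore)."""
    return [tok for tok in tokens if WORD_START.match(tok)]


tokens = ["我", "，", "爱", " ", "Python", "!", "3"]
print(keep_word_tokens(tokens))  # ['我', '爱', 'Python', '3']
```
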

%%time
all_in_one['seg_text'] = all_in_one.apply(lambda x: jieba_cut(x['text']), axis=1)

CPU times: user 1h 18min 14s, sys: 55.3 s, total: 1h 19min 10s
Wall time: 1h 19min 10s

This process took more than an hour in total. I would like to run the word segmentation in parallel across the dataframe to reduce the running time. Any suggestions are welcome.

EDIT:

Amazing result when I used dask to implement the function above:

import dask.dataframe as dd

all_in_one_dd = dd.from_pandas(all_in_one, npartitions=10)

%%time
all_in_one_dd.head()

CPU times: user 4min 10s, sys: 2.98 s, total: 4min 13s
Wall time: 4min 13s

Solution

  • If you're working with pandas and want some form of parallel processing, I'd suggest dask. It's a Python package whose dataframe API closely mirrors that of pandas. You'll have to do some setup for a dask Client and tell it how many workers you want and how many cores to use. In your example, if you have a csv file called file.csv, you can do something like:

    import re

    import dask.dataframe as dd
    from dask.distributed import Client
    import jieba
    
    def jieba_cut(text):
        text_cut = list(filter(lambda x: re.match(r"\w", x),
                                jieba.cut(text)))
        return text_cut
    
    client = Client() # by default, it starts a local cluster sized to the cores on your machine
    
    all_in_one = dd.read_csv('file.csv') # This has almost the same kwargs as a pandas.read_csv
    
    # Apply to the "text" column only; meta tells dask the name and dtype of the result
    all_in_one['seg_text'] = all_in_one['text'].apply(jieba_cut, meta=('seg_text', 'object')) # This only builds the task graph
    
    all_in_one = all_in_one.compute() # This executes the graph and returns a pandas DataFrame
    

    A fun extra: you can open a dashboard to watch the tasks dask is running (I think by default it's at localhost:8787).
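
If installing dask is not an option, the same split-apply-combine idea can be sketched with only the standard library. This is a rough illustration rather than a drop-in replacement: whitespace_cut is a hypothetical stand-in for jieba.cut so the snippet runs without jieba, and the names segment_chunk, split_into_chunks and segment_parallel are invented here.

```python
import re
from concurrent.futures import ProcessPoolExecutor

WORD_START = re.compile(r"\w")


def whitespace_cut(text):
    """Hypothetical stand-in for jieba.cut: split on whitespace."""
    return text.split()


def segment_chunk(texts, tokenize=whitespace_cut):
    """Segment a list of texts, keeping tokens that start with a word character."""
    return [[tok for tok in tokenize(text) if WORD_START.match(tok)]
            for text in texts]


def split_into_chunks(items, n_chunks):
    """Split a list into at most n_chunks contiguous, near-equal parts."""
    size = max(1, -(-len(items) // n_chunks))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


def segment_parallel(texts, n_workers=4):
    """Segment texts across n_workers processes, preserving input order."""
    texts = list(texts)
    chunks = split_into_chunks(texts, n_workers)
    if n_workers <= 1 or len(chunks) <= 1:
        # Nothing to parallelize: run in the current process.
        return segment_chunk(texts)
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        results = pool.map(segment_chunk, chunks)  # map yields results in chunk order
        return [tokens for chunk in results for tokens in chunk]
```

With the dataframe from the question, the call would look something like all_in_one['seg_text'] = segment_parallel(all_in_one['text'].tolist(), n_workers=4), run from under an if __name__ == "__main__": guard so that worker processes can import the module safely. Sending one large chunk per worker, rather than one row at a time, keeps the inter-process overhead small relative to the segmentation work.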