Tags: pyspark, data-science, python-multiprocessing, dask, distributed-computing

dask - Applying a function over a dataframe that is larger than RAM


Dask is said to be capable of handling datasets that are larger than RAM. Nevertheless, I wasn't able to apply it successfully to my problem, which is as follows:

I have a huge .csv file (1.8 GB) containing the texts of users' comments, and 8 GB of RAM. The goal is to preprocess the data (first of all, to split the texts into sentences). To achieve this, I run the following code:

import nltk
import dask.dataframe as dd
from dask.distributed import Client

def tokenize(df):
    # Split every post into a list of sentences
    df['text'] = df.text.apply(lambda post: nltk.sent_tokenize(post, language='russian'))
    print('tokenized')
    return df

if __name__ == '__main__':
    client = Client(n_workers=3, memory_limit='1.5GB', processes=True)
    df = dd.read_csv('texts_no_n', dtype={'user_id': int, 'post_id': int, 'text': str})

    print('Tokenizing sents')
    df = df.map_partitions(tokenize, meta=df)
    df.compute()

Dask splits my dataframe into 20 partitions.

I expect the Dask workers to do the following for each partition in turn:

  1. Tokenize the text (run tokenize(df_part)) and return a new, preprocessed part of the dataframe
  2. Release the memory used for reading the partition from the file, as Dask always does after executing the 'compute' method

Once all the partitions have been processed, I expect Dask to concatenate the preprocessed partitions and return the full preprocessed dataframe.

This behaviour seems logical and the most memory-efficient to me, but in practice Dask doesn't release the memory until it has processed the whole dataframe.

After computing 12 of the 20 partitions, I run out of RAM and Dask seems to try to dump the workers' data to disk. Have a look at the output:

Tokenizing sents
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?  Process memory: 1.05 GB -- Worker memory limit: 1.50 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 1.05 GB -- Worker memory limit: 1.50 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 979.51 MB -- Worker memory limit: 1.50 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

All the workers are restarted because of the high memory use. A large amount of RAM is released and the tokenizing starts all over again (this happens at the steep drop in RAM usage in the picture).

[Image: RAM usage over time, with a steep drop when the workers restart]

I suppose that when the workers restart, they start their job from the beginning; otherwise my data preprocessing would eventually finish. Therefore, restarting the workers doesn't fit my needs.

After the same cycle repeats several times, the scheduler kills the workers and the code terminates.

My questions are:

1) Is it possible to preprocess big data in parallel, using multiple processes, with Dask or any other tool?

I could have handled this 1.8 GB dataset with a pandas dataframe in a single process, but I ask for educational purposes: what if my dataset exceeded my RAM? Say it were 10 GB.
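For comparison, the single-process baseline I have in mind would stream the file in chunks instead of loading it whole. Below is a minimal sketch using the `chunksize` option of `pandas.read_csv`; the function names, the chunk size and the naive sentence split (a stand-in for `nltk.sent_tokenize`) are placeholders of mine, not part of my actual code.

```python
import pandas as pd


def process_in_chunks(src_csv, dst_csv, transform, chunksize=100_000):
    """Stream src_csv through `transform` chunk by chunk, appending to dst_csv.

    Only one chunk (plus its transformed copy) is held in memory at a time.
    Returns the number of rows written.
    """
    rows = 0
    reader = pd.read_csv(src_csv, chunksize=chunksize)
    for i, chunk in enumerate(reader):
        out = transform(chunk)
        # Write the header only with the first chunk, then append.
        out.to_csv(dst_csv, mode="w" if i == 0 else "a", header=(i == 0), index=False)
        rows += len(out)
    return rows


def split_sentences(chunk):
    # Placeholder transform: naive split on ". " instead of a real tokenizer.
    # The resulting lists are written to the CSV as their string representation.
    chunk = chunk.copy()
    chunk["text"] = chunk["text"].astype(str).apply(lambda s: s.split(". "))
    return chunk
```

A call such as `process_in_chunks('texts.csv', 'texts_tokenized.csv', split_sentences)` would keep memory use bounded by the chunk size, but it runs in one process only, which is why I am asking about Dask.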

2) Why can't Dask's workers dump the data they computed for each partition to disk in order to release RAM?

The output says that the workers have no data to store, but that is not true, because my RAM is full of data. If a partition is approximately 60 MB (as in my case), can't Dask just dump some of the partitions?

One more point to think about is the following:

Consider the case of 3 workers. If each worker processes approximately the same amount of data, then for my 1.8 GB file the maximum amount of memory used by one process should be around:

  1) 1.8 GB / 3 * 2 = 1.2 GB
  2) 1.8 GB / 3 = 600 MB (the desired one)

In the first case, I multiplied the result by 2, assuming that the memory used by df = df.map_partitions(tokenize, meta=df) equals the amount of input data plus the amount of processed data (which is approximately the same in my case). The second formula corresponds to the technique I outlined above (the way I expect Dask to work).

The problem is that I don't have enough RAM to accommodate the amount of data given by the first formula.
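The two estimates can be written out as a quick calculation. This is only a sketch of my own back-of-the-envelope reasoning, not of how Dask actually accounts for memory; the function name and its parameters are made up for illustration.

```python
def per_worker_estimate_mb(dataset_mb, n_workers, keep_input=True):
    """Rough per-worker memory estimate under the assumptions in the text.

    keep_input=True  -> input and processed data are both held (formula 1)
    keep_input=False -> only one copy of the data is held (formula 2)
    """
    share = dataset_mb / n_workers  # even split of the data across workers
    return share * 2 if keep_input else share


print(per_worker_estimate_mb(1800, 3, keep_input=True))   # 1200.0
print(per_worker_estimate_mb(1800, 3, keep_input=False))  # 600.0
```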


Solution

  • Finally, I am able to answer my own question.

    As practice (and the documentation) showed, the best way to use Dask is with the .parquet format. First, I split my huge file into many .parquet files with df.to_parquet(dir_name), then loaded them back with dd.read_parquet(dir_name) and applied my function.

    The following code worked for me:

    import dask.dataframe as dd
    from dask.distributed import Client
    from pymystem3 import Mystem  # assuming Mystem comes from the pymystem3 package
    # 'pr' is my own preprocessing module; 'dir_name' is the path of the directory for the .parquet parts

    def preprocess_df(df): # To pass to 'map_partitions'

        mystem = Mystem()  # Most important to create it here! Don't pass objects as an argument
        df['text'] = df['text'].apply(lambda x: pr.preprocess_post(x, mystem))

        mystem.close()
        return df

    if __name__ == '__main__':
        client = Client(n_workers=4)

        # Splitting the big file
        df = dd.read_csv('texts.csv', dtype={'user_id': int, 'post_id': int, 'text': str}) # Read the big data file
        df = df.repartition(npartitions=df.npartitions*8) # 8 might be too high; try lower values first (e.g., 2, or don't repartition at all)
        df.to_parquet(dir_name) # Convert the .csv file to .parquet parts

        # Loading the split file parts
        df = dd.read_parquet(dir_name)

        # Applying the function
        df = df.map_partitions(preprocess_df, meta={'user_id': int, 'post_id': int, 'text': object}) # Lazy; be sure not to call '.compute' here

        df.to_parquet('preprocesed.parquet') # Triggers the computation and writes the result to disk
        client.close()


    RAM consumption didn't rise above 50%.

    I guess it was not the .parquet format that helped to reduce RAM consumption, but splitting the file into parts.

    Update: Be careful when passing objects (mystem) to the function (preprocess_df) that 'map_partitions' is applied to, because it may result in unexpected behaviour (all the workers will try to share this object, which is not what we want in most cases). If you need additional 'multiprocessing-problematic' objects, define them inside the function itself (as with mystem = Mystem() at the top of preprocess_df).
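The point of the update can be illustrated without Dask. The sketch below uses a hypothetical stand-in class, FakeAnalyzer, that holds a lock and therefore cannot be pickled. I have not checked what exactly Mystem holds internally, so treat the lock as an assumption that merely mimics a helper that is awkward to ship between processes. Creating the helper inside the per-partition function sidesteps the problem.

```python
import pickle
import threading


class FakeAnalyzer:
    """Hypothetical stand-in for a heavy helper such as Mystem."""

    def __init__(self):
        self._lock = threading.Lock()  # lock objects cannot be pickled

    def lemmatize(self, text):
        with self._lock:
            return text.lower()

    def close(self):
        pass


def preprocess_partition(texts):
    # Build the helper inside the function: every call creates its own
    # instance in whichever process runs it, so nothing has to be serialized.
    analyzer = FakeAnalyzer()
    try:
        return [analyzer.lemmatize(t) for t in texts]
    finally:
        analyzer.close()


def can_pickle(obj):
    # True if obj survives pickle.dumps, False otherwise.
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


print(can_pickle(FakeAnalyzer()))                 # False: the instance holds a lock
print(preprocess_partition(["Hello", "World"]))   # ['hello', 'world']
```

The trade-off is that the helper is constructed once per partition, which is one more reason not to make the partitions too small.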