Tags: memory-management, dask, parquet, partition, dask-dataframe

Why does Dask's map_partitions function use more memory than looping over partitions?


I have a parquet file of position data for vehicles that is indexed by vehicle ID and sorted by timestamp. I want to read the parquet file, do some calculations on each partition (not aggregations) and then write the output directly to a new parquet file of similar size.

I organized my data and wrote my code (below) to use Dask's map_partitions, since I understood this would perform the operations one partition at a time, saving each result to disk sequentially and thereby minimizing memory usage. I was surprised to find that it exceeded my available memory. If I instead loop over the partitions, running my code on one partition at a time and appending the output to the new parquet file (see second code block below), it easily fits within memory.

Is there something incorrect in the original way I used map_partitions? If not, why does it use so much more memory? What is the proper, most efficient way of achieving what I want?

Thanks in advance for any insight!!

Original (memory-hungry) code:

import dask.dataframe as dd

ddf = dd.read_parquet(input_file)

# Reuse the input dtypes as meta so Dask does not have to infer the
# output schema by running my_function on a dummy partition.
meta_dict = ddf.dtypes.to_dict()

(
    ddf
    .map_partitions(my_function, meta=meta_dict)
    .to_parquet(
        output_file,
        append=False,
        overwrite=True,
        engine='fastparquet'
    )
)

Awkward looped (but more memory-friendly) code:

import dask.dataframe as dd

ddf = dd.read_parquet(input_file)

# Compute and write one partition at a time, appending each result
# to the output parquet file.
for partition in range(ddf.npartitions):
    partition_df = ddf.partitions[partition]
    (
        my_function(partition_df)
        .to_parquet(
            output_file,
            append=True,
            overwrite=False,
            engine='fastparquet'
        )
    )

More hardware and data details: The total input parquet file is around 5GB, split into 11 partitions of up to 900MB each. It is indexed by vehicle ID with known divisions, so I can do vehicle-grouped operations without working across partitions. The laptop I'm using has 16GB RAM and 19GB swap; the original code uses all of both, while the looped version fits within RAM.
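
For illustration, here is a minimal sketch of how that partition layout can be inspected, assuming the same input_file as above; these are standard Dask DataFrame attributes, and the actual values will depend on your data:

import dask.dataframe as dd

ddf = dd.read_parquet(input_file)

# Partition count and the index values at which the data is split.
# Known divisions mean each vehicle ID lands entirely in one partition,
# so per-vehicle operations never need to cross partition boundaries.
print(ddf.npartitions)       # e.g. 11 in this case
print(ddf.known_divisions)   # True if index divisions were preserved
print(ddf.divisions)         # tuple of vehicle IDs marking partition edges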


Solution

  • As @MichaelDelgado pointed out, by default Dask spins up multiple workers/threads according to what is available on the machine. With partitions of this size, that maxes out the available memory when using the map_partitions approach. To avoid this, I limited the number of workers and threads per worker to prevent automatic parallelization, using the code below, and the task then fit in memory.

    from dask.distributed import Client, LocalCluster

    # A single worker with a single thread processes partitions one at a
    # time instead of several at once, which keeps peak memory low.
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=1,
    )
    client = Client(cluster)
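
    With that client registered as the default scheduler, the original map_partitions pipeline can be run unchanged and partitions are processed one at a time. As an alternative sketch (assuming the same input_file, output_file, and my_function as in the question), a similar effect can be had without creating a cluster at all by forcing Dask's synchronous scheduler just for this computation:

    import dask
    import dask.dataframe as dd

    # Alternative sketch: no LocalCluster/Client; run the task graph
    # single-threaded in the current process for this write only.
    with dask.config.set(scheduler='synchronous'):
        ddf = dd.read_parquet(input_file)
        (
            ddf
            .map_partitions(my_function, meta=ddf.dtypes.to_dict())
            .to_parquet(
                output_file,
                append=False,
                overwrite=True,
                engine='fastparquet'
            )
        )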