Tags: python, csv, dask, aws-batch

How to split a large .csv file using dask?


I am trying to use dask in order to split a huge tab-delimited file into smaller chunks on an AWS Batch array of 100,000 cores.

In AWS Batch each core has a unique environment variable AWS_BATCH_JOB_ARRAY_INDEX ranging from 0 to 99,999 (which is copied into the idx variable in the snippet below). Thus, I am trying to use the following code:

import os
import dask.dataframe as dd

idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

df = dd.read_csv("s3://main-bucket/workdir/huge_file.tsv", sep="\t")
df = df.repartition(npartitions=100_000)
df = df.partitions[idx]

df = df.persist() # this call isn't needed before calling df.to_csv (see comment by Sultan)
df = df.compute() # this call isn't needed before calling df.to_csv (see comment by Sultan)
df.to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False)
print(idx, df.shape, df.head(5))

Do I need to call persist and/or compute before calling df.to_csv?


Solution

  • When I have to split a big file into multiple smaller ones, I simply run the following code.

    Read and repartition

    import dask.dataframe as dd
    
    df = dd.read_csv("file.csv")
    df = df.repartition(npartitions=100)
    

    Save to csv

    o = df.to_csv("out_csv/part_*.csv", index=False)
    

    Save to parquet

    o = df.to_parquet("out_parquet/")
    

    Here you can pass write_metadata_file=False if you want to skip writing the _metadata file.
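
    For example, skipping the metadata file is just the same call with that flag set (a minimal variation of the snippet above, using the same output directory):

    o = df.to_parquet("out_parquet/", write_metadata_file=False)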

    A few notes:

    • I don't think you really need persist or compute, since you can save directly to disk. When you run into problems like memory errors, it is safer to save to disk rather than compute (see the sketch after these notes).
    • I found writing to parquet at least 3x faster than writing to csv.
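
    Applied to the AWS Batch setup from the question, a minimal sketch could look like the following (the bucket path and npartitions are taken from the question and are only illustrative):

    import os
    import dask.dataframe as dd

    idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

    df = dd.read_csv("s3://main-bucket/workdir/huge_file.tsv", sep="\t")
    df = df.repartition(npartitions=100_000)

    # Select this worker's single partition (still lazy) and write it straight to disk;
    # no persist or compute is needed, and single_file=True produces one file with this exact name.
    df.partitions[idx].to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False, single_file=True)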