I am trying to use dask to split a huge tab-delimited file into smaller chunks on an AWS Batch array of 100,000 cores. In AWS Batch each core has a unique environment variable AWS_BATCH_JOB_ARRAY_INDEX ranging from 0 to 99,999 (which is copied into the idx variable in the snippet below), so I am using the following code:
import os
import dask.dataframe as dd
idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])
df = dd.read_csv(f"s3://main-bucket/workdir/huge_file.tsv", sep='\t')
df = df.repartition(npartitions=100_000)
df = df.partitions[idx]
df = df.persist() # this call isn't needed before calling df.to_csv (see comment by Sultan)
df = df.compute() # this call isn't needed before calling df.to_csv (see comment by Sultan)
df.to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False)
print(idx, df.shape, df.head(5))
Do I need to call persist and/or compute before calling df.to_csv?
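For reference, this is roughly what the snippet looks like with those two calls dropped (a sketch only: without persist/compute the object stays a lazy Dask DataFrame, so to_csv here is Dask's to_csv rather than pandas', and single_file=True is an assumption on my part for producing a single output file):
import os
import dask.dataframe as dd

idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

df = dd.read_csv("s3://main-bucket/workdir/huge_file.tsv", sep="\t")
df = df.repartition(npartitions=100_000)
part = df.partitions[idx]

# No persist()/compute(): part is still lazy, and this to_csv call
# triggers the computation and writes the selected partition to one file.
part.to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False, single_file=True)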
When I have to split a big file into multiple smaller ones, I simply run the following code.
import dask.dataframe as dd
df = dd.read_csv("file.csv")
df = df.repartition(npartitions=100)
o = df.to_csv("out_csv/part_*.csv", index=False)  # one CSV per partition; * is replaced by the partition number
o = df.to_parquet("out_parquet/")                 # or write one Parquet file per partition instead
Here you can pass write_metadata_file=False to to_parquet if you want to avoid writing the global _metadata file.
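As a minimal sketch (reusing the illustrative file names from the snippet above):
import dask.dataframe as dd

df = dd.read_csv("file.csv")
df = df.repartition(npartitions=100)

# write_metadata_file=False skips the global _metadata file;
# each partition is still written as its own Parquet part file
df.to_parquet("out_parquet/", write_metadata_file=False)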
A few notes: