I'm trying to parallelize reading the contents of 16 gzip files with this script:
import gzip
import glob
from dask import delayed
from dask.distributed import Client, LocalCluster

@delayed
def get_gzip_delayed(gzip_file):
    with gzip.open(gzip_file) as f:
        reads = f.readlines()
    reads = [read.decode("utf-8") for read in reads]
    return reads

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    read_files = glob.glob("*.txt.gz")
    all_files = []
    for file in read_files:
        reads = get_gzip_delayed(file)
        all_files.extend(reads)
    with open("all_reads.txt", "w") as f:
        w = delayed(all_files.writelines)(f)
    w.compute()
However, I get the following error:
> TypeError: Delayed objects of unspecified length are not iterable
How do I parallelize a for loop that uses extend/append and then write the result to a file? All the Dask examples I have found apply some final function to the product of the for loop.
The list all_files consists of delayed values, and calling delayed(f.writelines)(all_files) (note the different arguments relative to the code in the question) is not going to work for several reasons. The main one is that the with block only prepares lazy instructions for writing; they are executed later, after the file has already been closed.
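There are different ways to solve this problem; here are at least two.
Compute the delayed values first, then write the results on the client:
import dask
# build all_files with append (one delayed list per file), not extend
(results,) = dask.compute(all_files)  # compute returns a tuple
with open("all_reads.txt", "w") as f:
    for reads in results:
        f.writelines(reads)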
Write inside the get_gzip_delayed function, so data doesn't travel between worker and client:
from dask.distributed import Lock

@delayed
def get_gzip_delayed(gzip_file):
    with gzip.open(gzip_file) as f:
        reads = f.readlines()
    # create a lock to prevent others from writing at the same time
    with Lock("all_reads.txt"):
        with open("all_reads.txt", "a") as f:  # need to be careful here, since files are appending
            f.writelines([read.decode("utf-8") for read in reads])
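The first option (compute, then write) can be sketched end to end as follows. This is only a minimal sketch: the names read_gzip_lines and merge_gzip_files are hypothetical helpers introduced for illustration, and it assumes the decoded contents of all files fit in memory on the client. It runs on Dask's default scheduler; if a distributed Client has been created, dask.compute should use that instead.

```python
import gzip

import dask
from dask import delayed


@delayed
def read_gzip_lines(gzip_file):
    # Text mode ("rt") decodes while reading, so no separate decode step is needed.
    with gzip.open(gzip_file, "rt", encoding="utf-8") as f:
        return f.readlines()


def merge_gzip_files(paths, out_path):
    # One Delayed object per input file (the equivalent of append, not extend).
    tasks = [read_gzip_lines(path) for path in paths]
    # dask.compute returns a tuple with one result per argument, in order.
    results = dask.compute(*tasks)
    # The output file is opened only after all reads have finished,
    # so nothing lazy is left pending when the file is closed.
    with open(out_path, "w", encoding="utf-8") as out:
        for lines in results:
            out.writelines(lines)
    return sum(len(lines) for lines in results)
```

With the files from the question, this would be called as merge_gzip_files(glob.glob("*.txt.gz"), "all_reads.txt"). The reads run in parallel, while the write happens once, sequentially, on the client.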
Note that if memory is a severe constraint, the second approach can also be refactored to process each file line by line (at the cost of slower I/O).
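A line-by-line variant might look like the sketch below. The helper name append_gzip_lines is hypothetical, and the sketch uses only the standard library; in the Dask version it would be called from inside the delayed function while holding the Lock, which means the lock would be held for the whole duration of each file's copy.

```python
import gzip


def append_gzip_lines(gzip_file, out_path):
    """Append the decoded lines of gzip_file to out_path, one line at a time."""
    count = 0
    with gzip.open(gzip_file, "rt", encoding="utf-8") as src, \
            open(out_path, "a", encoding="utf-8") as dst:
        # Iterating over the handle yields one line at a time,
        # so the full file content is never held in memory.
        for line in src:
            dst.write(line)
            count += 1
    return count
```

Because the output is opened in append mode, a leftover all_reads.txt from an earlier run should be removed before starting, otherwise the new lines are added to the old ones.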