Tags: python, locking, dask, dask-distributed, dask-delayed

Parallelizing a for loop with no final function


I'm trying to parallelize reading the contents of 16 gzip files with this script:

import gzip
import glob

from dask import delayed
from dask.distributed import Client, LocalCluster

@delayed
def get_gzip_delayed(gzip_file):
    with gzip.open(gzip_file) as f:
        reads = f.readlines()
    reads = [read.decode("utf-8") for read in reads]
    return reads

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    read_files = glob.glob("*.txt.gz")
    all_files = []
    for file in read_files:
        reads = get_gzip_delayed(file)
        all_files.extend(reads)

    with open("all_reads.txt", "w") as f:
        w = delayed(all_files.writelines)(f)
    
    w.compute()

However, I get the following error:

> TypeError: Delayed objects of unspecified length are not iterable

How do I parallelize a for loop that uses extend/append and then writes the result to a file? All the Dask examples I've found include some final function performed on the product of the for loop.


Solution

  • The list all_files consists of delayed values, and calling delayed(f.writelines)(all_files) (note the different arguments relative to the code in the question) is not going to work for several reasons, the main one being that the lazy write instructions are only prepared, and would be executed after the file has already been closed.

    There are different ways to solve this problem; at least two are:

    • if the data from the files fits into memory, then it's easiest to compute it and write it to the file (build all_files with append rather than extend, so it is a plain list of delayed values):
    import dask

    (all_files,) = dask.compute(all_files)  # dask.compute returns a tuple of results
    with open("all_reads.txt", "w") as f:
        for reads in all_files:  # each element is a list of decoded lines
            f.writelines(reads)
    
    • if the data cannot fit into memory, then another option is to put the writing inside the get_gzip_delayed function, so the data doesn't travel between the workers and the client:
    from dask.distributed import Lock
    
    @delayed
    def get_gzip_delayed(gzip_file):
        with gzip.open(gzip_file) as f:
            reads = f.readlines()
    
        # acquire a named lock so that workers don't write to the file at the same time
        with Lock("all_reads.txt"):
            with open("all_reads.txt", "a") as f:  # careful: append mode, so remove any stale output file before re-running
                f.writelines([read.decode("utf-8") for read in reads])
    

    Note that if memory is a severe constraint, then the above can also be refactored to process the files line-by-line (at the cost of slower IO).
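
    As a rough illustration of that line-by-line variant, the sketch below streams each decoded line straight into the output file while holding the lock, so no file is ever fully materialised in memory. The helper name stream_gzip_to_output and the truncation step in the driver are assumptions for the example, not part of the original answer:

    import glob
    import gzip

    import dask
    from dask import delayed
    from dask.distributed import Client, LocalCluster, Lock

    @delayed
    def stream_gzip_to_output(gzip_file, output_path="all_reads.txt"):
        # hypothetical helper: decode and write one line at a time instead of
        # reading the whole file into memory first
        with gzip.open(gzip_file, "rt", encoding="utf-8") as src:
            with Lock(output_path):  # serialise writes across workers
                with open(output_path, "a") as dst:
                    for line in src:
                        dst.write(line)

    if __name__ == "__main__":
        client = Client(LocalCluster())
        open("all_reads.txt", "w").close()  # truncate stale output, since tasks append
        tasks = [stream_gzip_to_output(f) for f in glob.glob("*.txt.gz")]
        dask.compute(tasks)

    Holding the lock for the duration of each file keeps its lines contiguous in the output, at the cost of serialising the writes, which is the slower IO the note above refers to.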