Tags: dask, dask-distributed, dask-delayed

Dask program with a TLS connection does not exit when using the to_parquet method


I am using Dask to process 10 files, each about 142 MB in size. I wrote a function decorated with @dask.delayed. Here is an example:

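import os

import dask
import pandas as pd


@dask.delayed
def process_one_file(input_file_path, save_path):
    # Read every line of the input file; the with-block closes the file
    with open(input_file_path) as f:
        res = [line for line in f]
    # Build the DataFrame from all collected lines, not just the last one
    df = pd.DataFrame({"line": res})
    df.to_parquet(os.path.join(save_path, os.path.basename(input_file_path)))

if __name__ == '__main__':
    # ClusterClient is presumably the asker's own helper (not shown) that
    # returns a dask.distributed Client connected to the TLS-enabled cluster
    client = ClusterClient()
    input_dir = ""
    save_dir = ""
    print("start to process")
    csvs = [process_one_file(os.path.join(input_dir, filename), save_dir)
            for filename in os.listdir(input_dir)]
    # Run all delayed tasks on the cluster
    dask.compute(*csvs)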

However, the program does not always finish. After all files have been processed, it often hangs.
I run the program from the command line, and it often hangs after printing "start to process". The processing itself works, because all output files appear after a while, but the program never exits. If I disable TLS, the program finishes normally. Why can't Dask stop the program when the TLS connection is enabled, and how can I solve this?


I also found that the program hangs only when the to_parquet call is present; if I remove it, the program runs and exits normally.


Solution

  • I have found the problem. I set memory-limit=10GB for each worker process. In total I started 2 workers with 2 processes each, and each process has 2 threads, so each machine runs 4 processes that are together allowed up to 40 GB. However, my machine only has 32 GB of RAM. After lowering the memory limit, the program runs successfully!
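The arithmetic behind this fix can be checked with a small helper. This is a minimal sketch, not part of Dask: the function names and the optional reserve_gb headroom for the operating system are hypothetical, and it assumes, as in the answer above, that the memory limit applies to each worker process separately.

```python
# Hypothetical helpers (not a Dask API) for checking whether per-process
# memory limits fit into one machine's physical RAM.


def committed_memory_gb(processes_per_machine: int, memory_limit_gb: float) -> float:
    """Total memory the worker processes on one machine are allowed to use."""
    return processes_per_machine * memory_limit_gb


def fits_in_ram(processes_per_machine: int, memory_limit_gb: float,
                machine_ram_gb: float) -> bool:
    """True if the combined per-process limits do not exceed the machine's RAM."""
    return committed_memory_gb(processes_per_machine, memory_limit_gb) <= machine_ram_gb


def max_memory_limit_gb(machine_ram_gb: float, processes_per_machine: int,
                        reserve_gb: float = 0.0) -> float:
    """Largest per-process limit that keeps all processes within RAM,
    after setting aside reserve_gb (an assumed headroom) for other programs."""
    return (machine_ram_gb - reserve_gb) / processes_per_machine


if __name__ == "__main__":
    # Numbers from the answer: 4 processes per machine, 10 GB each, 32 GB RAM
    print(committed_memory_gb(4, 10))               # 40
    print(fits_in_ram(4, 10, 32))                   # False
    print(max_memory_limit_gb(32, 4, reserve_gb=4)) # 7.0
```

With the numbers from the post, the four processes could claim 40 GB on a 32 GB machine. Keeping some headroom for the operating system (the 4 GB here is only an example) suggests a limit of about 7 GB per process.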