
How to Handle Individual Worker Failures in Dask When Running Simulations on an HTCondor Cluster?


I am using Dask to run simulations on an HTCondor cluster. The problem is that when one of the simulations fails, the rest of the workers are shut down. Here is the code where I launched the simulation with Dask (I removed the parts that are not important for this issue):

def simulator_dask(
    args_dict: dict,
    prior: DirectPosterior,
    dataset: DatasetMultichannelArray,
    device: torch.device,
) -> None:

    """
    Execute simulations based on the provided prior distribution in a multithreaded manner using the `Dask` package.

    Args:
        args_dict (Dictionary): Dictionary with the arguments.
        prior (DirectPosterior): Prior distribution.
        dataset (DatasetMultichannelArray): Stores statistics and scaling information used in the prior distribution.
        device (torch.device): Device used to run the script.

    Returns:
        None
    """
    # Create a list to hold delayed computations for each simulation.
    delayed_simulations = []
    run_simulation_delayed = dask.delayed(run_simulation_dask)

    for s in parameter_sets_gen:
        
        # Setting output folder path for each simulation.
        # Note that folder names are zero-padded to 6 digits here,
        # i.e., the numbering keeps a fixed width only below 1 million simulations.
        folder_name = f"{simulation_number:06}"
        
        simulation_args = argparse.Namespace(
            output_dir=folder_name,
        )

        # Create delayed computation for each simulation.
        delayed_simulations.append(
            run_simulation_delayed(
                simulation_args,
            )
        )


    log.info("")
    log.info("***************************************************************")
    log.info("Launching simulations")
    log.info("***************************************************************")

    # Compute the delayed computations, i.e., run the simulations in parallel with HTCondor.
    dask.compute(delayed_simulations)

The problem is that the run_simulation_dask function fails for some workers because the connection between my home directory and the node is unstable. This issue occurs when I try to transfer the output folder from the node to my home directory using shutil.copytree(), and the home directory cannot be found. This instability is particularly common with older nodes. When this happens, all other workers are killed. Is there a way to prevent all workers from stopping and instead rerun the failed run_simulation_dask on another available worker or retry it on the same one?


Solution

  • When computing tasks with the distributed scheduler, you should be able to use the retries keyword argument (accepted by, for example, Client.submit, Client.map, and Client.compute). It specifies how many times a failed task is retried before it is marked as failed and the other tasks still processing are cancelled.

    See https://distributed.dask.org/en/stable/api.html.
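As a rough illustration of the retries keyword, here is a minimal sketch that runs on an in-process local cluster instead of HTCondor. The names flaky_simulation, run_with_retries, and marker_dir are hypothetical and not from the question. flaky_simulation stands in for run_simulation_dask and deliberately fails its first two attempts to mimic the unstable copy step. With HTCondor, the Client would presumably be created from your existing cluster object instead.

```python
import tempfile
from pathlib import Path

import dask
from dask.distributed import Client


def flaky_simulation(sim_id, marker_dir, failures_before_success=2):
    """Hypothetical stand-in for run_simulation_dask.

    Counts its own attempts with marker files (which works across threads
    and processes) and raises OSError on the first few attempts.
    """
    marker_dir = Path(marker_dir)
    attempt = len(list(marker_dir.glob(f"sim{sim_id}_*"))) + 1
    (marker_dir / f"sim{sim_id}_{attempt}").touch()
    if attempt <= failures_before_success:
        raise OSError(f"simulated copy failure for simulation {sim_id}")
    return sim_id, attempt


def run_with_retries(n_simulations=4, retries=3):
    """Run the flaky simulations, letting the scheduler retry failed tasks."""
    with tempfile.TemporaryDirectory() as marker_dir:
        # In-process cluster for the sketch only; dashboard disabled.
        with Client(processes=False, dashboard_address=None) as client:
            delayed_simulations = [
                dask.delayed(flaky_simulation)(i, marker_dir)
                for i in range(n_simulations)
            ]
            # retries=N lets each task be re-run up to N times after a failure
            # before its error is propagated to the caller.
            futures = client.compute(delayed_simulations, retries=retries)
            return client.gather(futures)
```

Calling run_with_retries() returns one (sim_id, attempt) pair per simulation. If a task still fails after all retries, client.gather raises its exception by default. Passing errors="skip" to client.gather drops the failed futures from the result instead, which may be preferable when a few lost simulations are acceptable.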