Tags: python, multithreading, gpu, optuna

Is there a way to pass arguments to multiple jobs in optuna?


I am trying to use Optuna to search hyperparameter spaces.

In one particular scenario I train a model on a machine with a few GPUs. The model and batch size allow me to run one training run per GPU. So, ideally, I would like Optuna to spread all trials across the available GPUs so that there is always one trial running on each GPU.

The docs say I should just start one process per GPU in a separate terminal, like:

CUDA_VISIBLE_DEVICES=0 optuna study optimize foo.py objective --study foo --storage sqlite:///example.db
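
With two GPUs, for example, that would mean a second terminal whose command differs only in the CUDA_VISIBLE_DEVICES value:

CUDA_VISIBLE_DEVICES=1 optuna study optimize foo.py objective --study foo --storage sqlite:///example.db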

I want to avoid that, because the whole hyperparameter search continues over multiple rounds. I don't want to manually start a process per GPU every time, check when all of them are finished, and then start the next round.

I saw that study.optimize has an n_jobs argument. At first glance this seems perfect. E.g. I could do this:

import optuna

def objective(trial):
    # the actual model would be trained here
    # the trainer here would need to know which GPU
    # it should be using
    best_val_loss = trainer(**trial.params)
    return best_val_loss

study = optuna.create_study()
study.optimize(objective, n_trials=100, n_jobs=8)

This starts multiple threads, each starting a training run. However, the trainer within objective somehow needs to know which GPU it should use. Is there a trick to accomplish that?


Solution

  • After a few mental breakdowns I figured out that I can do what I want with a multiprocessing.Queue. To get it into the objective function, the objective needs to be defined as a lambda function or as a class (I guess functools.partial would also work). E.g.

    from contextlib import contextmanager
    import multiprocessing

    import optuna
    from optuna.trial import Trial

    N_GPUS = 2
    
    class GpuQueue:
    
        def __init__(self):
            # shared queue holding one index per available GPU
            self.queue = multiprocessing.Manager().Queue()
            all_idxs = list(range(N_GPUS)) if N_GPUS > 0 else [None]
            for idx in all_idxs:
                self.queue.put(idx)
    
        @contextmanager
        def one_gpu_per_process(self):
            # block until a GPU index is free, then hand it to the caller
            current_idx = self.queue.get()
            try:
                yield current_idx
            finally:
                # return the index even if the trial raises
                self.queue.put(current_idx)
    
    
    class Objective:
    
        def __init__(self, gpu_queue: GpuQueue):
            self.gpu_queue = gpu_queue
    
        def __call__(self, trial: Trial):
            # hold one GPU for the full duration of this trial;
            # trainer is the training function from the question
            with self.gpu_queue.one_gpu_per_process() as gpu_i:
                best_val_loss = trainer(**trial.params, gpu=gpu_i)
                return best_val_loss
    
    if __name__ == '__main__':
        study = optuna.create_study()
        study.optimize(Objective(GpuQueue()), n_trials=100, n_jobs=8)
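
    For completeness, here is a minimal sketch of the lambda/functools.partial variant mentioned above, assuming the same GpuQueue class and the trainer function from the question:

    from functools import partial

    def objective(trial, gpu_queue):
        # same pattern as the Objective class above, just as a plain function
        with gpu_queue.one_gpu_per_process() as gpu_i:
            return trainer(**trial.params, gpu=gpu_i)

    if __name__ == '__main__':
        gpu_queue = GpuQueue()
        study = optuna.create_study()
        study.optimize(partial(objective, gpu_queue=gpu_queue), n_trials=100, n_jobs=8)
        # or, equivalently, with a lambda:
        # study.optimize(lambda trial: objective(trial, gpu_queue), n_trials=100, n_jobs=8)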