Tags: python, azure-machine-learning-service, hyperparameters, mlflow

How to Output and Deploy Multiple Artifacts from Best Child in Sweep Job Component?


I am building a pipeline composed of 6 components using the Azure Machine Learning Python SDK v2, and I am currently working on the 4th component.

In my script, I created a Sweep Job to hyper-tune my model, which is an autoencoder. The script performs the following tasks:

  • Builds and trains the autoencoder
  • Creates the trained autoencoder model
  • Extracts the encoder layers (for analysis)
  • Fits a StandardScaler object
  • Logs metrics using MLflow

I am saving the following:

  • The autoencoder model
  • The encoder layers
  • The StandardScaler object (for analysis)
  • The ScalerAutoencoderWrapper (fitted StandardScaler + Keras model predict). mlflow.sklearn.log_model() raised a warning that the .predict method is missing (to me, that function was built for end-to-end scikit-learn pipelines and estimators), so I use a custom pyfunc model from MLflow instead (sketched below).
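
For reference, the wrapper is a standard mlflow.pyfunc.PythonModel; a minimal sketch (simplified, not my exact implementation):

    import mlflow.pyfunc

    class ScalerAutoencoderWrapper(mlflow.pyfunc.PythonModel):
        def __init__(self, scaler, autoencoder):
            self.scaler = scaler            # fitted sklearn StandardScaler
            self.autoencoder = autoencoder  # trained Keras autoencoder

        def predict(self, context, model_input):
            # Scale the raw input, then reconstruct it with the autoencoder
            scaled = self.scaler.transform(model_input)
            return self.autoencoder.predict(scaled)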

After several job experiments, I realized that the SweepJob component automatically outputs the best child. I cannot use mlflow_model as an output because I have four different outputs, not just one (I need all four).

I thought of using uri_folder, but I'm unsure how to iterate through the uri_folder to get my ScalerAutoencoderWrapper, or how to use MLflow to deploy the model in the next step.
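
For context, in the next component a uri_folder input arrives as a mounted path, so I would expect something like the following to work (the argument name and folder layout here are assumptions):

    import os
    import mlflow.pyfunc

    # args.artifacts is the mounted uri_folder input of the next component (name assumed)
    for root, dirs, files in os.walk(args.artifacts):
        print(root, files)  # inspect what the sweep step actually produced

    # Load the wrapper from its subfolder and score with it
    wrapper = mlflow.pyfunc.load_model(os.path.join(args.artifacts, "scaler_autoencoder_wrapper"))
    predictions = wrapper.predict(X_test)  # X_test assumed available in that component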

My questions are:

  1. How can I output all these folders, especially the ScalerAutoencoderWrapper, and pass them as a uri_folder?
  2. How can I use MLflow to deploy the ScalerAutoencoderWrapper in the next component while retaining the other files?

Any feedback on how the files are saved is also welcome. Thanks a lot!

I have attached my code.


.... PARTIAL CODE ...

    # Log model and artifacts under a single MLflow run
    with mlflow.start_run() as run:
        run_id = run.info.run_id

        # Build model
        autoencoder, encoder = build_model(
            input_dim=input_dim,
            hidden_layers=hidden_layers,
            encoded_dim=encoded_dim,
            l1_regularizer=l1_regularizer,
            learning_rate=learning_rate,
            return_encoder=return_encoder,
        )

        # Define Strategy
        early_stopping = EarlyStopping(
            monitor=MONITOR,
            patience=patience,
            restore_best_weights=True
        )

        # Fit & Keep History
        autoencoder.fit(
            X_scaled,
            X_scaled,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_validate_scaled, X_validate_scaled),
            callbacks=[early_stopping, MLflowCallback()],  # Log the final validation loss
        )

        # Prepare input examples for the model signatures
        input_raw_example = X_train.iloc[:5]
        # Use transform (not fit_transform) so the already-fitted scaler is not refit on 5 rows
        input_transformed = scalerObj.transform(input_raw_example)

        # Artifact names
        scaler_pkl = 'scaler.pkl'
        encoder_folder = 'encoder'
        autoencoder_folder = 'autoencoder'
        autoencoder_wrapper_folder = 'scaler_autoencoder_wrapper'

        # Save StandardScaler object and log it to the active run
        print("--------------> Save Object Scaler")
        with open(scaler_pkl, "wb") as f:
            pickle.dump(scalerObj, f)

        mlflow.log_artifact(scaler_pkl)

        # Save encoder layers
        print("--------------> Save Encoder")
        mlflow.keras.log_model(encoder, encoder_folder, input_example=input_transformed)

        # Save Autoencoder model Only
        print("--------------> Save AutoEncoder")
        mlflow.keras.log_model(autoencoder, autoencoder_folder, input_example=input_transformed)

        # Save StandardScaler + Autoencoder
        print("--------------> Save ScalerAutoencoderWrapper")
        scaler_autoencoder_wrapper = ScalerAutoencoderWrapper(
            scaler=scalerObj,
            autoencoder=autoencoder
        )

        mlflow.pyfunc.log_model(
            artifact_path=autoencoder_wrapper_folder,
            python_model=scaler_autoencoder_wrapper,
            # The wrapper scales internally, so the example and signature use raw input
            input_example=input_raw_example,
            signature=infer_signature(
                model_input=input_raw_example,
                model_output=scaler_autoencoder_wrapper.predict(
                    context=None,
                    model_input=input_raw_example
                )
            ),
        )

        print(f"Training Completed, Model and Scaler saved with id : {run_id}")

My pipeline code:

# Create Nodes for Pipelines
@pipeline(default_compute = 'XXXX', 
          display_name="ABCDE",
          experiment_name = "EFGH",
          tags={'objective':'DONTKNOW'})
def pipeline_autoencoder(input_file):

    # Step 1: Local Feature Selection
    feature_extraction_step = feature_extraction(
        input_file = input_file,
    )

    # Step 2: Local Split Selection 
    data_split_step = data_split (
        input_file = feature_extraction_step.outputs.output_file,
    )

    # Step 3: Hyperparameter tuning (Sweep Job)
    train_model_step = train_tune_model(
        x_train=data_split_step.outputs.x_train_path,
        y_train=data_split_step.outputs.y_train_path,
        x_validate=data_split_step.outputs.x_validate_path,
        y_validate=data_split_step.outputs.y_validate_path,
        hidden_layers = Choice([str, str]),
        encoded_dim=Choice([int]),
        l1_regularizer=Choice([float, float]),
        learning_rate=Choice([float, float]),
        batch_size=Choice([int, int]),
        epochs=Choice([int, int]),
        patience=Choice([int, int]),
     
    )

    # OverWrite
    sweep_step = train_model_step.sweep(
        compute='XXXX',
        primary_metric = "METRIC",
        goal = "MINIMIZE",
        sampling_algorithm="RANDOM",
    )

    sweep_step.early_termination = BanditPolicy(
        evaluation_interval=INT,
        slack_factor=FLOAT,
        delay_evaluation=INT,
    )

    sweep_step.set_limits(max_total_trials=INT, max_concurrent_trials=INT, timeout=INT)

    # Step 4: deploy best child (NOT DONE YET)
    # deploybestchild(...)
    return {
        'model_output': sweep_step.outputs.model_output,
        'x_test': data_split_step.outputs.x_test_path,
        'y_test': data_split_step.outputs.y_test_path,
    }

What I have tried:

  1. I tried outputting the files from the sweep job as uri_folder, but got only an encoded blob-storage file.
  2. I tried iterating with os.walk(path), but it returned nothing (the folder appears empty).
  3. I tried to follow this Azure hyperparameters example; perhaps you can tell me why they do this:
# train model
    model = train_model(params, X_train, X_test, y_train, y_test)
    # Output the model and test data
    # write to local folder first, then copy to output folder

    mlflow.sklearn.save_model(model, "model")

    from distutils.dir_util import copy_tree

    # copy subdirectory example
    from_directory = "model"
    to_directory = args.model_output

    copy_tree(from_directory, to_directory)

UPDATE

Update on @JayashankarGS's answer:

I truly appreciate your support, but I'm currently facing an error: either the blob storage does not exist, or I get the following: `yaml.representer.RepresenterError: ('cannot represent an object')`.

I've noticed that you have a strong reputation for resolving these types of issues. Could you assist me with the following questions?

  1. Is the Sweep Job actually outputting the best model? I see a model with 2 Trials output, but there’s no documentation to confirm my assumption.
  2. I'm using pyfunc.log_model. Should I switch the output type from an MLflow model to a custom model since I'm using pyfunc? (Specifically, which type should I declare in the component YAML below?)
outputs:
  model_output:
    type: mlflow_model   # or custom_model?
    description: ....
  artifacts_output:
    type: uri_folder
    description: ....

  1. I tried setting (as you suggested) `autoencoder_wrapper_folder = args.artifacts + 'scaler_autoencoder_wrapper'`, but I encountered `yaml.representer.RepresenterError: ('cannot represent an object', PosixPath('/mnt/azureml/cr/j/ABCDE/cap/data-capability/wd/model_output/model'))`.

  2. Following the Azure example:

# copy subdirectory example
from_directory = "model"
to_directory = args.model_output

copy_tree(from_directory, to_directory)

I got the error 'model is not a directory'. I then tried using os.getcwd() because I assumed my model was saved in the current directory, but I received 'No such file or directory: /mnt/azureml/cr/j/ABCDS/exe/wd/model'. It seems the model isn't being saved there, and I'm unsure where Azure Machine Learning is saving it.
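
Is the expectation that mlflow.sklearn.save_model(model, "model") writes a local folder named model relative to the script's working directory, so that something like this should succeed before the copy (using the sample's model variable)?

    import os
    import mlflow

    # save_model should create ./model in the current working directory
    mlflow.sklearn.save_model(model, "model")
    print(os.getcwd(), os.path.isdir("model"))  # expected to print True before copy_tree runs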

  1. My pipeline currently consists of Step 1, Step 2, Sweep Job -> Evaluation. For now, I'm passing the model's .h5 file and the StandardScaler object to another component for evaluation and registration. This isn't ideal, but it's the only workaround I've found. I suspect pyfunc might need a different saving approach, but I'm unsure: the documentation only shows simple examples combining a sweep job with sklearn.log_model, or MLflow with a single model, but never a sweep plus multiple MLflow-saved models.

Solution

  • You need to log the files in the output location that you configured for the training component.

    Below is a sample training component YAML file.

    $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
    type: command
    
    name: train_model
    display_name: train_model
    version: 1
    
    inputs: 
      data:
        type: uri_folder
      ....Your extra inputs
      max_iter:
        type: integer
        default: -1
      decision_function_shape:
        type: string
        default: ovr
      break_ties:
        type: boolean
        default: false
      random_state:
        type: integer
        default: 42
    
    outputs:
      model_output:
        type: mlflow_model
      artifacts:
        type: uri_folder
      
    code: ./train-src
    
    environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest
    
    command: >-
      python train.py 
      --data ${{inputs.data}}
      ...your extra arguments
      --model_output ${{outputs.model_output}}
      --artifacts ${{outputs.artifacts}}
    

    Here, if you look at the outputs, there are model_output and artifacts, which are used to log the model and save your extra files.
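
    On the script side, these two outputs arrive in train.py as plain command-line arguments; a minimal argparse sketch (argument names assumed to match the command section above):

        import argparse

        parser = argparse.ArgumentParser()
        parser.add_argument("--data", type=str)
        parser.add_argument("--model_output", type=str)  # mounted folder for the mlflow_model output
        parser.add_argument("--artifacts", type=str)     # mounted folder for the extra artifacts
        # ... your extra hyperparameter arguments ...
        args = parser.parse_args()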

    Pass these values as arguments to the training component as shown below, and the files will be saved to those locations.

    train_model_step = train_tune_model(
            x_train=data_split_step.outputs.x_train_path,
            y_train=data_split_step.outputs.y_train_path,
            x_validate=data_split_step.outputs.x_validate_path,
            y_validate=data_split_step.outputs.y_validate_path,
            hidden_layers = Choice([str, str]),
            encoded_dim=Choice([int]),
            l1_regularizer=Choice([float, float]),
            learning_rate=Choice([float, float]),
            batch_size=Choice([int, int]),
            epochs=Choice([int, int]),
            patience=Choice([int, int]),
            model_output="Path to save model",
            artifacts="Path to save artifacts."
        )
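
    If you call the component inside the @pipeline function, you can also leave these outputs unbound (Azure ML allocates default datastore paths) or configure them on the node with azure.ai.ml.Output; a hedged sketch (types taken from the YAML above):

        from azure.ai.ml import Output

        # Optional: customise the sweep node's outputs instead of passing literal strings
        sweep_step.outputs.model_output = Output(type="mlflow_model")
        sweep_step.outputs.artifacts = Output(type="uri_folder")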
    

    Below is the code you need to save your artifacts (using save_model so the files are written directly to the output path):

            # Artifact names: save everything under the mounted artifacts output folder
            # (os.path.join requires `import os` at the top of train.py)
            scaler_pkl = os.path.join(args.artifacts, 'scaler.pkl')
            encoder_folder = os.path.join(args.artifacts, 'encoder')
            autoencoder_folder = os.path.join(args.artifacts, 'autoencoder')
            autoencoder_wrapper_folder = os.path.join(args.artifacts, 'scaler_autoencoder_wrapper')

            # Save StandardScaler object
            print("--------------> Save Object Scaler")
            with open(scaler_pkl, "wb") as f:
                pickle.dump(scalerObj, f)

            # Save encoder layers (save_model writes directly to the given local path;
            # the target subfolder must not exist yet)
            print("--------------> Save Encoder")
            mlflow.keras.save_model(encoder, encoder_folder, input_example=input_transformed)

            # Save autoencoder model only
            print("--------------> Save AutoEncoder")
            mlflow.keras.save_model(autoencoder, autoencoder_folder, input_example=input_transformed)

            # Save StandardScaler + autoencoder wrapper
            print("--------------> Save ScalerAutoencoderWrapper")
            scaler_autoencoder_wrapper = ScalerAutoencoderWrapper(
                scaler=scalerObj,
                autoencoder=autoencoder
            )

            mlflow.pyfunc.save_model(
                path=autoencoder_wrapper_folder,
                python_model=scaler_autoencoder_wrapper,
                # The wrapper scales internally, so the example and signature use raw input
                input_example=input_raw_example,
                signature=infer_signature(
                    model_input=input_raw_example,
                    model_output=scaler_autoencoder_wrapper.predict(
                        context=None,
                        model_input=input_raw_example
                    )
                ),
            )
    

    Then place the MLflow model you want to deploy in args.model_output.
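
    For the mlflow_model output specifically, one option is to save the pyfunc wrapper locally and then copy its contents into args.model_output, mirroring the Azure sample quoted below (the local folder name here is an assumption):

        import shutil
        import mlflow

        # Save the wrapper to a local folder first ...
        mlflow.pyfunc.save_model(path="local_wrapper", python_model=scaler_autoencoder_wrapper)

        # ... then copy its contents into the mounted output, so the MLmodel file
        # sits at the root of args.model_output (requires Python 3.8+ for dirs_exist_ok)
        shutil.copytree("local_wrapper", args.model_output, dirs_exist_ok=True)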

    Next, you can access the model through sweep_step.outputs.model_output and the artifacts through sweep_step.outputs.artifacts.
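
    In the @pipeline function that could look like the following (evaluate_model is a hypothetical downstream component):

        # Hypothetical next step consuming the sweep step's outputs
        evaluate_step = evaluate_model(
            model_input=sweep_step.outputs.model_output,   # best trial's MLflow model
            artifacts_input=sweep_step.outputs.artifacts,  # scaler.pkl, encoder, autoencoder, wrapper
            x_test=data_split_step.outputs.x_test_path,
            y_test=data_split_step.outputs.y_test_path,
        )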

    The same thing is done in the Azure sample code you linked:

        mlflow.sklearn.save_model(model, "model")
    
        from distutils.dir_util import copy_tree
    
        # copy subdirectory example
        from_directory = "model"
        to_directory = args.model_output
    
        copy_tree(from_directory, to_directory)
    

    After saving the MLflow model in the local folder model, all the files inside it are copied to the output folder given by args.model_output.

    But in the code above I have given the output path to save to directly, instead of copying.