azure, azure-data-factory, azure-machine-learning-service

Unable to parametrize ML pipeline endpoint name - Azure Data Factory


Sorry for the long post; I need to explain it properly for people to understand.

I have a pipeline in Data Factory that triggers a published AML endpoint.

I am trying to parametrize this ADF pipeline so that I can deploy it to test and prod, where the AML endpoints are different.

Therefore, I have tried to edit the parameter configuration in ADF.

Here, in the section Microsoft.DataFactory/factories/pipelines, I add "*": "=" so that all the pipeline parameters are parametrized:

 "Microsoft.DataFactory/factories/pipelines": {
        "*": "="
    }

After this I export the template to see which parameters are in the JSON. There are a lot of them, but I do not see any parameter that has the AML endpoint name as its value; I only see that the endpoint ID is parametrized.


My question is: is it possible to parametrize the AML endpoint by name, so that when deploying ADF to test I can just provide the AML endpoint name and it picks up the ID automatically?



Solution

  • I finally fixed this.

    The trick is not to choose Pipeline Endpoint ID but to choose Pipeline ID.

    Pipeline ID can be parametrized, and I have set it up to come from a global parameter. Therefore I do not need to find the right level of indentation every time.
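
    As a sketch (the activity, linked service, and global parameter names here are illustrative assumptions, not taken from my factory), the Machine Learning Execute Pipeline activity can then read the ID from the global parameter through a dynamic expression:

    {
        "name": "RunAmlPipeline",
        "type": "AzureMLExecutePipeline",
        "linkedServiceName": {
            "referenceName": "AzureMLService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
            "mlPipelineId": {
                "value": "@pipeline().globalParameters.TrainingPipelineId",
                "type": "Expression"
            }
        }
    }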



    Later, you include the global parameters in your ARM template.


    And in the parameter template you add:

    "Microsoft.DataFactory/factories": {
        "properties": {
            "globalParameters": {
                "*": {
                    "value": "="
                }
            },
            "globalConfigurations": {
                "*": "="
            },
            "encryption": {
                "*": "=",
                "identity": {
                    "*": "="
                }
            }
        }
    },
    "Microsoft.DataFactory/factories/globalparameters": {
        "properties": {
            "*": {
                "value": "="
            }
        }
    }
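
    With that in the parameter template, the exported ARM parameter file should get one entry per global parameter. As far as I can tell, the generated name follows the pattern <factoryName>_properties_globalParameters_<parameterName>_value (the factory name, parameter name, and GUID below are placeholders):

    "yourfactory_properties_globalParameters_TrainingPipelineId_value": {
        "value": "00000000-0000-0000-0000-000000000000"
    }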
    

    Finally, I wrote a Python CLI tool to get the latest pipeline ID for a given pipeline endpoint name:

    import argparse
    from azureml.pipeline.core import PipelineEndpoint
    from azureml.core import Workspace
    from env_variables import Env
    from manage_workspace import get_workspace
    
    
    def get_latest_published_endpoint(ws: Workspace, pipeline_name: str) -> str:
        """
        Get the latest published endpoint given a machine learning pipeline name.
        The function is used to update the pipeline id in ADF deploy pipeline
    
        Parameters
        ------
        ws : azureml.core.Workspace
            A workspace object to use to search for the models
        pipeline_name : str
            A string containing the pipeline name to retrieve the latest version
    
        Returns
        -------
        endpoint_id : str
            The ID of the latest pipeline published behind the endpoint
        """
        pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name=pipeline_name)
        endpoint_id = pipeline_endpoint.get_pipeline().id # this gives back the pipeline id
        # pipeline_endpoint.id gives back the pipeline endpoint id which can not be set
        # as dynamic parameter in ADF in an easy way
    
        return endpoint_id
    
    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument("--monitoring_pipeline_name", type=str,
                            help="Pipeline Name to get endpoint id",
                            default='yourmonitoringpipeline')
        parser.add_argument("--training_pipeline_name", type=str,
                            help="Pipeline Name to get endpoint id",
                            default='yourtrainingpipeline')
        parser.add_argument("--scoring_pipeline_name", type=str,
                            help="Pipeline Name to get endpoint id",
                            default='yourscoringpipeline')
        args, _ = parser.parse_known_args()
        e = Env()
    
        ws = get_workspace(e.workspace_name, e.subscription_id, e.resource_group)  # type: ignore
        latest_monitoring_endpoint = get_latest_published_endpoint(ws, pipeline_name=args.monitoring_pipeline_name)  # type: ignore
        latest_training_endpoint = get_latest_published_endpoint(ws, pipeline_name=args.training_pipeline_name) # type: ignore
        latest_scoring_endpoint = get_latest_published_endpoint(ws, pipeline_name=args.scoring_pipeline_name) # type: ignore
        print('##vso[task.setvariable variable=MONITORING_PIPELINE_ID;]%s' % (latest_monitoring_endpoint))
        print('##vso[task.setvariable variable=TRAINING_PIPELINE_ID;]%s' % (latest_training_endpoint))
        print('##vso[task.setvariable variable=SCORING_PIPELINE_ID;]%s' % (latest_scoring_endpoint))
    

    By printing the variables in this way, they are set as pipeline variables that I can later pick up in the ARM template deployment step.
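
    For reference, a minimal Azure DevOps YAML sketch of this flow (the script path, service connection, template file names, and parameter name are illustrative assumptions):

    - script: python get_pipeline_ids.py
      displayName: Resolve latest AML pipeline IDs

    - task: AzureResourceManagerTemplateDeployment@3
      displayName: Deploy ADF ARM template
      inputs:
        deploymentScope: Resource Group
        azureResourceManagerConnection: your-service-connection
        csmFile: ARMTemplateForFactory.json
        csmParametersFile: ARMTemplateParametersForFactory.json
        overrideParameters: >-
          -yourfactory_properties_globalParameters_TrainingPipelineId_value $(TRAINING_PIPELINE_ID)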


    And then we have our desired setup:


    Different pipeline IDs for different environments.

    Maybe this is material for a blog post, as it works like a charm.