Tags: azure, azure-machine-learning-service, azureml-python-sdk, azuremlsdk, azure-ml-pipelines

How to trigger an event-based pipeline in AzureML?


I have a published pipeline in AzureML that preprocesses the data and trains a new model. I am trying to set up an event-based schedule so that whenever a new dataset is registered in the workspace, it triggers the whole training pipeline. I am using the Python AzureML SDK v1.

Using the information from the docs, I tried setting up the schedule as follows:

from azureml.core import Datastore
from azureml.pipeline.core import Schedule

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.", pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, polling_interval=2)

When I check the status of the schedule, it says it's active. However, when I register a new dataset in the blob storage associated with the workspace, nothing happens, even if I wait for more than 5 minutes.

Can someone help me understand how this works in terms of triggering the pipeline when a new dataset is registered?


Solution

  • AzureML reacts to data changes in the datastore, not to dataset registrations. If you register a new version of a dataset using the same data path, and the data itself hasn't changed, the pipeline may not be triggered. When creating a reactive schedule, you can specify a path_on_datastore parameter to define which folder or file to monitor.

    If you don't specify this parameter, it will default to the root of the datastore. Ensure the data you're changing/adding is in the correct location.

    As a simple test, try manually adding a file to the monitored path in your workspaceblobstore through the Azure portal (or another method) and see if that triggers the pipeline. This can help differentiate between issues with dataset registration and issues with the datastore monitoring.
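    For instance, a minimal sketch of this test using the SDK instead of the portal (assuming ws is your Workspace object; the local file name and target path below are purely illustrative placeholders) could look like this:

    from azureml.core import Datastore

    # Assumes `ws` is an existing Workspace object.
    datastore = Datastore.get(ws, datastore_name="workspaceblobstore")

    # Upload a local test file into the path the schedule is polling.
    # The target_path here is a placeholder; use the same value you passed
    # as path_on_datastore, or the datastore root if you did not set one.
    datastore.upload_files(
        files=["./trigger-test.txt"],
        target_path="<monitored-path>/",
        overwrite=True,
        show_progress=True,
    )

    If the pipeline runs after the upload, the monitoring itself is working and the issue lies in where (or whether) your dataset registration actually writes new data.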

    Below is an example of a change-based schedule. In this example, the pipeline is triggered when a new mp3 file is added to a specific blob container.

    from azureml.core import Datastore
    from azureml.data.datapath import DataPath
    from azureml.pipeline.core import Schedule

    datastore = Datastore.get(ws, datastore_name='<your-datastore>')

    # Poll the datastore every minute and submit the published pipeline
    # whenever a file under path_on_datastore is added or modified.
    reactive_schedule = Schedule.create(ws,
                                        name="R-Schedule",
                                        description="Based on input file change.",
                                        pipeline_id=published_pipeline.id,
                                        experiment_name=experiment_name,
                                        datastore=datastore,
                                        polling_interval=1,
                                        # Name of the pipeline's DataPath parameter that
                                        # receives the path of the changed blob.
                                        data_path_parameter_name="input_mp3_data",
                                        path_on_datastore='r-pipeline-data/mp3/'
                                       )
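
    Once created, you can also verify the schedule from the SDK. As a rough sketch (assuming the azureml.pipeline.core.Schedule API in SDK v1; schedule_id below is illustrative), you can list the schedules in the workspace, check their status, and disable one you no longer need:

    from azureml.pipeline.core import Schedule

    # List all schedules in the workspace and inspect their status.
    for s in Schedule.list(ws):
        print(s.id, s.name, s.status)

    # Disable a schedule when you no longer want it to fire.
    # schedule_id is one of the ids printed above (illustrative).
    schedule = Schedule.get(ws, schedule_id)
    schedule.disable(wait_for_provisioning=True)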