Tags: azure, azure-machine-learning-service

How do we do Batch Inferencing on Azure ML Service with Parameterized Dataset/DataPath input?


The ParallelRunStep Documentation suggests the following:

A named input Dataset (DatasetConsumptionConfig class)

path_on_datastore = iris_data.path('iris/')
input_iris_ds = Dataset.Tabular.from_delimited_files(path=path_on_datastore, validate=False)
named_iris_ds = input_iris_ds.as_named_input(iris_ds_name)

Which is just passed as an Input:

distributed_csv_iris_step = ParallelRunStep(
    name='example-iris',
    inputs=[named_iris_ds],
    output=output_folder,
    parallel_run_config=parallel_run_config,
    arguments=['--model_name', 'iris-prs'],
    allow_reuse=False
)

The documentation for submitting Dataset inputs as parameters suggests the following, where the input is a DatasetConsumptionConfig instance:

tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)

Which is passed both in arguments and in inputs:

train_step = PythonScriptStep(
    name="train_step",
    script_name="train_with_dataset.py",
    arguments=["--param2", tabular_ds_consumption],
    inputs=[tabular_ds_consumption],
    compute_target=compute_target,
    source_directory=source_directory)

When submitting with a new parameter, we create a new Dataset instance:

iris_tabular_ds = Dataset.Tabular.from_delimited_files('some_link')

And submit it like this:

pipeline_run_with_params = experiment.submit(pipeline, pipeline_parameters={'tabular_ds_param': iris_tabular_ds})

However, how do we combine these two approaches? How do we pass a Dataset input as a parameter to the ParallelRunStep?

If we create a DatasetConsumptionConfig class element like so:

tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)

And pass it as an argument to the ParallelRunStep, it throws an error; a sketch of the failing attempt follows.
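
The failing attempt, mirroring the PythonScriptStep pattern above and reusing the names from the first ParallelRunStep snippet (the '--input_ds' flag is a placeholder):

parallelrun_step = ParallelRunStep(
    name='example-iris',
    parallel_run_config=parallel_run_config,
    inputs=[tabular_ds_consumption],
    arguments=['--input_ds', tabular_ds_consumption],  # passing the DatasetConsumptionConfig here is what triggers the error
    output=output_folder,
    allow_reuse=False
)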

References:

  1. Notebook with Dataset Input Parameter
  2. ParallelRunStep Notebook

Solution

  • For the inputs, we create Dataset instances:

    tabular_ds1 = Dataset.Tabular.from_delimited_files('some_link')
    tabular_ds2 = Dataset.Tabular.from_delimited_files('some_link')
    

    ParallelRunStep produces an output file; we use the PipelineData class to create a folder which will store this output (a sketch of how def_data_store is obtained follows the snippet):

    from azureml.pipeline.core import Pipeline, PipelineData
    
    output_dir = PipelineData(name="inferences", datastore=def_data_store)
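
    Here def_data_store is a datastore object; a minimal sketch of obtaining the workspace's default datastore (assuming a local config.json for the workspace):

    from azureml.core import Workspace

    ws = Workspace.from_config()                 # loads workspace details from config.json
    def_data_store = ws.get_default_datastore()  # default blob datastore of the workspace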
    

    The ParallelRunStep depends on the ParallelRunConfig class for details about the environment, entry script, output file name, and other necessary definitions (a minimal sketch of the entry script follows this config):

    from azureml.pipeline.core import PipelineParameter
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
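
    The entry script referenced by entry_script has to define init() and run(mini_batch); with a TabularDataset input, each mini_batch arrives as a pandas DataFrame, and with output_action="append_row" the rows returned by run() are appended to mnist_outputs.txt. A minimal sketch, assuming a registered scikit-learn model (the model name and the prediction column are placeholders):

    import joblib
    from azureml.core.model import Model

    def init():
        global model
        # load the registered model once per worker process
        model_path = Model.get_model_path("my-registered-model")
        model = joblib.load(model_path)

    def run(mini_batch):
        # mini_batch is a pandas DataFrame because the input is a TabularDataset
        mini_batch["prediction"] = model.predict(mini_batch.values)
        # the returned rows end up in the append_row output file
        return mini_batch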
    

    The input to ParallelRunStep is created using the following code:

    from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

    tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_ds1)
    tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
    

    The PipelineParameter helps us run the pipeline for different datasets. ParallelRunStep consumes this as an input:

    parallelrun_step = ParallelRunStep(
        name="some-name",
        parallel_run_config=parallel_run_config,
        inputs=[ tabular_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
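
    The step is then wrapped in a Pipeline and submitted; the first run simply uses the PipelineParameter default, i.e. tabular_ds1 (the experiment name below is a placeholder):

    from azureml.core import Experiment
    from azureml.pipeline.core import Pipeline

    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, "tabular-prs-example")

    pipeline_run_1 = experiment.submit(pipeline)
    pipeline_run_1.wait_for_completion(show_output=True)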
    

    To run the pipeline with another dataset, we pass it as a pipeline parameter at submission time:

    pipeline_run_2 = experiment.submit(
        pipeline,
        pipeline_parameters={"tabular_ds_param": tabular_ds2}
    )
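
    Once a run completes, the append_row output can be pulled down from the step's "inferences" output; a sketch of one way to do this (the local path is a placeholder):

    # find the ParallelRunStep run inside the pipeline run and download its output folder
    step_run = pipeline_run_2.find_step_run("some-name")[0]
    step_run.get_output_data("inferences").download(local_path="prs_output")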
    

    There is currently a known issue: the DatasetConsumptionConfig and PipelineParameter cannot be reused.