The ParallelRunStep documentation suggests the following: a named input Dataset (a DatasetConsumptionConfig class instance):
path_on_datastore = iris_data.path('iris/')
input_iris_ds = Dataset.Tabular.from_delimited_files(path=path_on_datastore, validate=False)
named_iris_ds = input_iris_ds.as_named_input(iris_ds_name)  # iris_ds_name is the string used as the input's name
Which is just passed as an Input:
distributed_csv_iris_step = ParallelRunStep(
name='example-iris',
inputs=[named_iris_ds],
output=output_folder,
parallel_run_config=parallel_run_config,
arguments=['--model_name', 'iris-prs'],
allow_reuse=False
)
The documentation for submitting Dataset inputs as parameters suggests the following: the input is a DatasetConsumptionConfig class element:
tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
Which is passed both in arguments and in inputs:
train_step = PythonScriptStep(
name="train_step",
script_name="train_with_dataset.py",
arguments=["--param2", tabular_ds_consumption],
inputs=[tabular_ds_consumption],
compute_target=compute_target,
source_directory=source_directory)
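Inside train_with_dataset.py the dataset can then be read through the run context's named inputs; a minimal sketch, assuming only the input name "tabular_dataset" from the step above:

# train_with_dataset.py (sketch)
from azureml.core import Run

run = Run.get_context()

# the key matches the name given to the DatasetConsumptionConfig ("tabular_dataset")
tabular_ds = run.input_datasets["tabular_dataset"]
df = tabular_ds.to_pandas_dataframe()
print(df.head())
# the --param2 argument is also passed to the script and could be parsed with argparse if needed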
When submitting with a new parameter, we create a new Dataset instance:
iris_tabular_ds = Dataset.Tabular.from_delimited_files('some_link')
And submit it like this:
pipeline_run_with_params = experiment.submit(pipeline, pipeline_parameters={'tabular_ds_param': iris_tabular_ds})
However, how do we combine the two? How do we pass a Dataset input as a parameter to the ParallelRunStep?
If we create a DatasetConsumptionConfig class element like so:
tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
And pass it as an argument in the ParallelRunStep, it throws an error (the attempted pattern is sketched below).
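For reference, the failing pattern looks roughly like this (the --ds_param flag is an assumption; the other names reuse the snippets above):

distributed_csv_iris_step = ParallelRunStep(
    name='example-iris',
    inputs=[tabular_ds_consumption],
    arguments=['--ds_param', tabular_ds_consumption],  # passing the DatasetConsumptionConfig here triggers the error
    output=output_folder,
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)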
For the inputs we create Dataset class instances:
from azureml.core import Dataset

tabular_ds1 = Dataset.Tabular.from_delimited_files('some_link')
tabular_ds2 = Dataset.Tabular.from_delimited_files('some_link')
ParallelRunStep produces an output file, so we use the PipelineData class to create a folder which will store this output:
from azureml.pipeline.core import Pipeline, PipelineData
output_dir = PipelineData(name="inferences", datastore=def_data_store)
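Here def_data_store is assumed to be the workspace's default datastore, obtained for example like this:

from azureml.core import Workspace

ws = Workspace.from_config()
def_data_store = ws.get_default_datastore()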
The ParallelRunStep relies on the ParallelRunConfig class for details about the environment, entry script, output file name, and other necessary definitions:
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
parallel_run_config = ParallelRunConfig(
source_directory=scripts_folder,
entry_script=script_file,
mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
error_threshold=10,
output_action="append_row",
append_row_file_name="mnist_outputs.txt",
environment=batch_env,
compute_target=compute_target,
process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
node_count=2
)
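The entry script referenced by entry_script=script_file must define init() and run(); a minimal sketch, assuming a tabular dataset input (for a TabularDataset each mini-batch arrives as a pandas DataFrame):

# entry script (sketch)
def init():
    # one-time setup per worker process, e.g. loading a model
    pass

def run(mini_batch):
    # mini_batch is a pandas DataFrame with the rows of this mini-batch
    results = []
    for _, row in mini_batch.iterrows():
        results.append(str(row.to_dict()))
    # with output_action="append_row", every returned item becomes a line in mnist_outputs.txt
    return results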
The input to the ParallelRunStep is created using the following code:
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_ds1)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
The PipelineParameter helps us run the pipeline for different datasets. ParallelRunStep consumes this as an input:
parallelrun_step = ParallelRunStep(
name="some-name",
parallel_run_config=parallel_run_config,
inputs=[ tabular_ds_consumption ],
output=output_dir,
allow_reuse=False
)
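To build and submit the pipeline with the default dataset (tabular_ds1), a minimal sketch (the workspace config and experiment name are assumptions):

from azureml.core import Workspace, Experiment
from azureml.pipeline.core import Pipeline

ws = Workspace.from_config()
experiment = Experiment(ws, "tabular-prs-example")
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

# the first run uses the PipelineParameter default value (tabular_ds1)
pipeline_run_1 = experiment.submit(pipeline)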
To run the same pipeline with another dataset:
pipeline_run_2 = experiment.submit(
    pipeline,
    pipeline_parameters={"tabular_ds_param": tabular_ds2}
)
Note that there is currently a known issue: a DatasetConsumptionConfig and its PipelineParameter cannot be reused.
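Assuming the limitation refers to reusing the same objects in more than one step, a possible workaround is to create a separate PipelineParameter/DatasetConsumptionConfig pair, with distinct names, for each step that needs the dataset (all names below are hypothetical):

param_a = PipelineParameter(name="tabular_ds_param_a", default_value=tabular_ds1)
input_a = DatasetConsumptionConfig("tabular_dataset_a", param_a)

param_b = PipelineParameter(name="tabular_ds_param_b", default_value=tabular_ds1)
input_b = DatasetConsumptionConfig("tabular_dataset_b", param_b)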