I am trying to set up a SageMaker pipeline with two steps: preprocessing, then training a random forest (RF) model. The first step produces three outputs: scaled_data.csv, train.csv, and test.csv. The second step should take the train and test CSVs and train the RF model. When I run step 2, I get: "TypeError: Object of type Properties is not JSON serializable".
Here is my code for setting up the pipeline steps:
import sagemaker
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

FRAMEWORK_VERSION = "0.23-1"

# upload data from local path to the default bucket under the input_data prefix
WORK_DIRECTORY = "data"
input_data = sagemaker_session.upload_data(
    path="{}/{}".format(WORK_DIRECTORY, "dataset.csv"),
    bucket=bucket,
    key_prefix="{}/{}".format(prefix, "input_data"),
)

scaling_processor = SKLearnProcessor(
    framework_version=FRAMEWORK_VERSION,
    instance_type="ml.m5.4xlarge",
    instance_count=processing_instance_count,
    base_job_name="data-process",
    role=role,
    sagemaker_session=pipeline_session,
)

scaling_processor_args = scaling_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaled_data", source="/opt/ml/processing/output/scaled_data/"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train/"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test/"),
    ],
    code="scripts/preprocess.py",
)
step_process = ProcessingStep(name="DataProcess", step_args=scaling_processor_args)
estimator_cls = sagemaker.sklearn.SKLearn
rf_processor = FrameworkProcessor(
    estimator_cls,
    FRAMEWORK_VERSION,
    role=role,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
    base_job_name="rf-modelling",
)
rf_processor_args = rf_processor.run(
    inputs=[
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="rf_model", source="/opt/ml/processing/output/"),
    ],
    code="scripts/train.py",
)
step_train = ProcessingStep(name="RFTrain", step_args=rf_processor_args)
The problem seems to be the lines where I set the ProcessingInput for the second step in rf_processor_args. Any ideas what might cause this error?
The run() call is the wrong choice here: run() launches a processing job directly rather than defining a pipeline step, which is what you want. Note that your rf_processor, unlike scaling_processor, was not constructed with sagemaker_session=pipeline_session, so run() tries to start the job immediately and must JSON-serialize its arguments; step_process.properties.ProcessingOutputConfig... is a Properties placeholder that only resolves when the pipeline executes, hence the serialization error. Use ProcessingStep directly and feed it all the necessary arguments:
# Setup the first step (scaling step)
...
# --> Use ProcessingStep directly and provide all the args
step_process = ProcessingStep(
    name="DataProcess",
    processor=scaling_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaled_data", source="/opt/ml/processing/output/scaled_data/"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train/"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test/"),
    ],
    code="scripts/preprocess.py",
)
# Setup the 2nd step (RF Training-BYO mode)
...
# --> Use ProcessingStep directly and provide all the args
step_train = ProcessingStep(
    name="RFTrain",
    processor=rf_processor,
    inputs=[
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/train",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="rf_model", source="/opt/ml/processing/output/"),
    ],
    code="scripts/train.py",
)
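For completeness, a minimal sketch of wiring the two steps into a pipeline and starting it (the pipeline name is illustrative; role and pipeline_session are the variables from your snippets):

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="rf-pipeline",  # illustrative name, pick your own
    steps=[step_process, step_train],
    sagemaker_session=pipeline_session,
)

pipeline.upsert(role_arn=role)  # create or update the pipeline definition
execution = pipeline.start()    # Properties placeholders resolve at execution time

Since step_train reads step_process.properties, SageMaker infers the dependency between the two steps automatically; no explicit depends_on is needed.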