TypeError: Object of type Properties is not JSON serializable (Sagemaker Pipeline)


I am trying to set up a SageMaker pipeline with 2 steps: preprocessing, then training an RF model. The first step produces 3 outputs: scaled_data.csv, train.csv, and test.csv. The second step should take the train and test CSVs to train the RF model. Defining step 2 fails with "TypeError: Object of type Properties is not JSON serializable".

Here is my code for setting the pipeline steps:

# upload data from local path to default bucket with prefix raw_data
WORK_DIRECTORY = "data"

input_data = sagemaker_session.upload_data(
    path="{}/{}".format(WORK_DIRECTORY, "dataset.csv"),
    bucket=bucket,
    key_prefix="{}/{}".format(prefix, "input_data"),
)

1. Setting up the first step (scaling step):

scaling_processor = SKLearnProcessor(
    framework_version=FRAMEWORK_VERSION,
    instance_type="ml.m5.4xlarge",
    instance_count=processing_instance_count,
    base_job_name="data-process",
    role=role,
    sagemaker_session=pipeline_session,
)

scaling_processor_args = scaling_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaled_data", source="/opt/ml/processing/output/scaled_data/"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train/"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test/"),
    ],
    code="scripts/preprocess.py",
)

step_process = ProcessingStep(name="DataProcess", step_args=scaling_processor_args)

2. Setting up the 2nd step (RF Training-BYO mode), which is where the error arises:

estimator_cls = sagemaker.sklearn.SKLearn
FRAMEWORK_VERSION = "0.23-1"

rf_processor = FrameworkProcessor(
    estimator_cls,
    FRAMEWORK_VERSION,
    role = role,
    instance_count=1,
    instance_type='ml.m5.2xlarge',
    base_job_name='rf-modelling'
)

rf_processor_args = rf_processor.run(
    inputs=[
        ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                        destination="/opt/ml/processing/input"),
        ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
                        destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="rf_model", source="/opt/ml/processing/output/"),
    ],
    code="scripts/train.py",
)

step_train = ProcessingStep(name="RFTrain", step_args=rf_processor_args)

The problem seems to be in the lines where I set the ProcessingInput sources for the 2nd step in rf_processor_args. Any ideas what might cause this error?


Solution

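  • Calling run() on rf_processor looks like the wrong choice here. Unlike scaling_processor, rf_processor is created without sagemaker_session=pipeline_session, so run() most likely tries to start the processing job immediately instead of defining a pipeline step, and the still-unresolved Properties reference cannot be serialized into the job request. Use ProcessingStep directly and pass it all the necessary arguments: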

    # Setup the first step (scaling step)
    
    ...
    
    # --> Use ProcessingStep directly and provide all the args
    step_process = ProcessingStep(
        name="DataProcess",
        processor=scaling_processor,
        inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
        ],
        outputs=[
            ProcessingOutput(output_name="scaled_data", source="/opt/ml/processing/output/scaled_data/"),
            ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train/"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test/"),
        ],
        code="scripts/preprocess.py",
    )
    
    # Setup the 2nd step (RF Training-BYO mode)
    
    ...
    
    # --> Use ProcessingStep directly and provide all the args
    step_train = ProcessingStep(
        name="RFTrain",
        processor=rf_processor,
        inputs=[
            ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                            destination="/opt/ml/processing/input/train"),
            ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
                            destination="/opt/ml/processing/input/test"),
        ],
        outputs=[
            ProcessingOutput(output_name="rf_model", source="/opt/ml/processing/output/"),
        ],
        code="scripts/train.py",
    )
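
    To illustrate why the message mentions JSON, here is a minimal sketch using only the standard library. The Properties class and the build_request helper below are hypothetical stand-ins written for this example, not the SageMaker SDK's own classes, and the reference format is only illustrative. The sketch shows that a placeholder object with no concrete value yet cannot be serialized when a request is sent right away, while a deferred definition can write it out as a reference to be resolved later.

```python
import json


class Properties:
    """Hypothetical stand-in for a pipeline placeholder (not the SageMaker class)."""

    def __init__(self, path):
        self.path = path

    @property
    def expr(self):
        # A deferred reference that a pipeline service could resolve at run time.
        return {"Get": self.path}


def build_request(source):
    """Build a plain dict shaped loosely like a job request (illustrative only)."""
    return {"ProcessingInputs": [{"S3Input": {"S3Uri": source}}]}


placeholder = Properties(
    "Steps.DataProcess.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri"
)

# Sending the request immediately means serializing it now, which fails
# because the placeholder is not a plain JSON value.
try:
    json.dumps(build_request(placeholder))
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)

# Deferring works: the placeholder is written out as a reference instead.
deferred = json.dumps(build_request(placeholder), default=lambda obj: obj.expr)
print(deferred)
```

    If this reading of the error is right, creating rf_processor with sagemaker_session=pipeline_session, as scaling_processor already does, may be an alternative that keeps the step_args style from the question. That is an assumption based on the difference between the two processors, not something tested here.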