postgresql, amazon-rds, amazon-sagemaker

Postgres Invoke Endpoint with a multidimensional array of features?


I am working on Postgres RDS integration with the SageMaker invoke endpoint, and I can't figure out how to format my query to support this use case. Here is a working example that calls the SageMaker endpoint directly in Python:

import boto3
import json

# Create a Boto3 client for SageMaker
client = boto3.client('sagemaker-runtime')

# Set the name of the SageMaker endpoint to invoke
endpoint_name = 'Custom-sklearn-model-2024-04-04-18-14-53'

# Set the content type of the input data
content_type = 'application/json'

# Hardcoded input data
input_data = [
    [1454.0, 1.0, 0.5, 1.0, 1.0, 0.0, 34.0, 0.7, 83.0, 4.0, 3.0, 250.0, 1033.0, 3419.0, 7.0, 5.0, 5.0, 1.0, 1.0, 0.0], 
    [1092.0, 1.0, 0.5, 1.0, 10.0, 0.0, 11.0, 0.5, 167.0, 3.0, 14.0, 468.0, 571.0, 737.0, 14.0, 4.0, 11.0, 0.0, 1.0, 0.0]
]

# Convert the input data to JSON format
payload = json.dumps(input_data)

# Invoke the SageMaker endpoint
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType=content_type,
    Body=payload
)

# Get the response from the SageMaker endpoint
result = response['Body'].read().decode('utf-8')

print(result)

with the output:

[3, 0]

This works great: I can pass either one feature set or several in the 2D array, and everything comes back as expected.
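
For reference, a single sample still has to be wrapped as a 2D array (a list containing one list). A minimal sketch reusing the first row above:

import json

# One sample, but still nested one level deep so the model sees shape (1, 20)
single = [[1454.0, 1.0, 0.5, 1.0, 1.0, 0.0, 34.0, 0.7, 83.0, 4.0, 3.0, 250.0, 1033.0, 3419.0, 7.0, 5.0, 5.0, 1.0, 1.0, 0.0]]
payload = json.dumps(single)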

Now, when I try to call the invoke endpoint from Postgres like this:

SELECT aws_sagemaker.invoke_endpoint (
    'Custom-sklearn-model-2024-04-04-18-14-53',
    null,
    1454.0, 1.0, 0.5, 1.0, 1.0, 0.0, 34.0, 0.7, 83.0, 4.0, 3.0, 250.0, 1033.0, 3419.0, 7.0, 5.0, 5.0, 1.0, 1.0, 0.0
);

I assumed this would work, but I get this error in CloudWatch:

ValueError: Expected 2D array, got 1D array instead:

array=[1.454e+03 1.000e+00 5.000e-01 1.000e+00 1.000e+00 0.000e+00 3.400e+01
 7.000e-01 8.300e+01 4.000e+00 3.000e+00 2.500e+02 1.033e+03 3.419e+03
 7.000e+00 5.000e+00 5.000e+00 1.000e+00 1.000e+00 0.000e+00].

Reshape your data either using array.reshape(-1, 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.
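
The error itself points at the mismatch: scikit-learn's predict() expects a 2D array of shape (n_samples, n_features), but the request reached the model as a flat 1D array. A small sketch of the reshape the message suggests, reusing the same feature row:

import numpy as np

row = np.array([1454.0, 1.0, 0.5, 1.0, 1.0, 0.0, 34.0, 0.7, 83.0, 4.0, 3.0, 250.0, 1033.0, 3419.0, 7.0, 5.0, 5.0, 1.0, 1.0, 0.0])
print(row.shape)        # (20,)  -- 1D, which is what predict() was handed
X = row.reshape(1, -1)  # (1, 20) -- one sample with 20 features
print(X.shape)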

Here's what my entry point script looks like, too:

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, precision_score, recall_score, f1_score, roc_curve, auc
import sklearn
import joblib
import boto3
import pathlib
from io import StringIO
import argparse
import os
import numpy as np
import pandas as pd
    
def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf
    
if __name__ == "__main__":

    print("[INFO] Extracting arguments")
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--n_estimators", type=int, default=100)
    parser.add_argument("--random_state", type=int, default=0)

    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--train-file", type=str, default="train-V-1.csv")
    parser.add_argument("--test-file", type=str, default="test-V-1.csv")

    args, _ = parser.parse_known_args()
    
    print("SKLearn Version: ", sklearn.__version__)
    print("Joblib Version: ", joblib.__version__)

    print("[INFO] Reading data")
    print()
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))
    
    features = list(train_df.columns)
    label = features.pop(-1)
    
    print("Building training and testing datasets")
    print()
    X_train = train_df[features]
    X_test = test_df[features]
    y_train = train_df[label]
    y_test = test_df[label]

    print('Column order: ')
    print(features)
    print()
    
    print("Label column is: ",label)
    print()
    
    print("Data Shape: ")
    print()
    print("---- SHAPE OF TRAINING DATA (85%) ----")
    print(X_train.shape)
    print(y_train.shape)
    print()
    print("---- SHAPE OF TESTING DATA (15%) ----")
    print(X_test.shape)
    print(y_test.shape)
    print()
    
  
    print("Training RandomForest Model.....")
    print()
    model = RandomForestClassifier(n_estimators=args.n_estimators, random_state=args.random_state, verbose=3, n_jobs=-1)
    model.fit(X_train, y_train)
    print()
    

    model_path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(model,model_path)
    print("Model persisted at " + model_path)
    print()

    
    y_pred_test = model.predict(X_test)
    test_acc = accuracy_score(y_test,y_pred_test)
    test_rep = classification_report(y_test,y_pred_test)

    print()
    print("---- METRICS RESULTS FOR TESTING DATA ----")
    print()
    print("Total Rows are: ", X_test.shape[0])
    print('[TESTING] Model Accuracy is: ', test_acc)
    print('[TESTING] Testing Report: ')
    print(test_rep)

Solution

  • It turns out the issue was in the inference code: my original script.py only defined model_fn, so the request body (which aws_sagemaker.invoke_endpoint sends as comma-separated text/csv) reached predict() as a flat 1D array. I needed to map the request body into a 2D frame myself with an input_fn.

    Here's an updated script.py

    
    
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, precision_score, recall_score, f1_score, roc_curve, auc
    import sklearn
    import joblib
    import boto3
    import pathlib
    from io import StringIO
    import argparse
    import os
    import numpy as np
    import pandas as pd
    
    # inference functions ---------------
    
    def input_fn(request_body, request_content_type):
        print(request_body)
        print(request_content_type)
        if request_content_type == "text/csv":
            request_body = request_body.strip()
            try:
                # Parse the comma-separated body into a 2D DataFrame
                # (one row per CSV line), which is what predict() expects
                df = pd.read_csv(StringIO(request_body), header=None)
                return df
            except Exception as e:
                print(e)
                return str(e)  # pass the parse error back through predict_fn
        else:
            return "Please use Content-Type = 'text/csv' and resend the request."
     
        
    def model_fn(model_dir):
        clf = joblib.load(os.path.join(model_dir, "model.joblib"))
        return clf
    
    def predict_fn(input_data, model):
        if not isinstance(input_data, str):
            prediction = model.predict(input_data)
            print(prediction)
            return prediction
        else:
            # input_fn returned an error message string; pass it straight through
            return input_data
            
        
    if __name__ == "__main__":
    
        print("[INFO] Extracting arguments")
        parser = argparse.ArgumentParser()
    
        # hyperparameters sent by the client are passed as command-line arguments to the script.
        parser.add_argument("--n_estimators", type=int, default=100)
        parser.add_argument("--random_state", type=int, default=0)
    
        # Data, model, and output directories
        parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
        parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
        parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
        parser.add_argument("--train-file", type=str, default="train-V-1.csv")
        parser.add_argument("--test-file", type=str, default="test-V-1.csv")
    
        args, _ = parser.parse_known_args()
        
        print("SKLearn Version: ", sklearn.__version__)
        print("Joblib Version: ", joblib.__version__)
    
        print("[INFO] Reading data")
        print()
        train_df = pd.read_csv(os.path.join(args.train, args.train_file))
        test_df = pd.read_csv(os.path.join(args.test, args.test_file))
        
        features = list(train_df.columns)
        label = features.pop(-1)
        
        print("Building training and testing datasets")
        print()
        X_train = train_df[features]
        X_test = test_df[features]
        y_train = train_df[label]
        y_test = test_df[label]
    
        print('Column order: ')
        print(features)
        print()
        
        print("Label column is: ",label)
        print()
        
        print("Data Shape: ")
        print()
        print("---- SHAPE OF TRAINING DATA (85%) ----")
        print(X_train.shape)
        print(y_train.shape)
        print()
        print("---- SHAPE OF TESTING DATA (15%) ----")
        print(X_test.shape)
        print(y_test.shape)
        print()
        
      
        print("Training RandomForest Model.....")
        print()
        model = RandomForestClassifier(n_estimators=args.n_estimators, random_state=args.random_state, verbose=3, n_jobs=-1)
        model.fit(X_train, y_train)
        print()
        
    
        model_path = os.path.join(args.model_dir, "model.joblib")
        joblib.dump(model,model_path)
        print("Model persisted at " + model_path)
        print()
    
        
        y_pred_test = model.predict(X_test)
        test_acc = accuracy_score(y_test,y_pred_test)
        test_rep = classification_report(y_test,y_pred_test)
    
        print()
        print("---- METRICS RESULTS FOR TESTING DATA ----")
        print()
        print("Total Rows are: ", X_test.shape[0])
        print('[TESTING] Model Accuracy is: ', test_acc)
        print('[TESTING] Testing Report: ')
        print(test_rep)
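
    With those handlers in place, the comma-separated body that aws_sagemaker.invoke_endpoint sends is parsed into a (1, 20) DataFrame before prediction. A quick local sanity check of the handlers (a sketch only; it assumes a trained model.joblib sits in the current directory):

    body = "1454.0,1.0,0.5,1.0,1.0,0.0,34.0,0.7,83.0,4.0,3.0,250.0,1033.0,3419.0,7.0,5.0,5.0,1.0,1.0,0.0"
    model = model_fn(".")            # "." stands in for a local model dir
    df = input_fn(body, "text/csv")  # DataFrame with shape (1, 20)
    print(predict_fn(df, model))     # e.g. [3], matching the first row's prediction above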