Tags: json, python-3.x, machine-learning, azure-machine-learning-service

Making predictions with Azure Machine Learning on new data that contains headers (like a pd.DataFrame)


My question is somewhat related to https://learn.microsoft.com/en-us/answers/questions/217305/data-input-format-call-the-service-for-azure-ml-ti.html - however, the solution provided there does not seem to work.

I am constructing a simple model with the heart-disease dataset, but I wrap it into a Pipeline because I use some featurization steps (scaling, encoding, etc.). The full script is below:

import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import pickle

# data input
df = pd.read_csv('heart.csv')

# numerical variables
num_cols = ['age',
            'trestbps',
            'chol',
            'thalach',
            'oldpeak'
]

# categorical variables
cat_cols = ['sex',
            'cp',
            'fbs',
            'restecg',
            'exang',
            'slope',
            'ca',
            'thal']

# changing format of the categorical variables
df[cat_cols] = df[cat_cols].apply(lambda x: x.astype('object'))

# target variable
y = df['target']

# features
X = df.drop(['target'], axis=1)

# data split:

# random seed
np.random.seed(42)

# splitting the data
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.2,
                                                    stratify=y)

# double check
X_train.shape, X_test.shape, y_train.shape, y_test.shape

# pipeline for numerical data
num_preprocessing = Pipeline([('num_imputer', SimpleImputer(strategy='mean')),  # imputing with mean
                              ('minmaxscaler', MinMaxScaler())])  # scaling

# pipeline for categorical data
cat_preprocessing = Pipeline([('cat_imputer', SimpleImputer(strategy='constant', fill_value='missing')),  # filling missing values
                              ('onehot', OneHotEncoder(drop='first', handle_unknown='error'))])  # One Hot Encoding

# preprocessor - combining pipelines
preprocessor = ColumnTransformer([
    ('categorical', cat_preprocessing, cat_cols),
    ('numerical', num_preprocessing, num_cols)
])

# initial model parameters
log_ini_params = {'penalty': 'l2', 
                  'tol': 0.0073559740277086005, 
                  'C': 1.1592424247511928, 
                  'fit_intercept': True, 
                  'solver': 'liblinear'}

# model - Pipeline
log_clf = Pipeline([('preprocessor', preprocessor),
                  ('clf', LogisticRegression(**log_ini_params))])

log_clf.fit(X_train, y_train)

# dumping the model
f = 'model/log.pkl'
with open(f, 'wb') as file:
    pickle.dump(log_clf, file)

# loading it
loaded_model = joblib.load(f)

# double check on a single datapoint
new_data = pd.DataFrame({'age': 71,
                         'sex': 0,
                         'cp': 0,
                         'trestbps': 112,
                         'chol': 203,
                         'fbs': 0,
                         'restecg': 1,
                         'thalach': 185,
                         'exang': 0,
                         'oldpeak': 0.1,
                         'slope': 2,
                         'ca': 0,
                          'thal': 2}, index=[0])

loaded_model.predict(new_data)

...and it works just fine. Then I deploy the model as an Azure web service using these steps:

  1. I create the score.py file:
import joblib
from azureml.core.model import Model
import json

def init():
    global model
    model_path = Model.get_model_path('log') # logistic
    print('Model Path is  ', model_path)
    model = joblib.load(model_path)


def run(data):
    try:
        data = json.loads(data)
        result = model.predict(data['data'])
        # any data type, as long as it is JSON serializable.
        return {'data' : result.tolist() , 'message' : 'Successfully classified heart diseases'}
    except Exception as e:
        error = str(e)
        return {'data' : error , 'message' : 'Failed to classify heart diseases'}
  2. I deploy the model:
from azureml.core import Workspace
from azureml.core.webservice import AciWebservice
from azureml.core.webservice import Webservice
from azureml.core.model import InferenceConfig
from azureml.core.environment import Environment
from azureml.core import Workspace
from azureml.core.model import Model
from azureml.core.conda_dependencies import CondaDependencies

ws = Workspace.from_config()

model = Model.register(workspace = ws,
              model_path ='model/log.pkl',
              model_name = 'log',
              tags = {'version': '1'},
              description = 'Heart disease classification',
              )

# to install required packages
env = Environment('env')
cd = CondaDependencies.create(pip_packages=['pandas==1.1.5', 'azureml-defaults','joblib==0.17.0'], conda_packages = ['scikit-learn==0.23.2'])
env.python.conda_dependencies = cd

# Register environment to re-use later
env.register(workspace = ws)
print('Registered Environment')

myenv = Environment.get(workspace=ws, name='env')

myenv.save_to_directory('./environ', overwrite=True)

aciconfig = AciWebservice.deploy_configuration(
            cpu_cores=1,
            memory_gb=1,
            tags={'data':'heart disease classifier'},
            description='Classification of heart diseases',
            )

inference_config = InferenceConfig(entry_script='score.py', environment=myenv)

service = Model.deploy(workspace=ws,
                name='hd-model-log',
                models=[model],
                inference_config=inference_config,
                deployment_config=aciconfig, 
                overwrite = True)

service.wait_for_deployment(show_output=True)
url = service.scoring_uri
print(url)

The deployment is fine:

Succeeded ACI service creation operation finished, operation "Succeeded"
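A quick health check on the deployed service is also possible from the same Webservice object. A small sketch - get_logs is mainly useful when run() fails inside the container:

# optional sanity checks on the deployed ACI service
print(service.state)       # expected: 'Healthy'
print(service.get_logs())  # container logs, useful when run() raises server-side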

But I cannot make any predictions with new data. I try to use:

import pandas as pd

new_data = pd.DataFrame([[71, 0, 0, 112, 203, 0, 1, 185, 0, 0.1, 2, 0, 2],
                         [80, 0, 0, 115, 203, 0, 1, 185, 0, 0.1, 2, 0, 0]],
                         columns=['age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'oldpeak', 'slope', 'ca', 'thal'])

Following the answer from this topic (https://learn.microsoft.com/en-us/answers/questions/217305/data-input-format-call-the-service-for-azure-ml-ti.html) I transform the data:

test_sample = json.dumps({'data': new_data.to_dict(orient='records')})

And try to make some predictions:

import json
import requests
data = test_sample
headers = {'Content-Type':'application/json'}
r = requests.post(url, data=data, headers = headers)
print(r.status_code)
print(r.json())

However, I encounter an error:

200 {'data': "Expected 2D array, got 1D array instead:\narray=[{'age': 71, 'sex': 0, 'cp': 0, 'trestbps': 112, 'chol': 203, 'fbs': 0, 'restecg': 1, 'thalach': 185, 'exang': 0, 'oldpeak': 0.1, 'slope': 2, 'ca': 0, 'thal': > 2}\n {'age': 80, 'sex': 0, 'cp': 0, 'trestbps': 115, 'chol': 203, 'fbs': 0, 'restecg': 1, 'thalach': 185, 'exang': 0, 'oldpeak': 0.1, 'slope': 2, 'ca': 0, 'thal': 0}].\nReshape your data either using array.reshape(-1, > 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.", 'message': 'Failed to classify heart diseases'}

How can I adjust the input data so that it works with this form of prediction, and how can I add other outputs, such as predict_proba, so that I can store them in a separate output dataset?

I know this error is related either to the run part of the score.py file or to the last code cell that calls the web service, but I'm unable to pin it down.
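My current guess, sketched locally with the same payload (re-using test_sample and loaded_model from above), is that run receives a plain list of dicts, which scikit-learn then coerces into the 1D object array shown in the error:

import json
import pandas as pd

# reproduce locally what run(data) receives for the request body built above
records = json.loads(test_sample)['data']
print(type(records), type(records[0]))  # a list of dicts, not a 2D structure

# loaded_model.predict(records) raises the same "Expected 2D array" message;
# rebuilding a DataFrame restores the column names the ColumnTransformer expects
# loaded_model.predict(pd.DataFrame(records))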

Would really appreciate some help.


Solution

  • I believe I managed to solve the problem - even though I encountered some serious issues. :)

    1. As described here, I edited the score.py script:
    import joblib
    from azureml.core.model import Model
    import numpy as np
    import json
    import pandas as pd
    import numpy as np
    
    from inference_schema.schema_decorators import input_schema, output_schema
    from inference_schema.parameter_types.numpy_parameter_type import NumpyParameterType
    from inference_schema.parameter_types.pandas_parameter_type import PandasParameterType
    from inference_schema.parameter_types.standard_py_parameter_type import StandardPythonParameterType
        
    data_sample = PandasParameterType(pd.DataFrame({'age': pd.Series([0], dtype='int64'),
                                                    'sex': pd.Series(['example_value'], dtype='object'),
                                                    'cp': pd.Series(['example_value'], dtype='object'),
                                                    'trestbps': pd.Series([0], dtype='int64'),
                                                    'chol': pd.Series([0], dtype='int64'),
                                                    'fbs': pd.Series(['example_value'], dtype='object'),
                                                    'restecg': pd.Series(['example_value'], dtype='object'),
                                                    'thalach': pd.Series([0], dtype='int64'),
                                                    'exang': pd.Series(['example_value'], dtype='object'),
                                                    'oldpeak': pd.Series([0.0], dtype='float64'),
                                                    'slope': pd.Series(['example_value'], dtype='object'),
                                                    'ca': pd.Series(['example_value'], dtype='object'),
                                                    'thal': pd.Series(['example_value'], dtype='object')}))
    
    input_sample = StandardPythonParameterType({'data': data_sample})
    result_sample = NumpyParameterType(np.array([0]))
    output_sample = StandardPythonParameterType({'Results':result_sample})
    
    def init():
        global model
        # Example when the model is a file
        model_path = Model.get_model_path('log') # logistic
        print('Model Path is  ', model_path)
        model = joblib.load(model_path)
    
    @input_schema('Inputs', input_sample)
    @output_schema(output_sample)
    def run(Inputs):
        try:
            data = Inputs['data']
            result = model.predict_proba(data)
            return result.tolist()
        except Exception as e:
            error = str(e)
            return error
    
    2. In the deployment step, I adjusted the CondaDependencies:
    # to install required packages
    env = Environment('env')
    cd = CondaDependencies.create(pip_packages=['pandas==1.1.5', 'azureml-defaults','joblib==0.17.0', 'inference-schema==1.3.0'], conda_packages = ['scikit-learn==0.22.2.post1'])
    env.python.conda_dependencies = cd
    # Register environment to re-use later
    env.register(workspace = ws)
    print('Registered Environment')
    

    because:

    a) it is necessary to include inference-schema in the dependencies, and
    b) I downgraded scikit-learn to scikit-learn==0.22.2.post1 because of this issue.

    Now, when I feed the model with new data:

    new_data = {
      "Inputs": {
        "data": [
          {
            "age": 71,
            "sex": "0",
            "cp": "0",
            "trestbps": 112,
            "chol": 203,
            "fbs": "0",
            "restecg": "1",
            "thalach": 185,
            "exang": "0",
            "oldpeak": 0.1,
            "slope": "2",
            "ca": "0",
            "thal": "2"
          }
        ]
      }
    }
    

    And use it for prediction:

    import json
    import requests
    data = new_data
    headers = {'Content-Type':'application/json'}
    r = requests.post(url, str.encode(json.dumps(data)), headers = headers)
    print(r.status_code)
    print(r.json())
    

    I get:

    200 [[0.02325369841858338, 0.9767463015814166]]
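    Since the service returns predict_proba output, the response can be collected into a DataFrame and stored as a separate output dataset. A minimal sketch - the column names and the 0.5 threshold are just illustrative assumptions:

    import pandas as pd

    # r.json() holds one [P(class 0), P(class 1)] pair per submitted row
    proba = r.json()
    results = pd.DataFrame(proba, columns=['proba_no_disease', 'proba_disease'])
    results['prediction'] = (results['proba_disease'] >= 0.5).astype(int)

    # keep the scores as a separate output dataset
    results.to_csv('predictions.csv', index=False)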

    Uff! Maybe someone will benefit from my painful learning path! :)