I am trying to implement data analysis by function using Durable Functions.
When implementing by function, I need to transfer data frames, etc., so I am trying to exchange data by making it pickle. However, when I implement the code as shown below, I get an error like the one in the picture and Vscode gets stuck.
What is the cause and how can I fix it?
Below is a part of the code I want to implement.
import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing # Dataset
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
instance_id = await client.start_new("orchestrator", None, {})
await client.wait_for_completion_or_create_check_status_response(req, instance_id)
return client.create_check_status_response(req, instance_id)
### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
data = yield context.call_activity("prepare_data", '')
simple = yield context.call_activity("simple_regression", {"data": data})
multiple = yield context.call_activity("multiple_regression", {"data": data})
return "finished"
### activity function ###
@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
# prepare data
california_housing = fetch_california_housing()
exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names) # 説明変数
tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices']) # 目的変数
data = pd.concat([exp_data, tar_data], axis=1) # データを結合
# Delete anomalous values
data = data[data['HouseAge'] != 52]
data = data[data['HousingPrices'] != 5.00001]
# Create useful variables
data['Household'] = data['Population']/data['AveOccup']
data['AllRooms'] = data['AveRooms']*data['Household']
data['AllBedrms'] = data['AveBedrms']*data['Household']
data = pickle.dumps(data)
return data
### simple regression analysis ###
@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
data = pickle.loads(arg['data'])
exp_var = 'MedInc'
tar_var = 'HousingPrices'
# Remove outliers
q_95 = data[exp_var].quantile(0.95)
data = data[data[exp_var] < q_95]
# Split data into explanatory and objective variables
X = data[[exp_var]]
y = data[[tar_var]]
# learn
model = LinearRegression()
model.fit(X, y)
model = pickle.dumps(model)
return model
### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
data = pickle.loads(arg['data'])
exp_vars = ['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup', 'Latitude', 'Longitude']
tar_var = 'HousingPrices'
# Remove outliers
for exp_var in exp_vars:
q_95 = data[exp_var].quantile(0.95)
data = data[data[exp_var] < q_95]
# Split data into explanatory and objective variables
X = data[exp_vars]
y = data[[tar_var]]
# Split into training and test data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
# Standardize X_train
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_train_scaled = pd.DataFrame(X_train_scaled, columns = exp_vars)
# learn
model = LinearRegression()
model.fit(X_train_scaled, y_train)
model = pickle.dumps(model)
X_train_scaled = pickle.dumps(X_train_scaled)
y_train = pickle.dumps(y_train)
X_test = pickle.dumps(X_test)
y_test = pickle.dumps(y_test)
scaler = pickle.dumps(scaler)
return model, X_train_scaled, y_train, X_test, y_test, scaler
In order to resolve your error, Convert your data to dictionary dict in the prepared_data and then run one sample simple_regression and multiple_regression in your Durable Function code, I have made some changes in your function_app.py code like below:-
My function_app.py:-
import pickle
import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing # Dataset
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
instance_id = await client.start_new("orchestrator", None, {})
await client.wait_for_completion_or_create_check_status_response(req, instance_id)
return client.create_check_status_response(req, instance_id)
### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
data = yield context.call_activity("prepare_data", '')
simple = yield context.call_activity("simple_regression", {"data": data})
multiple = yield context.call_activity("multiple_regression", {"data": data})
return "finished"
### activity function ###
@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
# prepare data
california_housing = fetch_california_housing()
exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)
print(exp_data.columns.tolist) # explanatory variables
tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices']) # target variable
data = pd.concat([exp_data, tar_data], axis=1) # merge data
print("Column Names:")
print(data.columns.tolist())
# Delete anomalous values
data = data[data['HouseAge'] != 52]
data = data[data['HousingPrices'] != 5.00001]
# Create useful variables
data['Household'] = data['Population']/data['AveOccup']
data['AllRooms'] = data['AveRooms']*data['Household']
data['AllBedrms'] = data['AveBedrms']*data['Household']
# Ensure 'MedInc' column doesn't contain null or missing values
data = data.dropna(subset=['MedInc'])
# Create a dictionary to store multiple data items
prepared_data = {
'data': data.to_dict(),
'columns': data.columns.tolist(),
'target_column': 'HousingPrices',
'MedInc': data['MedInc'].tolist() # Add 'MedInc' column to the dictionary as a list
}
return prepared_data
@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
try:
# Convert dictionary back to a DataFrame
data = pd.DataFrame.from_dict(arg['data'])
# Handling missing or NaN values
data.dropna(inplace=True)
# Selecting the explanatory variable 'MedInc' and target 'HousingPrices'
X_simple = data[['MedInc']]
y = data[arg['target_column']]
# Check lengths of X_simple and y
if len(X_simple) != len(y):
return "Lengths of X_simple and y do not match"
# Initialize and fit the linear regression model
simple_model = LinearRegression()
simple_model.fit(X_simple, y)
return {
'model': simple_model,
'status': 'success'
}
except Exception as e:
return f"Error: {str(e)}"
### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
try:
# Convert dictionary back to a DataFrame
data = pd.DataFrame.from_dict(arg['data'])
# Handling missing or NaN values
data.dropna(inplace=True)
# Selecting multiple explanatory variables and target 'HousingPrices'
X_multiple = data.drop(columns=[arg['target_column']]) # Drop the target column
y = data[arg['target_column']]
# Check lengths of X_multiple and y
if len(X_multiple) != len(y):
return "Lengths of X_multiple and y do not match"
# Initialize and fit the multiple regression model
multiple_model = LinearRegression()
multiple_model.fit(X_multiple, y)
return {
'model': multiple_model,
'status': 'success'
}
except Exception as e:
return f"Error: {str(e)}"
Output:-
All Orchestration and Activity Function ran successfully, Refer below:-