I implemented a data-analysis pipeline with Durable Functions so that each analysis step runs as its own function instance.
Using VS Code's Durable Functions extension, the execution details are shown in the image.
The wait time between the instances from "exploratory_data_analysis" through "decision_tree_regression" is short. However, "random_forest" takes a considerable amount of time to start after the previous function ends, which results in a longer overall response time.
The long execution time of "random_forest" itself is unavoidable, but why does it take so long for the instance to start? The data being passed is the same as for "ridge_regression" etc., so the overhead should not come from the transferred data.
Also, is the time shown by this extension the time from instance startup until the function returns?
The random_forest function starts last because it is the final activity to run after all of the previous activities. A Durable Functions orchestrator runs activities in the sequence they appear in your code, and since random_forest is the last activity in your orchestration, it shows up last. The time on the horizontal axis represents the span from when the function instance was created (or started) until its last-updated or completion time.
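If the regression activities do not depend on each other's results, you can cut the overall response time by fanning them out in parallel with `context.task_all` instead of yielding them one at a time. A minimal sketch of the fan-out/fan-in pattern, assuming the same activity names as in your code (not a drop-in replacement for your orchestrator):

```python
import azure.durable_functions as df

def orchestrator(context: df.DurableOrchestrationContext):
    # Prepare the data once, then fan out the independent analyses.
    data = yield context.call_activity("prepare_data", '')
    tasks = [
        context.call_activity(name, {"data": data})
        for name in [
            "simple_regression",
            "multiple_regression",
            "exploratory_data_analysis",
            "data_processing",
            "multivariate_linear_regression",
            "ridge_regression",
            "k_nearest_neighbor",
            "decision_tree_regression",
            "random_forest",
        ]
    ]
    # Fan-in: wait for every activity; they are scheduled concurrently,
    # so random_forest no longer waits for the others to finish first.
    results = yield context.task_all(tasks)
    return results
```

With this pattern, the slow random_forest activity starts alongside the fast ones rather than after them, subject to the concurrency limits in host.json.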
I tried performing a similar data analysis, and below are my results:-
Reference - My SO Answer
My function_app.py:-
Make sure you distribute the load in your code properly by passing the data to each activity efficiently:-
import pickle
import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing # Dataset
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id = await client.start_new("orchestrator", None, {})
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)
    return client.create_check_status_response(req, instance_id)
### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    data = yield context.call_activity("prepare_data", '')
    simple = yield context.call_activity("simple_regression", {"data": data})
    multiple = yield context.call_activity("multiple_regression", {"data": data})
    exploratory_data_analysis = yield context.call_activity("exploratory_data_analysis", {"data": data})
    data_processing = yield context.call_activity("data_processing", {"data": data})
    multivariate_linear_regression = yield context.call_activity("multivariate_linear_regression", {"data": data})
    ridge_regression = yield context.call_activity("ridge_regression", {"data": data})
    k_nearest_neighbor = yield context.call_activity("k_nearest_neighbor", {"data": data})
    decision_tree_regression = yield context.call_activity("decision_tree_regression", {"data": data})
    random_forest = yield context.call_activity("random_forest", {"data": data})
    return "finished"
### activity function ###
@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
    # prepare data
    california_housing = fetch_california_housing()
    exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)
    print(exp_data.columns.tolist())  # explanatory variables
    tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices'])  # target variable
    data = pd.concat([exp_data, tar_data], axis=1)  # merge data
    print("Column Names:")
    print(data.columns.tolist())
    # Delete anomalous values
    data = data[data['HouseAge'] != 52]
    data = data[data['HousingPrices'] != 5.00001]
    # Create useful variables
    data['Household'] = data['Population'] / data['AveOccup']
    data['AllRooms'] = data['AveRooms'] * data['Household']
    data['AllBedrms'] = data['AveBedrms'] * data['Household']
    # Ensure 'MedInc' column doesn't contain null or missing values
    data = data.dropna(subset=['MedInc'])
    # Create a dictionary to store multiple data items
    prepared_data = {
        'data': data.to_dict(),
        'columns': data.columns.tolist(),
        'target_column': 'HousingPrices',
        'MedInc': data['MedInc'].tolist()  # Add 'MedInc' column to the dictionary as a list
    }
    return prepared_data
@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])
        # Handling missing or NaN values
        data.dropna(inplace=True)
        # Selecting the explanatory variable 'MedInc' and target 'HousingPrices'
        X_simple = data[['MedInc']]
        y = data[arg['target_column']]
        # Check lengths of X_simple and y
        if len(X_simple) != len(y):
            return "Lengths of X_simple and y do not match"
        # Initialize and fit the linear regression model
        simple_model = LinearRegression()
        simple_model.fit(X_simple, y)
        # Activity outputs must be JSON-serializable, so return the fitted
        # parameters rather than the model object itself
        return {
            'coefficients': simple_model.coef_.tolist(),
            'intercept': simple_model.intercept_,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])
        # Handling missing or NaN values
        data.dropna(inplace=True)
        # Selecting multiple explanatory variables and target 'HousingPrices'
        X_multiple = data.drop(columns=[arg['target_column']])  # Drop the target column
        y = data[arg['target_column']]
        # Check lengths of X_multiple and y
        if len(X_multiple) != len(y):
            return "Lengths of X_multiple and y do not match"
        # Initialize and fit the multiple regression model
        multiple_model = LinearRegression()
        multiple_model.fit(X_multiple, y)
        # Activity outputs must be JSON-serializable, so return the fitted
        # parameters rather than the model object itself
        return {
            'coefficients': multiple_model.coef_.tolist(),
            'intercept': multiple_model.intercept_,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
@app.activity_trigger(input_name="arg")
def exploratory_data_analysis(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])
        # Perform exploratory data analysis tasks here
        # For example: summary statistics, visualizations, etc.
        # Placeholder - perform some EDA tasks here
        eda_result = "Exploratory Data Analysis performed"
        return {
            'result': eda_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### data processing ###
@app.activity_trigger(input_name="arg")
def data_processing(arg: dict):
    try:
        # Placeholder - Add data processing steps here
        # This could involve data cleaning, normalization, etc.
        processed_data_result = "Data processing completed"
        return {
            'result': processed_data_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
@app.activity_trigger(input_name="arg")
def multivariate_linear_regression(arg: dict):
    try:
        # Placeholder - Implement Multivariate Linear Regression here
        # Use the processed data from 'arg' dictionary
        mlr_result = "Multivariate Linear Regression completed"
        return {
            'result': mlr_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### ridge regression ###
@app.activity_trigger(input_name="arg")
def ridge_regression(arg: dict):
    try:
        # Placeholder - Implement Ridge Regression here
        ridge_result = "Ridge Regression completed"
        return {
            'result': ridge_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### k-nearest neighbor ###
@app.activity_trigger(input_name="arg")
def k_nearest_neighbor(arg: dict):
    try:
        # Placeholder - Implement K-Nearest Neighbor here
        knn_result = "K-Nearest Neighbor completed"
        return {
            'result': knn_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### decision tree regression ###
@app.activity_trigger(input_name="arg")
def decision_tree_regression(arg: dict):
    try:
        # Placeholder - Implement Decision Tree Regression here
        dt_result = "Decision Tree Regression completed"
        return {
            'result': dt_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### random forest ###
@app.activity_trigger(input_name="arg")
def random_forest(arg: dict):
    try:
        # Placeholder - Implement Random Forest here
        rf_result = "Random Forest completed"
        return {
            'result': rf_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
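One thing worth checking in code like the above: activity inputs and outputs are JSON-serialized into the task hub's storage queues and tables, so the size of `data.to_dict()` is what actually moves between steps. A self-contained sketch of that round trip (plain pandas and json, no Functions runtime required; the small frame is made up for illustration):

```python
import json

import pandas as pd

# A tiny frame shaped like the prepared data, just for illustration.
df_in = pd.DataFrame({"MedInc": [8.3, 7.2, 5.1], "HousingPrices": [4.5, 3.6, 2.2]})

# Roughly what Durable Functions stores and transfers between steps.
payload = json.dumps(df_in.to_dict())
print("serialized payload bytes:", len(payload))

# Round-trip back to a DataFrame; JSON turns the integer index into strings,
# so restore the index type before comparing.
df_out = pd.DataFrame.from_dict(json.loads(payload))
df_out.index = df_out.index.astype(int)
print("round-trip intact:", df_out.equals(df_in))
```

Measuring `len(payload)` on your real prepared data tells you whether payload size could plausibly explain a slow activity start, or whether the delay lies elsewhere (queue polling, cold start, concurrency limits).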
In your scenario, random_forest taking a long time to start after the previous activities complete is also due to the heavy processing in that function. Whether you run this function locally or in a Function App, it uses the default Azure Storage account from the AzureWebJobsStorage
connection string. If you want to run the Durable Function more efficiently, you can add a separate storage account for the Durable Function like below:-
My local.settings.json
with the additional MyStorageAccountAppSetting
:-
{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=vsiddheshrg989173;AccountKey=xxxxxStOyfWOw==;EndpointSuffix=core.windows.net",
        "MyStorageAccountAppSetting": "DefaultEndpointsProtocol=https;AccountName=silicon1;AccountKey=xxxxxaBKbO/PA+AStKWFvaQ==;EndpointSuffix=core.windows.net",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
    }
}
You need to apply the same settings in your Function App's environment variables.
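If you prefer the CLI, the same app setting can be pushed with the Azure CLI; the resource names below are placeholders for your own Function App and resource group:

```shell
# Placeholder names - substitute your own Function App, resource group,
# and storage connection string.
az functionapp config appsettings set \
  --name <your-function-app> \
  --resource-group <your-resource-group> \
  --settings "MyStorageAccountAppSetting=<your-storage-connection-string>"
```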
My host.json
containing MyStorageAccountAppSetting
:-
{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingSettings": {
                "isEnabled": true,
                "excludedTypes": "Request"
            }
        }
    },
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[3.*, 4.0.0)"
    },
    "extensions": {
        "durableTask": {
            "storageProvider": {
                "connectionStringName": "MyStorageAccountAppSetting"
            }
        }
    }
}
You can also add concurrency limit settings for your activity functions to increase parallel execution, and tune the partitioning mechanism by specifying a partition count in the host.json file like:-
host.json:-
{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingSettings": {
                "isEnabled": true,
                "excludedTypes": "Request"
            }
        }
    },
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[3.*, 4.0.0)"
    },
    "extensions": {
        "durableTask": {
            "storageProvider": {
                "connectionStringName": "MyStorageAccountAppSetting",
                "partitionCount": 3
            },
            "maxConcurrentActivityFunctions": 10,
            "maxConcurrentOrchestratorFunctions": 10
        }
    }
}
The horizontal time is the span from createdTime (UTC) to lastUpdatedTime (UTC):-
My random_forest activity is triggered properly with these settings in my host.json:-
I referred to these two GitHub documents for the host.json settings; refer to them to get more insights on improving the performance of Azure Durable Functions:-
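You can also confirm the createdTime/lastUpdatedTime values yourself by reading the instance status endpoint that create_check_status_response returns (the statusQueryGetUri in its JSON body). A sketch using the requests library, with a placeholder URL you would copy from your own check-status response:

```python
import requests

# Placeholder: paste the statusQueryGetUri from your check-status response.
status_url = "<statusQueryGetUri from the check-status response>"

status = requests.get(status_url).json()
# The same timestamps the VS Code extension visualizes on its horizontal axis.
print(status["createdTime"], status["lastUpdatedTime"], status["runtimeStatus"])
```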