Search code examples
pythonazureazure-functionsazure-functions-runtimeazure-function-async

How to transform a http trigger Azure Function into Azure durable Function in Python?


I'm stuck with transforming an Azure HTTP triggered function into something more robust that can take more than 230 seconds.

I am struggling to divide the code into functions and am not sure how to construct the activity, orchestrator and client functions in my case. I would really appreciate some help here.

The google_search module is defined as below:

from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import logging

def calculate_score(link, term):
    """Rate how well a result URL matches the search term.

    Args:
        link: Result URL; may be ``None`` or empty when a result has no link.
        term: The search term that produced the result; may be ``None``.

    Returns:
        100 if ``term`` occurs in ``link``, 75 if the link contains both
        'xxx' and 'yyy', 50 if it contains 'xxx' but not 'yyy', 25 for any
        other non-empty link, and ``None`` when there is no link to score.
    """
    # Guard first: substring tests against ``None`` raise TypeError, and an
    # empty link can never match any of the rules below.
    if not link:
        return None
    if term and term in link:
        return 100
    if 'xxx' in link:
        return 75 if 'yyy' in link else 50
    return 25

def search(search_terms, api_key, cse_id, num_results=5, country_code='uk'):
    """Run one Custom Search query per term and score the returned links.

    Args:
        search_terms: Iterable of query strings; ``None`` entries are not
            queried and produce an empty row.
        api_key: Developer key passed to the 'customsearch' client.
        cse_id: Search engine id sent as ``cx``.
        num_results: Sent as ``num`` and used to cap the links kept per term.
        country_code: Sent as ``gl``.

    Returns:
        A ``(success, results, error_code)`` tuple. ``results`` holds one dict
        per term with the keys 'search_item', 'urls' and 'score'. When a term
        fails, its 'urls' and 'score' are the string 'Error', ``success``
        becomes False and ``error_code`` becomes 74; otherwise they stay True
        and 0.
    """
    client = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
    all_ok, code = True, 0
    rows = []

    for term in tqdm(search_terms):
        if term is None:
            # Nothing to look up; keep a placeholder row for this position.
            rows.append({'search_item': term, 'urls': [], 'score': []})
            continue

        try:
            response = client.cse().list(
                q=term,
                cx=cse_id,
                num=num_results,
                gl=country_code,
            ).execute()
            links = [hit.get('link') for hit in response.get('items', [])[:num_results]]
            row = {
                'search_item': term,
                'urls': links,
                'score': [calculate_score(url, term) for url in links],
            }
            logging.info('Search completed successfully')
        except Exception as e:
            # One failing term marks the whole run as failed but does not stop it.
            all_ok, code = False, 74
            row = dict(search_item=term, urls='Error', score='Error')
            logging.error(f'An error occurred during calling the Search function. {e}')

        rows.append(row)

    return all_ok, rows, code

The init.py function:

import azure.functions as func
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import json
from google_search_scoring_clean import search, calculate_score
import logging
import os

def main(req: func.HttpRequest) -> func.HttpResponse:
    """Handle the HTTP request: run the search and reply with a JSON document.

    The JSON body supplies 'search_terms' (required) plus the optional
    'num_results' (default 5), 'country_code' (default 'uk') and 'params'
    (a dict whose entries are echoed back into the response). The API key and
    search engine id are read from the 'apikey' and 'cseid' environment
    variables.

    Returns:
        A JSON response with 'success', 'error_code', the echoed params and
        'results'; a 400 response when the search terms, API key or engine id
        are missing; a 500 JSON response with 'success': 0 and
        'error_code': 66 on any other failure.
    """
    # Bound before the ``try`` so the error handler can always expand it, even
    # when the failure happens before the body was read (e.g. if
    # ``req.get_json()`` raises).
    params = {}
    try:
        logging.info('Python HTTP trigger function processed a request.')
        api_key = os.getenv('apikey')
        cse_id = os.getenv('cseid')
        req_body = req.get_json()
        search_terms = req_body.get('search_terms')
        num_results = int(req_body.get('num_results', 5))
        country_code = req_body.get('country_code', 'uk')
        # An explicit null would break the ``**params`` expansions below.
        params = req_body.get('params') or {}

        if not search_terms or not api_key or not cse_id:
            logging.error('Missing required parameters')
            return func.HttpResponse('Missing required parameters', status_code=400)

        success, results, error_code = search(search_terms=search_terms,
                                              num_results=num_results,
                                              country_code=country_code,
                                              api_key=api_key,
                                              cse_id=cse_id)

        response_data = {
            'success': int(success),
            'error_code': int(error_code),
            **params,
            'results': results
        }
        response_json = json.dumps(response_data)

        logging.info('API Call completed successfully')
        return func.HttpResponse(response_json, mimetype='application/json')

    except Exception as e:
        logging.error(f'An error occurred: {str(e)}')
        error_code = 66
        response_data = {
            'success': 0,
            'error_code': int(error_code),
            **params
        }
        response_json = json.dumps(response_data)
        return func.HttpResponse(response_json, status_code=500, mimetype='application/json')

And a sample request:

{
  "search_terms": ["term1", "term2", "term3"],
  "num_results": 3,
  "params": {
    "search_id": "123",
    "engine_name": "Google Search"}   
}

Desired output example:

{
    "success": 1,
    "error_code": 0,
    "search_id": "123",
    "engine_name": "Google Search",
    "results": [
        {
            "search_item": "term1",
            "urls": [
                "https://sampleresult3.com",
                "https://sampleresult2.com",
                "https://sampleresult3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        },
        {
            "search_item": "term2",
            "urls": [
                "https://whatever1.com",
                "https://whatever.2.com",
                "https://whatever3.com"
            ],
            "score": [
                25,
                25,
                75
            ]
        },
        {
            "search_item": "term3",
            "urls": [
                "https://www.link1.com",
                "https://link2.com",
                "https://www.link3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        }
    ]
}

EDIT

I tried with the below activity function:

from google_search_scoring_clean import search
import os

def main(search_terms, num_results, country_code):
    """Run the search with credentials taken from the environment.

    Args:
        search_terms: Terms forwarded to ``search``.
        num_results: Forwarded to ``search`` as ``num_results``.
        country_code: Forwarded to ``search`` as ``country_code``.

    Returns:
        ``(success, results)``; ``(False, [])`` when the search terms or
        either of the 'apikey' / 'cseid' environment variables is missing.
    """
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    if not search_terms or not api_key or not cse_id:
        return False, []

    # search() returns (success, results, error_code); unpacking that into two
    # names raises ValueError, so the error code is discarded explicitly.
    success, results, _error_code = search(search_terms=search_terms,
                                           num_results=num_results,
                                           country_code=country_code,
                                           api_key=api_key,
                                           cse_id=cse_id)

    return success, results

but received an error message: Result: Failure Exception: FunctionLoadError: cannot load the ActivityFunction function: the following parameters are declared in Python but not in function.json: {'country_code', 'search_terms', 'num_results'}

After editing the function.json to

{
  "bindings": [
    {
      "name": "search_terms",
      "type": "string[]",
      "direction": "in"
    },
    {
      "name": "num_results",
      "type": "int",
      "direction": "in"
    },
    {
      "name": "country_code",
      "type": "string",
      "direction": "in"
    }
  ]
}

however, I receive:

The 'ActivityFunction' function is in error: The binding name country_code is invalid. Please assign a valid name to the binding.

EDIT2:

The below also won't work:

import os
from googleapiclient import discovery
import logging

def main(searchTerm: str) -> dict:
    """Look up one search term and return the links found for it.

    Args:
        searchTerm: Query string; ``None`` yields an empty result without
            building the search client.

    Returns:
        A dict with 'search_term' and 'urls'. For a ``None`` term it also
        carries an empty 'scores' list; when the lookup fails, 'urls' is the
        string 'Error'.
    """
    if searchTerm is None:
        return {'search_term': searchTerm, 'urls': [], 'scores': []}

    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    try:
        # Built inside the ``try`` so a failure while creating the client is
        # reported through the same 'Error' row as a failed query.
        service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
        result = service.cse().list(q=searchTerm, cx=cse_id, num=3).execute()
        items = result.get('items', [])
        top_results = [item.get('link') for item in items]
        return {'search_term': searchTerm, 'urls': top_results}
    except Exception as e:
        logging.error(f'An error occurred during the search: {e}')
        return {'search_term': searchTerm, 'urls': 'Error'}

I adjusted the name from 'name' to 'searchTerm' in the function.json. The output is:

{
    "name": "Orchestrator",
    "instanceId": "4de8cc4818554208ad599e8687ca77a7",
    "runtimeStatus": "Running",
    "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
    "customStatus": null,
    "output": null,
    "createdTime": "2023-05-31T10:37:24Z",
    "lastUpdatedTime": "2023-05-31T10:37:24Z"
}

EDIT3: It worked with the following adjustments

  1. In the function.json of the Activity Function I changed 'name' to activityVar - for some reason it does not accept the name activity_var, and I have no idea why
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "activityVar",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}

Orchestrator function:

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Schedule one activity call per search term and gather the results.

    The orchestration input is expected to be the request body as a dict:
    'search_terms' is required, while 'num_results' and 'params' are optional
    and fall back to 5 and {} (the defaults used by the original
    HTTP-triggered function).

    Returns:
        Whatever ``context.task_all`` yields for the scheduled activity calls.
    """
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    # Resolved once, with defaults, so a request that omits the optional
    # fields does not fail with KeyError.
    num_results = requestBody.get('num_results', 5)
    params = requestBody.get('params', {})
    print("Orchestrator " + str(search_terms))
    tasks = []
    for search_term in search_terms:
        activity_var = {
            'search_term': search_term,
            'num_results': num_results,
            'params': params,
        }
        print(activity_var)
        tasks.append(context.call_activity("ActivityFunction", activity_var))

    results = yield context.task_all(tasks)
    return results

main = df.Orchestrator.create(orchestrator_function)

Where my activity function folder is named "ActivityFunction".

Activity Function for now, as I have to prettify it:

import os
from googleapiclient import discovery
import logging

def main(activityVar: dict) -> dict:
    """Search for the single term carried in the activity input.

    Args:
        activityVar: Activity input holding 'search_term' and, optionally,
            'num_results' (number of links to request; defaults to 3).

    Returns:
        A dict with 'search_term' and 'urls'. For a ``None`` term it also
        carries an empty 'scores' list; when the lookup fails, 'urls' is the
        string 'Error'.

    Raises:
        KeyError: If ``activityVar`` has no 'search_term' entry.
    """
    # Read once up front so the error handler below never has to index the
    # input again.
    search_term = activityVar['search_term']
    if search_term is None:
        return {'search_term': search_term, 'urls': [], 'scores': []}

    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    try:
        # Built inside the ``try`` so a failure while creating the client is
        # reported through the same 'Error' row as a failed query.
        service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
        result = service.cse().list(
            q=search_term,
            cx=cse_id,
            # Honour the count sent with the input instead of a fixed 3.
            num=activityVar.get('num_results', 3),
        ).execute()
        items = result.get('items', [])
        top_results = [item.get('link') for item in items]
        return {'search_term': search_term, 'urls': top_results}
    except Exception as e:
        logging.error(f'An error occurred during the search: {e}')
        return {'search_term': search_term, 'urls': 'Error'}

God, it's been a long day. Gotta wrap my head around it once again.


Solution

  • The pattern you need to follow is the fan out fan in pattern. I won't be writing the full code for you but you can follow the example given here. My response below should guide you to write the code needed by you.

    The aim is to split the search terms list into separate variables so you can trigger multiple activity functions and each of them can do a search for a single variable independently. Since these activity functions are not http triggered functions they can go beyond the 230s limit.

    Your http triggered function will look like this. It needs to pass the request body into the orchestrator so you can split the search terms up there before calling the activity functions.

    async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
        """Start the orchestration named in the route and return its status response.

        The raw request body is decoded, parsed as JSON and passed to the
        orchestration as its input.
        """
        durable_client = df.DurableOrchestrationClient(starter)
        payload = json.loads(req.get_body().decode())
        orchestrator_name = req.route_params["functionName"]

        instance_id = await durable_client.start_new(orchestrator_name, client_input=payload)
        logging.info(f"Started orchestration with ID = '{instance_id}'.")

        return durable_client.create_check_status_response(req, instance_id)
    

    Your Orchestrator will now recreate the body as a dictionary and pass that as a variable to the activity functions. Only difference is, each activity function will receive only 1 search term. You will get back a list in results which you can format to what you need before returning back a response.

    def orchestrator_function(context: df.DurableOrchestrationContext):
        """Schedule one activity call per search term and gather the results.

        The orchestration input is expected to be the request body as a dict:
        'search_terms' is required, while 'num_results' and 'params' are
        optional and fall back to 5 and {} (the defaults used by the original
        HTTP-triggered function).

        Returns:
            Whatever ``context.task_all`` yields for the scheduled activity
            calls.
        """
        requestBody = context.get_input()
        search_terms = requestBody['search_terms']
        # Resolved once, with defaults, so a request that omits the optional
        # fields does not fail with KeyError.
        num_results = requestBody.get('num_results', 5)
        params = requestBody.get('params', {})
        print("Orchestrator " + str(search_terms))
        tasks = []
        for search_term in search_terms:
            activity_var = {
                'search_term': search_term,
                'num_results': num_results,
                'params': params,
            }
            print(activity_var)
            tasks.append(context.call_activity("Activity", activity_var))

        results = yield context.task_all(tasks)
        return results

    main = df.Orchestrator.create(orchestrator_function)
    

    Finally your activity function will hold the main logic to do the search and return back results for a single search term.

    One important point to remember is that since this entire process is asynchronous, when you call the http starter function, you will immediately get back a dictionary of links while the actual process runs in the background. You will need to implement some kind of polling on the "statusQueryGetUri" link in fixed or exponential backoff intervals to get a status of the execution. Once "runtimeStatus" is "Completed" you will find your result in the "output" field.
    Below is an example of calling the "statusQueryGetUri" link.

    {
        "name": "Orchestrator1",
        "instanceId": "1a98f11135494cf88fa1d3241b8cc4f3",
        "runtimeStatus": "Completed",
        "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
        "customStatus": null,
        "output": [
            "Hello {'search_term': 'term1', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
            "Hello {'search_term': 'term2', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
            "Hello {'search_term': 'term3', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!"
        ],
        "createdTime": "2023-05-30T12:35:22Z",
        "lastUpdatedTime": "2023-05-30T12:35:24Z"
    }