Search code examples
pythonmultiprocessingpython-multiprocessingpython-multithreading

Run a for loop in parallel batches in Python


I have this code, which runs for a long time because it processes about 6000 entries. I am trying to run it in parallel so that it takes less time, but the multiprocessing code that I wrote is not working as expected. The for loop in the function lambda_handler is what needs to be parallelized. I am still new to Python, so any help will be appreciated.

Here's my code

import json
import boto3
from pprint import pprint
import os
import botocore
from datetime import datetime
import csv
from botocore.client import Config
import logging
import urllib3
import sys
import time
from boto3.session import Session
from botocore.client import Config
import multiprocessing as mp

# The boto3 clients in this script are created with verify=False, so silence
# urllib3's InsecureRequestWarning that would otherwise be emitted per request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


# Shared botocore client settings: long connect timeout, up to 5 retry
# attempts, and HTTP/HTTPS proxies taken from the usual environment variables.
config = Config(
    connect_timeout=7200,
    retries=dict(max_attempts=5),
    proxies={
        scheme: os.environ.get(var_name)
        for scheme, var_name in (('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY'))
    },
)


# Root logger, raised to INFO level.
LOGGER = logging.getLogger()
LOGGER.setLevel(level=logging.INFO)


# Job parameters, read from environment variables (None when a variable is unset).
instance_id, region_name, s3_bucket, env = (
    os.environ.get(var_name)
    for var_name in ('instance_id', 'Region', 's3_bucket', 'export_environment')
)

# Page size passed to every ListUsers request.
MaxResults = 4

# NOTE(review): this Pool is created at import time, outside any
# `if __name__ == '__main__':` guard. Under the "spawn" start method (the
# default on Windows and macOS), each worker re-imports the main module and
# would try to build its own pool, which the multiprocessing docs ("Safe
# importing of main module") warn against. Consider creating it under a guard.
pool = mp.Pool(4)


def getUserList(instance_id: str, region_name: str, env: str):
    """Return every user summary of an Amazon Connect instance.

    Pages through ``ListUsers`` with ``MaxResults`` entries per request and
    follows ``NextToken`` until the last page has been read.

    :param instance_id: Amazon Connect instance ID.
    :param region_name: AWS region used for the ``connect`` client.
    :param env: unused; kept so existing callers keep working.
    :returns: list of the ``UserSummaryList`` entries from all pages.
    """
    client = boto3.client('connect', region_name=region_name, verify=False, config=config)

    # First page; copy it so the returned list is independent of the response.
    response = client.list_users(InstanceId=instance_id, MaxResults=MaxResults)
    users = list(response['UserSummaryList'])

    # Remaining pages, if any.
    while 'NextToken' in response:
        response = client.list_users(
            InstanceId=instance_id,
            MaxResults=MaxResults,
            NextToken=response['NextToken'],
        )
        users.extend(response['UserSummaryList'])

    return users

def _user_csv_line(client, instance_id: str, user: dict) -> str:
    """Describe one Connect user and format the details as a single CSV line.

    :param client: boto3 ``connect`` client used for ``DescribeUser``.
    :param instance_id: Amazon Connect instance ID.
    :param user: one ``ListUsers`` summary; only its ``'Id'`` key is read.
    :returns: comma-separated fields terminated by CRLF.
    """
    details = client.describe_user(InstanceId=instance_id, UserId=user['Id'])['User']
    identity = details['IdentityInfo']
    phone = details['PhoneConfig']

    # Name lookups are not implemented here. The routing-profile column carries
    # the profile ID, and the security-profile and hierarchy-group columns stay
    # empty. TODO: resolve these IDs to display names.
    routing_profile_name = details['RoutingProfileId']
    security_profile_names = ''
    hierarchy_group_name = ''

    fields = [
        identity.get('FirstName', ''),  # optional in the DescribeUser response
        identity.get('LastName', ''),
        details['Username'],
        routing_profile_name,
        security_profile_names,
        phone['PhoneType'],
        phone.get('DeskPhoneNumber', ''),  # optional in the DescribeUser response
        str(phone['AutoAccept']),
        str(phone['AfterContactWorkTimeLimit']),
        hierarchy_group_name,
    ]
    return ','.join(fields) + '\r\n'


def lambda_handler(instance_id: str, region_name: str, env: str, s3_bucket, max_workers: int = 4):
    """Print one CSV line for every user of an Amazon Connect instance.

    Each user needs its own ``DescribeUser`` request, and that work is
    network-bound. The requests therefore run concurrently on a thread pool
    that shares one boto3 client; boto3 documents low-level clients as safe to
    share across threads. Lines are printed in the order ``getUserList``
    returns the users.

    :param instance_id: Amazon Connect instance ID.
    :param region_name: AWS region of the instance, used for every client.
    :param env: passed through to ``getUserList``.
    :param s3_bucket: currently unused.
    :param max_workers: maximum number of concurrent ``DescribeUser`` calls.
    """
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial

    # Same region as getUserList, so DescribeUser reaches the same instance.
    client = boto3.client('connect', region_name=region_name, verify=False, config=config)

    # Get the list of Amazon Connect users.
    users = getUserList(instance_id, region_name, env)

    describe = partial(_user_csv_line, client, instance_id)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order and re-raises any
        # exception from a worker thread here.
        for user_info in executor.map(describe, users):
            print(user_info)

# NOTE: starmap over a single argument tuple runs lambda_handler exactly once,
# in one worker process, so the pool itself adds no parallelism here.
results = pool.starmap(lambda_handler, [(instance_id, region_name, env, s3_bucket)])
pool.close()
# Wait for the worker processes to exit rather than leaving them to
# interpreter-shutdown cleanup.
pool.join()

Solution

  • I would rearrange the code as follows:

    1. Create a "worker" function, process_user, that processes a single user. If your pool size is 4, then 4 users can be processed concurrently, each by a separate process in the multiprocessing pool.
    2. process_user will now access what were previously arguments, i.e. instance_id, region_name, env and s3_bucket, as global variables. These globals are defined in each pool process using the initializer and initargs arguments of the multiprocessing.pool.Pool.__init__ method. Likewise, a single global client instance is created for each pool process, and process_user reuses it.

    This is the general idea, which obviously I cannot actually test. You need to study this and understand what is being done with the help of the Python docs so that you can make whatever changes are required to get this to run.

    import json
    import boto3
    from pprint import pprint
    import os
    import botocore
    from datetime import datetime
    import csv
    from botocore.client import Config
    import logging
    import urllib3
    import sys
    import time
    from boto3.session import Session
    from botocore.client import Config
    import multiprocessing as mp
    
    # Root logger, raised to INFO level.
    LOGGER = logging.getLogger()
    LOGGER.setLevel(level=logging.INFO)

    # Page size passed to every ListUsers request.
    MaxResults = 4

    # Shared botocore client settings: long connect timeout, up to 5 retry
    # attempts, and HTTP/HTTPS proxies taken from the usual environment variables.
    config = Config(
        connect_timeout=7200,
        retries=dict(max_attempts=5),
        proxies={
            scheme: os.environ.get(var_name)
            for scheme, var_name in (('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY'))
        },
    )
    
    def init_pool_processes(*args):
        """Pool initializer: publish the job parameters and a boto3 client as globals.

        Called once in every worker process with
        ``initargs=(instance_id, region_name, env, s3_bucket)``, so that
        ``process_user`` can read these names and reuse one client per process.
        """
        global instance_id, region_name, env, s3_bucket, client
        # Unpack the initargs tuple into the per-process globals.
        instance_id, region_name, env, s3_bucket = args

        # The client below uses verify=False; silence the per-request warning.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # One global client per process, in the configured region (the same
        # region getUserList uses), so DescribeUser reaches the right instance.
        client = boto3.client('connect', region_name=region_name, verify=False, config=config)
    
    def process_user(user):
        """Describe one Connect user and print the details as a CSV line.

        Pool worker function: it relies on the per-process globals ``client``
        and ``instance_id`` that ``init_pool_processes`` sets.

        :param user: one ``ListUsers`` summary; only its ``'Id'`` key is read.
        :returns: the printed line, so callers of ``pool.map`` may also collect it.
        """
        details = client.describe_user(InstanceId=instance_id, UserId=user['Id'])['User']
        identity = details['IdentityInfo']
        phone = details['PhoneConfig']

        # Name lookups are not implemented here. The routing-profile column
        # carries the profile ID, and the security-profile and hierarchy-group
        # columns stay empty. TODO: resolve these IDs to display names.
        routing_profile_name = details['RoutingProfileId']
        security_profile_names = ''
        hierarchy_group_name = ''

        fields = [
            identity.get('FirstName', ''),  # optional in the DescribeUser response
            identity.get('LastName', ''),
            details['Username'],
            routing_profile_name,
            security_profile_names,
            phone['PhoneType'],
            phone.get('DeskPhoneNumber', ''),  # optional in the DescribeUser response
            str(phone['AutoAccept']),
            str(phone['AfterContactWorkTimeLimit']),
            hierarchy_group_name,
        ]
        user_info = ','.join(fields) + '\r\n'

        print(user_info)
        return user_info
    
    def getUserList(instance_id: str, region_name: str, env: str):
        """Return every user summary of an Amazon Connect instance.

        Pages through ``ListUsers`` with ``MaxResults`` entries per request and
        follows ``NextToken`` until the last page has been read.

        :param instance_id: Amazon Connect instance ID.
        :param region_name: AWS region used for the ``connect`` client.
        :param env: unused; kept so existing callers keep working.
        :returns: list of the ``UserSummaryList`` entries from all pages.
        """
        client = boto3.client('connect', region_name=region_name, verify=False, config=config)

        # First page; copy it so the returned list is independent of the response.
        response = client.list_users(InstanceId=instance_id, MaxResults=MaxResults)
        users = list(response['UserSummaryList'])

        # Remaining pages, if any.
        while 'NextToken' in response:
            response = client.list_users(
                InstanceId=instance_id,
                MaxResults=MaxResults,
                NextToken=response['NextToken'],
            )
            users.extend(response['UserSummaryList'])

        return users
    
    def lambda_handler(instance_id: str, region_name: str, env: str, s3_bucket):
        """Fetch all users of the instance and describe them in a process pool.

        Each of the 4 worker processes is initialised by ``init_pool_processes``
        with the job parameters and its own boto3 client. ``process_user`` then
        handles one user per task.
        """
        # Get the list of Amazon Connect users.
        users = getUserList(instance_id, region_name, env)
        if not users:
            return  # nothing to do; skip starting worker processes

        # The with-block tears the pool down even if a worker raises (its
        # __exit__ calls terminate()). On the normal path, close() + join()
        # let the workers finish and exit on their own before that happens.
        with mp.Pool(4, initializer=init_pool_processes,
                     initargs=(instance_id, region_name, env, s3_bucket)) as pool:
            pool.map(process_user, users)
            pool.close()
            pool.join()
    
    if __name__ == '__main__':
        # Job parameters come from environment variables (None when unset).
        instance_id, region_name, s3_bucket, env = (
            os.environ.get(var_name)
            for var_name in ('instance_id', 'Region', 's3_bucket', 'export_environment')
        )

        lambda_handler(instance_id=instance_id, region_name=region_name,
                       env=env, s3_bucket=s3_bucket)
    

    You also need to look at:

        users_all = ''
        ... # code elided
        users = response['UserSummaryList']
        ... # code elided
        users_all = [*users_all, *users]
        return users_all
    

    Isn't the above equivalent to the following?

    
        users = response['UserSummaryList']
        ... # code elided
        return users