I have this code which runs for a long time because it processes about 6000 entries. I am trying to run it in parallel so it takes less time. The multiprocessing code that I wrote is not working as expected. The for loop in the function lambda_handler is what needs to be parallelized. I am still new to Python, so any help will be appreciated.
Here's my code:
import json
import boto3
from pprint import pprint
import os
import botocore
from datetime import datetime
import csv
from botocore.client import Config
import logging
import urllib3
import sys
import time
from boto3.session import Session
from botocore.client import Config
import multiprocessing as mp
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
config = Config(connect_timeout=7200,
                retries={'max_attempts': 5},
                proxies={'http': os.environ.get('HTTP_PROXY'), 'https': os.environ.get('HTTPS_PROXY')}
                )
LOGGER=logging.getLogger()
LOGGER.setLevel(logging.INFO)
instance_id = os.environ.get('instance_id')
region_name = os.environ.get('Region')
s3_bucket = os.environ.get('s3_bucket')
env = os.environ.get('export_environment')
MaxResults = 4
# Multiprocessing
pool = mp.Pool(4)
def getUserList(instance_id: str, region_name: str, env: str):
    users_all = ''
    client = boto3.client('connect', region_name=region_name, verify=False, config=config)
    # get list of Amazon Connect users - first batch
    response = client.list_users(
        InstanceId=instance_id,
        MaxResults=MaxResults
    )
    users = response['UserSummaryList']
    while "NextToken" in response:
        response = client.list_users(InstanceId=instance_id, MaxResults=MaxResults, NextToken=response["NextToken"])
        users.extend(response['UserSummaryList'])
    users_all = [*users_all, *users]
    return users_all
def lambda_handler(instance_id: str, region_name: str, env: str, s3_bucket):
    client = boto3.client('connect', region_name="us-east-1", verify=False, config=config)
    # get list of Amazon Connect users
    users = getUserList(instance_id, region_name, env)
    # This loop needs to be parallelized
    for user in users:
        #time.sleep(0.1)
        user_id = user['Id']
        user_name = user['Username']
        response = client.describe_user(
            InstanceId=instance_id,
            UserId=user_id
        )
        user_firstname = response['User']['IdentityInfo']['FirstName']
        user_lastname = response['User']['IdentityInfo']['LastName']
        user_name = response['User']['Username']
        user_routing_profile_id = response['User']['RoutingProfileId']
        user_security_profile_ids = response['User']['SecurityProfileIds']
        #user_email = response['User']['IdentityInfo']['Email']
        user_phonetype = response['User']['PhoneConfig']['PhoneType']
        user_desk_number = response['User']['PhoneConfig']['DeskPhoneNumber']
        user_autoaccept = response['User']['PhoneConfig']['AutoAccept']
        user_acw_timeout = response['User']['PhoneConfig']['AfterContactWorkTimeLimit']
        try:
            user_hierarchy_group_id = response['User']['HierarchyGroupId']
        except KeyError as e:
            user_hierarchy_group_id = ""
        security_profile_name_list = ''
        hierarchy_group_name = ""
        user_info = "" + user_firstname + "," + user_lastname + "," + user_name + "," + routing_profile_name + "," + security_profile_name_list[1:] + "," + user_phonetype + "," + user_desk_number + "," + str(user_autoaccept) + "," + str(user_acw_timeout) + "," + hierarchy_group_name + "\r\n"
        print(user_info)
results = pool.starmap(lambda_handler, [(instance_id,region_name,env,s3_bucket)])
pool.close()
I would rearrange the code as follows. Move the body of the loop into a new function, process_user, that processes a single user; if your pool size is 4, then 4 users can be processed concurrently, each by a process in the multiprocessing pool. process_user will now access what were previously arguments, i.e. instance_id, region_name, env and s3_bucket, as global variables, which are defined in each pool process using the initializer and initargs arguments of the multiprocessing.pool.Pool.__init__ method. Likewise, a single global client instance that can be reused by process_user will be created for each pool process.
This is the general idea, which obviously I cannot actually test; a minimal runnable sketch of the initializer pattern follows the rearranged code below. You need to study this and understand what is being done, with the help of the Python docs, so that you can make whatever changes are required to get this to run.
import json
import boto3
from pprint import pprint
import os
import botocore
from datetime import datetime
import csv
from botocore.client import Config
import logging
import urllib3
import sys
import time
from boto3.session import Session
from botocore.client import Config
import multiprocessing as mp
LOGGER=logging.getLogger()
LOGGER.setLevel(logging.INFO)
MaxResults = 4
config = Config(connect_timeout=7200,
                retries={'max_attempts': 5},
                proxies={'http': os.environ.get('HTTP_PROXY'), 'https': os.environ.get('HTTPS_PROXY')}
                )
def init_pool_processes(*args):
    global instance_id, region_name, env, s3_bucket, client
    # Unpack:
    instance_id, region_name, env, s3_bucket = args
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    # One global client per process:
    client = boto3.client('connect', region_name="us-east-1", verify=False, config=config)
def process_user(user):
    #time.sleep(0.1)
    user_id = user['Id']
    user_name = user['Username']
    response = client.describe_user(
        InstanceId=instance_id,
        UserId=user_id
    )
    user_firstname = response['User']['IdentityInfo']['FirstName']
    user_lastname = response['User']['IdentityInfo']['LastName']
    user_name = response['User']['Username']
    user_routing_profile_id = response['User']['RoutingProfileId']
    user_security_profile_ids = response['User']['SecurityProfileIds']
    #user_email = response['User']['IdentityInfo']['Email']
    user_phonetype = response['User']['PhoneConfig']['PhoneType']
    user_desk_number = response['User']['PhoneConfig']['DeskPhoneNumber']
    user_autoaccept = response['User']['PhoneConfig']['AutoAccept']
    user_acw_timeout = response['User']['PhoneConfig']['AfterContactWorkTimeLimit']
    try:
        user_hierarchy_group_id = response['User']['HierarchyGroupId']
    except KeyError as e:
        user_hierarchy_group_id = ""
    security_profile_name_list = ''
    hierarchy_group_name = ""
    user_info = "" + user_firstname + "," + user_lastname + "," + user_name + "," + routing_profile_name + "," + security_profile_name_list[1:] + "," + user_phonetype + "," + user_desk_number + "," + str(user_autoaccept) + "," + str(user_acw_timeout) + "," + hierarchy_group_name + "\r\n"
    print(user_info)
def getUserList(instance_id: str, region_name: str, env: str):
    users_all = ''
    client = boto3.client('connect', region_name=region_name, verify=False, config=config)
    # get list of Amazon Connect users - first batch
    response = client.list_users(
        InstanceId=instance_id,
        MaxResults=MaxResults
    )
    users = response['UserSummaryList']
    while "NextToken" in response:
        response = client.list_users(InstanceId=instance_id, MaxResults=MaxResults, NextToken=response["NextToken"])
        users.extend(response['UserSummaryList'])
    users_all = [*users_all, *users]
    return users_all
def lambda_handler(instance_id: str, region_name: str, env: str, s3_bucket):
    # get list of Amazon Connect users
    users = getUserList(instance_id, region_name, env)
    pool = mp.Pool(4, initializer=init_pool_processes, initargs=(instance_id, region_name, env, s3_bucket))
    pool.map(process_user, users)
    pool.close()
    pool.join()
if __name__ == '__main__':
    instance_id = os.environ.get('instance_id')
    region_name = os.environ.get('Region')
    s3_bucket = os.environ.get('s3_bucket')
    env = os.environ.get('export_environment')
    lambda_handler(instance_id, region_name, env, s3_bucket)
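If the initializer/initargs mechanism is unfamiliar, here is a minimal, self-contained sketch of the same pattern with a made-up task in place of the boto3 calls (the names init_worker, process_item and prefix are purely illustrative). You can run it on its own to see how each worker process gets its globals set exactly once:

import multiprocessing as mp

def init_worker(*args):
    # Runs once in each worker process; stores the "arguments" as globals there.
    global prefix
    prefix, = args

def process_item(item):
    # Uses the per-process global created by the initializer.
    return f"{prefix}-{item}"

if __name__ == '__main__':
    with mp.Pool(4, initializer=init_worker, initargs=('user',)) as pool:
        print(pool.map(process_item, range(8)))
    # ['user-0', 'user-1', ..., 'user-7']

The point of the initializer is that expensive or unpicklable setup (such as the boto3 client) is done once per worker process instead of once per task.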
You also need to look at:
users_all = ''
... # code elided
users = response['UserSummaryList']
... # code elided
users_all = [*users_all, *users]
return users_all
Isn't the above equivalent to the following?
users = response['UserSummaryList']
... # code elided
return users
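For example, unpacking an empty string into a new list contributes no elements, so the users_all detour just copies users (the values below are stand-ins for response['UserSummaryList']):

users_all = ''
users = ['alice', 'bob']
print([*users_all, *users])   # ['alice', 'bob'] - same as users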