Search code examples
pythonaws-lambdamultiprocessingamazon-textract

AWS Textract asynchronous operations within multiprocessing


I am working in a Lambda function within AWS. I have two functions which asynchronously call on Textract to return the extracted text from an image. By switching to this asynchronous operation from a singular call one at a time (which must wait for the result to complete before submitting a new request), given the volume of images I need processed by Textract, I was able to reduce processing time for Textract from 8 minutes to about 3 minutes--a vast improvement.

But, I am looking into using multiprocessing to see if I can reduce the time down even further. However, it appears that multiprocessing.map and multiprocessing.starmap do not seem to work very well in AWS Lambda. I saw some recommendations for using multiprocessing.Process or multiprocessing.Pipe, but it isn't clear if that will actually make a big impact.

Based on my code below, will leveraging multiprocessing.Process or multiprocessing.Pipe make noticeable improvements in processing time or is it not worth the effort? If it is worth it, can anyone make any suggestions on how to actually implement this given my code? I am brand new to multiprocessing and there's a lot to wrap my head around, further complicated by trying to also implement in AWS.

def extract_text_async(img, loc):
    img_obj = Image.fromarray(img).convert('RGB')
    out_img_obj = io.BytesIO()
    img_obj.save(out_img_obj, format="png")
    out_img_obj.seek(0)
    file_name = key_id + "_" + loc + ".png"
    s3.Bucket(bucket_name).put_object(Key=file_name, Body=out_img_obj, ContentType="image/png")
    response = textract_client.start_document_text_detection(DocumentLocation={'S3Object':{'Bucket': bucket_name,'Name': file_name}},JobTag=key_id + loc, NotificationChannel={'SNSTopicArn': snsarn,'RoleArn': rolearn},OutputConfig={'S3Bucket': output_bucket,'S3Prefix': str(datetime.now()).replace(" ", "_") + key_id + "_" + loc + "_textract_output"})
    return response['JobId']

def fetch_textract_async(jobid):
    response = textract_client.get_document_text_detection(JobId=jobid,MaxResults=1000)
    status = response['JobStatus']
    text_len = {}
    for y in range(len(response['Blocks'])):
        if 'Text' in response['Blocks'][y]:
            text_len[y] = len(response['Blocks'][y]['Text'])
        else:
            pass
    if bool(text_len):
        extracted_text = response['Blocks'][max(text_len, key=text_len.get)]['Text']
        if extracted_text == '-':
            extracted_text = ''
        else:
            pass
    else:
        extracted_text = ''
    return extracted_text

# example function calls
s1_1 = extract_text_async(cropped_images['Section 1']['1'],"s1_1")
s1_2 = extract_text_async(cropped_images['Section 1']['2'],"s1_2")
s1_3 = extract_text_async(cropped_images['Section 1']['3'],"s1_3")
s1_1_result = fetch_textract_async(s1_1)
s1_2_result = fetch_textract_async(s1_2)
s1_3_result = fetch_textract_async(s1_3)

Solution

  • In a well-architected, scalable setup for running Amazon Textract, the callback itself should be event-driven through SNS (which it looks from your snippet like you're already using?)... So your Lambdas will just be 1) kicking off jobs and 2) reading the results.

    If you're considering spiky workloads with very high concurrency (e.g. dump a large number of documents at once and process them as fast as possible), it's worth checking your applied quotas for e.g. StartDocumentTextDetection and GetDocumentTextDetection TPS, and max concurrent jobs. As mentioned on this page, smoothing out the workload is one good way to improve overall throughput.

    On the (1) job kick-off side:

    • It looks like your extract_text_async takes an image as a pixel array. I hope you're not passing that over network anywhere? (e.g. between Lambdas, or from local to cloud) Compressed image formats like PNG, JPEG, etc are much smaller, which can accelerate data transfer if you're e.g. collecting the image from a local camera and uploading it through your Lambda or something.
    • If you upload direct as PNG bytes you could also skip the Pillow dependency & conversion compute
    • You could potentially avoid streaming the image through Lambda at all, by having your client just upload the image direct to S3 and using the S3 event notifications Lambda integration to kick off a Lambda that requests the Textract job for the newly-created S3 URI. The client doesn't receive a notification of the Textract job ID in this case, but you could e.g. have your Lambda store the association from S3 URI to job ID in a DynamoDB table, so it's easily queryable as soon as the job has been started.
    • What actually triggers your job creation Lambda, and what size of batches of new documents it receives, will affect the scaling profile and whether multiprocessing is relevant:
      • If the Lambda is invoked for each individual image with no batching, is Lambda scaling out fast enough? Might provisioned concurrency help?
      • If the Lambda is invoked just once with very large batch sizes, maybe parallelizing the Textract API requests with multithreading could be helpful... But splitting the batches into separate invocations might help parallelize easier?
      • If the jobs are being kicked off very efficiently, maybe you'll hit your Textract TPS or MaxConcurrentJobs quotas and see throttling? Could consider tuning the boto3 retry settings to optimize performance, or exploring quota increase requests

    On the (2) job retrieval side:

    • The default SNS-to-Lambda integration is probably already doing some batching and invoking your function as concurrently as it has new data coming available: To understand how much multiprocessing might help, it's worth measuring what your typical batch size is for incoming SNS messages.
    • Remember there's also the quota limit on GetDocumentTextDetection TPS, which if you have a very large number of documents all complete around the same time might cause contention. If this seems to be an issue, then tuning the boto3 retry settings and connecting via SQS for concurrency control may help accelerate the workload.
    • You should probably be checking for NextToken and paginating your get_document_text_detection requests, in case you're going to process long documents... But this will only increase latency as your current solution seems to just be fetching the first page whether it's complete or not.

    In terms of staying under throttling quotas, AWS typically suggests retries and backoff rather than explicit quota management, because the former scales to distributed systems while the latter requires a central monitor of current consumption, with all the associated possible risks of deadlock and etc.


    In summary, focussing on multiprocessing might be a bit premature because it only addresses scaling within one running instance. It might be better to check whether the overall distributed architecture is well-optimized, including how those Lambdas get invoked and requests get batched.

    For more examples, I'd suggest to check out: