Search code examples
pythonamazon-web-servicesamazon-s3boto3amazon-sqs

Listing all objects in a bucket and sending the notification from all of them to AWS SQS using boto3


I want to generate a message for each object or key in my s3 bucket. I do have like ten keys. That is why I want to list all of them with list_objects_v2 and then pass them to SQS queue. Below, there is a sample of code I tried to use:

import json
import boto3

region = "us-east-2"
bucket = "s3-small-files-fiap"
prefix = 'folder/'

s3_client = boto3.client('s3', region_name=region)

response = s3_client.list_objects_v2(Bucket=bucket,
                                     Prefix=prefix)

settings = {
    "bucket_name": "s3-small-files-fiap",
    "queue_name": "sqs-csv-to-json",
    "region": region,
    "account_number": <my_account_number>
}

bucket_notifications_configuration = {
    'QueueConfigurations': [{
        'Events': ['s3:ObjectCreated:*'],
        'Id': 'Notifications',
        'QueueArn':
        'arn:aws:sqs:{region}:{account_number}:{queue_name}'.format(**settings)
    }]
}

qpolicy = {
    "Version": "2012-10-17",
    "Id":
    "arn:aws:sqs:{region}:{account_number}:{queue_name}/SQSDefaultPolicy".format(
        **settings),
    "Statement": [{
        "Sid": "allow tmp bucket to notify",
        "Effect": "Allow",
        "Principal": {"AWS": "*"},
        "Action": "SQS:SendMessage",
        "Resource": "arn:aws:sqs:{region}:{account_number}:{queue_name}".format(
            **settings),
        "Condition": {
            "ArnLike": {
                "aws:SourceArn": "arn:aws:s3:*:*:{bucket_name}".format(
                    **settings)
            }
        }
    }]
}
print("Bucket notify", bucket_notifications_configuration)
print("Queue Policy", qpolicy)

queue_attrs = {"Policy": json.dumps(qpolicy), }

sqs_client = boto3.resource("sqs",
                   region_name=region).get_queue_by_name(
                       QueueName=settings["queue_name"])
sqs_client.set_attributes(Attributes=queue_attrs)
sqs_client.attributes

s3_client.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration=bucket_notifications_configuration)

For some reason, its output generates just one message of notification as follows. How can I send notifications ten times instead of one using the code above?

Here is the example of the output:

Bucket notify {'QueueConfigurations': [{'Events': ['s3:ObjectCreated:*'], 'Id': 'Notifications', 'QueueArn': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json'}]}
Queue Policy {'Version': '2012-10-17', 'Id': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json/SQSDefaultPolicy', 'Statement': [{'Sid': 'allow tmp bucket to notify', 'Effect': 'Allow', 'Principal': {'AWS': '*'}, 'Action': 'SQS:SendMessage', 'Resource': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json', 'Condition': {'ArnLike': {'aws:SourceArn': 'arn:aws:s3:*:*:s3-small-files-fiap'}}}]}

Solution

  • Create a method and move your SNS send code

                QUEUE_NAME = os.getenv("QUEUE_NAME")
            SQS = boto3.client("sqs")
    
            ## Inside handler method 
    
             s3_resource = boto3.resource('s3')
             response = s3_client.list_objects_v2(Bucket=bucket,Prefix=prefix)
    
    
             for file in response:
                # call SQS send method here 
                try:
                    #logger.debug("Recording %s", file)
                    u = getQueueURL()
                    logging.debug("Got queue URL %s", u)
                    resp = SQS.send_message(QueueUrl=u, MessageBody=file)
                    #logger.debug("Send result: %s", resp)
                except Exception as e:
                    raise Exception("Raised Exception! %s" % e)
    
            def getQueueURL():
                """Retrieve the URL for the configured queue name"""
                q = SQS.get_queue_url(QueueName=QUEUE_NAME).get('QueueUrl')
                #logger.debug("Queue URL is %s", QUEUE_URL)
                return q