Search code examples
pythonjsonamazon-web-servicesamazon-sqs

how to extract from sqs json msg nested values in python?


my aws chain: lambda->sns->sqs->python script on ec2

the python script gives me error due to unable to extract values i need , wrong structure i think but i cant see the bug.

I can't get the values from "vote" field in sqs msg. How to do that?

The test event structure(raw msg delivery enabled):

{
  "body": {
    "MessageAttributes": {
      "vote": {
        "Type": "Number",
        "Value": "90"
      },
      "voter": {
        "Type": "String",
        "Value": "default_voter"
      }
    }
  }
}

the python script that processes sqs msg:

#!/usr/bin/env python3

import boto3
import json
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

queue = boto3.resource('sqs', region_name='us-east-1').get_queue_by_name(QueueName="erjan")
table = boto3.resource('dynamodb', region_name='us-east-1').Table('Votes')

def process_message(message):
    try:
        payload = json.loads(message.body) #unable to parse sqs json msg here
        #payload = message.body
        #payload = message['body']
        voter = payload['MessageAttributes']['voter']['Value'] #here the exception raised!
        vote  = payload['MessageAttributes']['vote']['Value']
        logging.info("Voter: %s, Vote: %s", voter, vote)
        store_vote(voter, vote)
        update_count(vote)
        message.delete()
    except Exception as e:
        print('x = msg.body')
        x = (message.body)
        print(x)
        print('-------')
        print('message.body')
        print(message.body)

        try:
            vote = x['MessageAttributes']['vote']['Value']
            logging.error("Failed to process message")
            logging.error('------- here: ' + str(e))
            logging.error('vote %d' % vote)
        except TypeError:
            logging.error("error catched")

def store_vote(voter, vote):
    try:
        logging.info('table put item.......')
        print('table put item......')
        response = table.put_item(
           Item={'voter': voter, 'vote': vote}
        )
    except:
        logging.error("Failed to store message")
        raise

def update_count(vote):
    logging.info('update count....')
    print('update count....')
    table.update_item(
        Key={'voter': 'count'},
        UpdateExpression="set #vote = #vote + :incr",
            ExpressionAttributeNames={'#vote': vote},
            ExpressionAttributeValues={':incr': 1}
    )

if __name__ == "__main__":
    while True:
        try:
            messages = queue.receive_messages()
        except KeyboardInterrupt:
           logging.info("Stopping...")
           break
        except:
            logging.error(sys.exc_info()[0])
            logging.info('here error - we continue')
            continue
        for message in messages:
            process_message(message)

the error msg i get:

    payload = json.loads(message)
  File "/usr/lib64/python3.7/json/__init__.py", line 341, in loads
    raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not sqs.Message

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./processor.py", line 90, in <module>
    process_message(message)
  File "./processor.py", line 40, in process_message
    vote = x['MessageAttributes']['vote']['Value']
TypeError: string indices must be integers

I understand its sqs message and has diff structure than json. I tried

json.loads(message.body)

but sqs msg is generated with empty msg body. even though the test event above has "body" field its not the same. when i just debug and print(message)

it gives this output:

sqs.Message(queue_url='https://sqs.us-east-1.amazonaws.com/025416187662/erjan', receipt_handle='AQEBAz3JiGRwss1ROOr2R8GkpBWwr7tJ1tUDUa7JurX7H6SxoF6gyj7YOxoLuU1/KcHpBIowon12Vle97mJ/cZFUIjzJon78zIIcVSVLrZbKPBABztUeE/Db0ALCMncVXpHWXk76hZVLCC+LHMsi8E5TveZ7+DbTdyDX
U6djTI1VcKpUjEoKLV9seN6JIEZV35r3fgbipHsX897IqTVvjhb0YADt6vTxYQTM1kVMEPBo5oNdTWqn6PfmoYJfZbT1GHMqphTluEwVuqBzux2kPSMtluFk3yk4XXwPJS304URJ7srMksUdoVTemA56OsksVZzXT4AcS8sm8Y3SO2PLLjZSV+7Vdc6JZlX7gslvVSADBlXw5BJCP/Rb9mA2xI9FOyW4')

i think the auto generated sqs msg has it hidden somewhere


Solution

  • the solution came from this example - at the end it shows how to receive and process message - https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html

    in main, i think the queue was empty because i did not specify the var 'messages' attribute names when retrieving it from queue.

    if __name__ == "__main__":
        while True:
            try:
                messages = queue.receive_messages() #this was empty
                messages = queue.receive_messages(MessageAttributeNames=['vote','voter']) #this is the solution - we have to specify the msg attr names
            except KeyboardInterrupt:
               logging.info("Stopping...")
               break
            except:
                logging.error(sys.exc_info()[0])
                continue
            for message in messages:
                process_message(message)
    

    since now in debugging it does show the msg attributes:

    def process_message(message):
        try:
            payload = json.loads(message.body)
            print(type(payload))
            print(payload)
            print('-------------------MSG ATTR----------------------')
            print(message.message_attributes) #shows actual values!