Search code examples
pythonmockingamazon-sqsmoto

Python AWS SQS mocking with MOTO


I am trying to mock an AWS SQS with moto, below is my code

from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs


@mock_sqs
def test_get_all_msg_from_queue():
    
    #from myClass import get_msg_from_sqs
    
    conn = boto3.client('sqs', region_name='us-east-1')
    
    queue = conn.create_queue(QueueName='Test')
    
    os.environ["SQS_URL"] = queue["QueueUrl"]
    
    queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
    
    
    #Tried this as well
    #conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))

    resp = get_msg_from_sqs(queue["QueueUrl"]) 

    assert resp is not None

While executing this I am getting the following error

>       queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E       AttributeError: 'dict' object has no attribute 'send_message'

If I try another way of to send a message in SQS(see commented out code #Tried this as well) then at the time of actual SQS calling in my method get_msg_from_sqs, I am getting the below error

E  botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation: 
The address https://queue.amazonaws.com/ is not valid for this endpoint.

I am running it on win10 with PyCharm and the moto version is set to

moto = "^2.2.6"

My code is given below

sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
    return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
               MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)

What am I missing over here?


Solution

  • Your queue variable is a dict returned by create_queue:

    queue = conn.create_queue(QueueName='Test')
    

    It is not a queue and thus you cannot call sendMessage on it.

    To do that, you need to create a queue object:

    conn = boto3.client('sqs')
    sqs = boto3.resource('sqs')
    response = conn.create_queue(QueueName='Test')
    queue_url = response["QueueUrl"]
    queue = sqs.Queue(queue_url)
    
    queue.send_message()