MassTransit with SQS/SNS. Publish into SNS?


There are official examples of MassTransit with SQS. The bus is configured to use SQS (x.UsingAmazonSqs), and the receive endpoint is an SQS queue which is in turn subscribed to an SNS topic. However, there is no example of how to publish to SNS.

  1. How do I publish to an SNS topic?
  2. How do I configure SQS/SNS to use HTTP, since I develop against localstack?

With the AWS SDK directly, the configuration would be:

var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };

Update:

After following Chris's reference and experimenting with the configuration, I came up with the following for localstack SQS/SNS. This configuration runs without errors, and the Worker gets called and publishes a message to the bus. However, the consumer class is not triggered, and the messages don't seem to end up in the queue (or rather the topic).

public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};

...
services.AddMassTransit(x =>
{
    x.AddConsumer<MessageConsumer>();
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.Config(AmazonSQSConfig);
            h.Config(AmazonSnsConfig);

            h.EnableScopedTopics();
        });

        cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
        {
            e.Subscribe("deal-topic", s =>
            {
            });
        });
    });
});
  
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
 

Update 2:

When I list the SNS subscriptions, I see that the first one, which I created and subscribed manually through the AWS CLI, has a correct Endpoint, while the second, which was created by the MassTransit library, has an incorrect one. How do I configure the Endpoint for the SQS queue?

$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
    "Subscriptions": [
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "http://localhost:4566/000000000000/deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        },
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        }
    ]
}

Update 3:

I cloned the MassTransit project and ran some of its unit tests for the AmazonSQS bus configuration; the consumers don't seem to work there.

When I list subscriptions after the test run I can tell that Endpoints are incorrect.

...
{
    "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
    "Owner": "",
    "Protocol": "sqs",
    "Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
    "TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...

Could it be that the AmazonSQS transport has a major bug when used with localstack?

It's not clear how to use the library with localstack SQS, or how to point it at the actual endpoint (QueueUrl) of an SQS queue.


Solution

  • It looks like your configuration is set up properly to publish, but I can think of at least a few reasons why you are not receiving messages:

    1. An issue with the current version of localstack. I had to use 0.11.2 - see Localstack with MassTransit not getting messages
    2. You are publishing to a different topic. MassTransit creates the topic using the name of the message type, which may not match the topic you configured on the receive endpoint. You can change the topic name by configuring the topology - see How can I configure the topic name when using MassTransit SQS?
    3. Your consumer is not configured on the receive endpoint - see the example below
    public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
    public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
    
    ...
    services.AddMassTransit(x =>
    {
        x.UsingAmazonSqs((context, cfg) =>
        {
            cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
            {
                h.Config(AmazonSQSConfig);
                h.Config(AmazonSnsConfig);
            });
    
            cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
            {
                e.Subscribe("deal-topic", s => {});
                e.Consumer<MessageConsumer>();
            });
        });
    });
      
    services.AddMassTransitHostedService(waitUntilStarted: true);
    services.AddHostedService<Worker>();
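
    To illustrate point 2, here is a rough sketch of how publishing and the topic name fit together. The DealMessage type, its DealId property and the ConfigureDealTopic helper are made up for illustration; only "deal-topic" comes from the question. Calling Publish on the bus is what sends the message to SNS, and SetEntityName is one way to make the message type map to the topic the receive endpoint subscribes to, instead of a topic named after the type.

    ```csharp
    using System.Threading;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.Hosting;

    // Hypothetical message contract; by default MassTransit derives the
    // SNS topic name from this type's name.
    public class DealMessage
    {
        public string DealId { get; set; }
    }

    public static class DealTopology
    {
        // Call this inside the x.UsingAmazonSqs((context, cfg) => { ... }) callback
        // so that DealMessage is published to "deal-topic".
        public static void ConfigureDealTopic(IBusFactoryConfigurator cfg)
        {
            cfg.Message<DealMessage>(m => m.SetEntityName("deal-topic"));
        }
    }

    // Sketch of a Worker that publishes once at startup. Publish goes to the
    // SNS topic for DealMessage; any queue subscribed to that topic receives it.
    public class Worker : BackgroundService
    {
        private readonly IBus _bus;

        public Worker(IBus bus) => _bus = bus;

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await _bus.Publish(new DealMessage { DealId = "42" }, stoppingToken);
        }
    }
    ```

    Without the entity name override, the published topic and the subscribed "deal-topic" can be two different SNS topics, which would explain a consumer that never fires even though publishing succeeds.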
    
    

    From what I see in the docs about Consumers, you should be able to add your consumer in the AddMassTransit configuration, as in your original sample, but that didn't work for me.
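
    One possible explanation, offered as a sketch rather than a confirmed diagnosis: x.AddConsumer only registers the consumer with the container, and it still has to be attached to a receive endpoint. With a manually declared endpoint that is usually done with ConfigureConsumer, passing the registration context. The BusRegistration class and AddBus method below are hypothetical names; MessageConsumer is the consumer class from the question.

    ```csharp
    using System;
    using Amazon.SimpleNotificationService;
    using Amazon.SQS;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public static class BusRegistration
    {
        // Both clients point at localstack over plain HTTP.
        private static readonly AmazonSQSConfig SqsConfig =
            new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };

        private static readonly AmazonSimpleNotificationServiceConfig SnsConfig =
            new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566" };

        public static void AddBus(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                // Registers the consumer with the container only.
                x.AddConsumer<MessageConsumer>();

                x.UsingAmazonSqs((context, cfg) =>
                {
                    cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
                    {
                        h.Config(SqsConfig);
                        h.Config(SnsConfig);
                    });

                    cfg.ReceiveEndpoint("deal_queue", e =>
                    {
                        e.Subscribe("deal-topic", s => { });

                        // Attaches the registered consumer to this endpoint,
                        // resolving it from the container for each message.
                        e.ConfigureConsumer<MessageConsumer>(context);
                    });
                });
            });
        }
    }
    ```

    Compared with e.Consumer<MessageConsumer>(), this variant lets the consumer take constructor dependencies from the container.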