There are official examples of MassTransit with SQS. The "bus" is configured to use SQS (x.UsingAmazonSqs). The receive endpoint is an SQS which in turn subscribed to an SNS topic. However there is no example how to Publish into SNS.
AWS sdk version:
var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };
Update:
After Chris's reference and experiments with configuration I came up with the following for the 'localstack' SQS/SNS. This configuration executes without errors and Worker gets called, and publishes a message to a bus. However consumer class is not triggered and doesn't seem messages end up in the queue (or rather topic).
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
h.EnableScopedTopics();
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s =>
{
});
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
Update 2:
When I look at sns subscriptions I see that the first which was created and subscribed manually through aws cli has a correct Endpoint, while the second that was created by MassTransit library has incorrect one. How to configure Endpoint for the SQS queue?
$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
"Subscriptions": [
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "http://localhost:4566/000000000000/deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
},
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
}
Update 3:
I've cloned the project and ran some unit tests of the project for AmazonSQS bus configuration, consumers don't seem to work.
When I list subscriptions after the test run I can tell that Endpoints are incorrect.
...
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...
Could it be that AmazonSQS for localstack has a major bug?
It's not clear how to use library with 'localstack' sqs, how to point out to actual endpoint (QueueUrl) of an SQS queue.
Looks like your configuration is setup properly to publish, but there are probably at least a few reasons I can think of why you are not receiving messages:
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s => {});
e.Consumer<MessageConsumer>();
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
From what I see in the docs about Consumers you should be able to add your consumer to the AddMastTransit
configuration like your original sample, but it didn't work for me.