How can I configure the topic name when using MassTransit SQS?


I use MassTransit with Amazon SQS. When publishing a message, I got this error:

User is not authorized to perform: SNS:CreateTopic on resource

So I needed the rights to create topics.

I solved it by asking the admin to extend my rights. This works, and topics can now be created. However, it seems that the topic names are generated automatically from the class names. Can I change this behaviour programmatically, that is, by giving the topic a self-describing name?

This would be useful for creating a FIFO topic, whose name must end with .fifo.

Update

I now know how to set the name with SetEntityName (thanks for telling me, Chris).

But my problem seems to be specific to FIFO topics.

This works:

                cfg.Message<CustomerUpdate>(x =>
                {
                    x.SetEntityName("customerupdate");
                });

but this does not work:

                cfg.Message<CustomerUpdate>(x =>
                {
                    x.SetEntityName("customerupdate.fifo");
                });

It results in an exception with this error message:

Invalid parameter: Topic Name

of this type:

Amazon.SimpleNotificationService.Model.InvalidParameterException

with this stacktrace:

Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.<HandleExceptionAsync>d__2.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ExceptionHandler`1.<HandleAsync>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.ErrorHandler.<ProcessExceptionAsync>d__8.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CredentialsRetriever.<InvokeAsync>d__7`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorCallbackHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.MetricsHandler.<InvokeAsync>d__1`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.TopicCache.<CreateMissingTopic>d__9.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.TopicCache.<>c__DisplayClass6_0.<<Get>b__0>d.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Caching.Internals.PendingValue`2.<CreateValue>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<Declare>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<ConfigureTopology>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-AmazonSqsTransport-ClientContext>-Send>b__0>d.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.PipeExtensions.<OneTimeSetup>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<GreenPipes-IFilter<MassTransit-AmazonSqsTransport-ClientContext>-Send>d__3.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe`1.<Send>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.TaskAwaiter.GetResult()
<PublishAsync>d__2.MoveNext() in Publisher.cs: line: 18

Here is my code:

    public static void UseMassTransit(this IServiceCollection services, MassTransitConfiguration massTransitConfiguration)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomerChangeConsumer>();
            x.UsingAmazonSqs((context, cfg) =>
            {
                cfg.Host(massTransitConfiguration.Host, h =>
                {
                    h.AccessKey(massTransitConfiguration.AccessKey);
                    h.SecretKey(massTransitConfiguration.SecretKey);

                    h.EnableScopedTopics();
                });

                cfg.ReceiveEndpoint("CustomerChangeConsumer",
                    configurator =>
                    {
                        configurator.ConfigureConsumer<CustomerChangeConsumer>(context);
                    });

                cfg.Message<CustomerUpdate>(x =>
                {
                    // This causes the exception when publishing:
                    x.SetEntityName("customerupdate.fifo");
                    // But this works. No issues when publishing
                    //x.SetEntityName("customerupdate");
                });
            });
        });
        
        services.AddMassTransitHostedService();
    }
}

Solution

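  • You can configure the message topology to set the entity name, and the publish topology to mark the topic as FIFO. SNS accepts a name ending in .fifo only when the topic is created with the FifoTopic attribute set to true, which is why setting the name alone fails with Invalid parameter: Topic Name.

    For instance, to change the topic name for the OrderSubmitted event and create it as a FIFO topic: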

    Bus.Factory.CreateUsingAmazonSqs(cfg =>
    {
        cfg.Message<OrderSubmitted>(x =>
        {
            x.SetEntityName("order-submitted.fifo");
        });
    
        cfg.Publish<OrderSubmitted>(x =>
        {
            x.TopicAttributes["FifoTopic"] = "true";
        });
    });
    

    To set the MessageGroupId on publish, use:

    await bus.Publish<OrderSubmitted>(message, x => x.SetGroupId("..."));
    

    I'm not 100% sure that the group ID survives Publish, as opposed to Send.
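
Applied to the container-based setup from the question, the two settings might be combined roughly as follows. This is a sketch of the publishing side only, not a verified configuration: the CustomerUpdate and MassTransitConfiguration classes are placeholders inferred from the question, FifoTopicSketch and PublishUpdate are hypothetical names, the ContentBasedDeduplication attribute is an optional assumption that is not part of the answer, and the consumer's receive endpoint is left out. Whether the group ID actually reaches SNS on Publish is not confirmed here.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
// Note: depending on the MassTransit version, SetGroupId may need an extra
// using for the Amazon SQS transport namespace (assumption, not verified).

// Placeholder message type; the real CustomerUpdate is not shown in the question.
public class CustomerUpdate
{
    public string CustomerId { get; set; }
}

// Shape inferred from the properties the question's code reads.
public class MassTransitConfiguration
{
    public string Host { get; set; }
    public string AccessKey { get; set; }
    public string SecretKey { get; set; }
}

public static class FifoTopicSketch
{
    public static void UseMassTransit(this IServiceCollection services, MassTransitConfiguration settings)
    {
        services.AddMassTransit(x =>
        {
            x.UsingAmazonSqs((context, cfg) =>
            {
                cfg.Host(settings.Host, h =>
                {
                    h.AccessKey(settings.AccessKey);
                    h.SecretKey(settings.SecretKey);
                    h.EnableScopedTopics();
                });

                // FIFO topic names must end with ".fifo".
                cfg.Message<CustomerUpdate>(m => m.SetEntityName("customerupdate.fifo"));

                // Without FifoTopic, SNS creates a standard topic and rejects the dotted name.
                cfg.Publish<CustomerUpdate>(p =>
                {
                    p.TopicAttributes["FifoTopic"] = "true";
                    // Optional assumption: let SNS derive the deduplication ID from the message body.
                    p.TopicAttributes["ContentBasedDeduplication"] = "true";
                });
            });
        });

        services.AddMassTransitHostedService();
    }

    // Publishes with a message group ID, as FIFO topics require one per message.
    public static Task PublishUpdate(IPublishEndpoint endpoint, CustomerUpdate update) =>
        endpoint.Publish(update, ctx => ctx.SetGroupId(update.CustomerId));
}
```

Using the customer ID as the group ID keeps updates for the same customer in order while letting different customers be processed independently; any other stable key would work the same way.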