Tags: c#, azure, azureservicebus, azure-servicebus-topics

Azure Service Bus consumer cannot find topic, although publishing to the same topic succeeds


Description: I am working on an Azure Service Bus implementation where I can successfully publish messages to a topic, but my consumer is unable to find the topic and throws a MessagingEntityNotFound exception. Below are the details of my setup and the error message.

I have the following configuration in my appsettings.Development.json:

  "EventSettings": {
    "Events": {
      "TenantEvents": {
        "ConnectionString": "Endpoint=sb://<servicebusname>.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=<key>",
        "TopicName": "tenantevents"
      },
      "AcknowledgmentEvents": {
        "ConnectionString": "Endpoint=sb://<servicebusname>.servicebus.windows.net/;SharedAccessKeyName=send-and-recieve;SharedAccessKey=<key>",
        "TopicName": "acknowledgmentevents"
      }
    }
  }

Consumer Code: Here is the code for consuming events:

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Starting EventAcknowledgmentConsumer...");

            var eventConfig = _eventSettings.Events["AcknowledgmentEvents"];
            _client = new ServiceBusClient(eventConfig.ConnectionString);
            _receiver = _client.CreateReceiver(eventConfig.TopicName);

            _logger.LogInformation("EventAcknowledgmentConsumer configured with Topic: {TopicName}", eventConfig.TopicName);

            // Start receiving messages
            await ReceiveMessagesAsync(cancellationToken);

            _logger.LogInformation("EventAcknowledgmentConsumer started.");
        }

Publisher Code: Here is the code for publishing events (publishing works fine):

app.MapPost("/events/publish", async (IEventPublisher eventPublisher, EventDto eventDto) =>
{
    var eventAcknowledgment = new EventAcknowledgment(eventDto.EventId, eventDto.ConsumerId, Enum.Parse<EventAcknowledgment.AcknowledgmentStatus>(eventDto.Status))
    {
        Timestamp = eventDto.Timestamp
    };
    await eventPublisher.PublishAsync(eventAcknowledgment);
    return Results.Ok(new { Message = "Event published successfully", EventId = eventAcknowledgment.EventId });
});


        public async Task PublishAsync<TEvent>(TEvent @event)
        {
            var eventConfig = GetEventConfig(@event);

            await using var client = new ServiceBusClient(eventConfig.ConnectionString);
            var sender = client.CreateSender(eventConfig.TopicName);

            var message = new ServiceBusMessage(JsonSerializer.Serialize(@event))
            {
                ContentType = "application/json",
                SessionId = Guid.NewGuid().ToString() // Set a unique SessionId
            };

            await sender.SendMessageAsync(message);
        }

        private EventConfig GetEventConfig<TEvent>(TEvent @event)
        {
            return @event switch
            {
                TenantCreatedEvent => _eventSettings.Events["TenantEvents"],
                TenantUpdatedEvent => _eventSettings.Events["TenantEvents"],
                TenantDeletedEvent => _eventSettings.Events["TenantEvents"],
                EventAcknowledgment => _eventSettings.Events["AcknowledgmentEvents"],
                // Add other event types and their corresponding configurations here
                _ => throw new ArgumentOutOfRangeException(nameof(@event), @event, null)
            };
        }
    }

Error Message: Here is the error message I am receiving:

2025-03-03 06:34:54       Azure.Messaging.ServiceBus.ServiceBusException: The messaging entity '<servicebusname>:topic:acknowledgmentevents~15|amqps://<servicebusname>.servicebus.windows.net/-a46a10d0;0:5:6:source(address:/acknowledgmentevents,filter:[])' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions.  TrackingId:bcdbc70c-95ea-47b7-aabd-e6aa3db58423_B6, SystemTracker:gi::G7:Recv:1646104:638765501148700000:<servicebusname>:topic:acknowledgmentevents~15:F0:C11, bi::in-connection2278(G7-1156480)::session2286::link10087117, Timestamp:2025-03-03T05:34:54 TrackingId:fb67cf36800149079d2f511a4d83f734_G7, SystemTracker:gateway10, Timestamp:2025-03-03T05:34:54 (MessagingEntityNotFound). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot.
2025-03-03 06:34:54          at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.ReceiveMessagesAsyncInternal(Int32 maxMessages, Nullable`1 maxWaitTime, TimeSpan timeout, CancellationToken cancellationToken)
2025-03-03 06:34:54          at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<ReceiveMessagesAsync>b__44_0>d.MoveNext()

I found a comment suggesting that having an endpoint name that is a case-insensitive duplicate of a topic name can cause issues. However, I have verified that my endpoint name and topic name are not case-insensitive duplicates.

What could be causing the consumer to fail to find the topic despite successful message publishing? Are there any additional steps I should take to troubleshoot and resolve this issue?


Solution

  • A topic does not hold messages itself; its subscriptions do. To read messages published to a topic, you therefore need to connect to one of its subscriptions (see Microsoft Docs).

    The method "CreateReceiver" can receive from both queues and topic subscriptions. If you pass only one argument, it treats that name as a queue. If you pass two arguments (topic name and subscription name), it receives from that topic subscription.

    So the error "MessagingEntityNotFound" is most likely thrown because the ServiceBusClient is looking for a queue named "acknowledgmentevents", rather than a subscription on the topic of that name.

    So to fix your issue, you need to listen to a topic subscription, something like this:

    await using var client = new ServiceBusClient("ConnectionString");
    // Two arguments: topic name, then the name of a subscription on that topic.
    var receiver = client.CreateReceiver("mytopic", "mysubscription");
    var result = await receiver.ReceiveMessageAsync();
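
    For a slightly fuller picture, here is a minimal sketch of a receive call against a topic subscription. It assumes a subscription already exists on the topic and that the connection string belongs to a policy with Listen rights. The class name, method name, and the subscriptionName parameter are illustrative and not part of the original code; in the question's setup the subscription name would probably come from configuration next to TopicName.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper for illustration only.
public static class AcknowledgmentReceiverSketch
{
    // subscriptionName is an assumption: it must name a subscription
    // that already exists on the topic identified by topicName.
    public static async Task ReceiveOneAsync(
        string connectionString,
        string topicName,
        string subscriptionName,
        CancellationToken cancellationToken)
    {
        await using var client = new ServiceBusClient(connectionString);

        // Two-argument overload: receive from a topic subscription, not a queue.
        await using ServiceBusReceiver receiver =
            client.CreateReceiver(topicName, subscriptionName);

        // Wait up to 10 seconds; the result is null if nothing arrives in time.
        ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync(
            maxWaitTime: TimeSpan.FromSeconds(10),
            cancellationToken: cancellationToken);

        if (message is null)
        {
            Console.WriteLine("No message arrived within the wait time.");
            return;
        }

        Console.WriteLine($"Received: {message.Body}");

        // The default receive mode is PeekLock, so the message has to be
        // completed explicitly or it will be redelivered after the lock expires.
        await receiver.CompleteMessageAsync(message, cancellationToken);
    }
}
```

    In the consumer from the question, the equivalent change would be to pass a subscription name as the second argument to _client.CreateReceiver in StartAsync.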