
On-premises NServiceBus application receiving messages from an Azure Service Bus queue


I am struggling to get an NServiceBus-hosted application up and running. I have an Azure Service Bus queue that a third party posts messages to, and I want my application (currently hosted locally) to receive those messages.

I have searched for how to configure the endpoint but have not found a valid configuration. Has anyone done this? I can find examples of connecting to Azure Storage queues, but not to Azure Service Bus queues. (I need Azure Service Bus queues for other reasons.)

The configuration I have is below:

public void Init()
{
    Configure.With()
        .DefaultBuilder()
        .XmlSerializer()
        .UnicastBus()
        .AzureServiceBusMessageQueue()
        .IsTransactional(true)
        .MessageForwardingInCaseOfFault()
        .UseInMemoryTimeoutPersister()
        .InMemorySubscriptionStorage();
}

This fails at startup with the following error:

Message=Exception when starting endpoint, error has been logged. Reason: Input queue [mytimeoutmanager@sb://[*].servicebus.windows.net/] must be on the same machine as this Source=NServiceBus.Host

The configuration file is:

<configuration>
  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
    <section name="AzureServiceBusQueueConfig" type="NServiceBus.Config.AzureServiceBusQueueConfig, NServiceBus.Azure" />
    <section name="AzureTimeoutPersisterConfig" type="NServiceBus.Timeout.Hosting.Azure.AzureTimeoutPersisterConfig, NServiceBus.Timeout.Hosting.Azure" />
  </configSections>
  <AzureServiceBusQueueConfig IssuerName="owner" QueueName="testqueue" IssuerKey="[KEY]" ServiceNamespace="[NS]" />
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
  <!-- Use the following line to explicitly set the Timeout manager address -->
  <UnicastBusConfig TimeoutManagerAddress="MyTimeoutManager" />
  <!-- Use the following line to explicitly set the timeout persister's connection string -->
  <AzureTimeoutPersisterConfig ConnectionString="UseDevelopmentStorage=true" />
  <startup useLegacyV2RuntimeActivationPolicy="true">
    <supportedruntime version="v4.0" />
    <requiredruntime version="v4.0.20506" />
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0" />
  </startup>
</configuration>

Solution

  • Try moving UnicastBus() to the end of your call, like this:

        Configure.With()
           .DefaultBuilder()
           .XmlSerializer()
           .AzureServiceBusMessageQueue()
           .IsTransactional(true)
           .MessageForwardingInCaseOfFault()
           .UseInMemoryTimeoutPersister()
           .InMemorySubscriptionStorage()
           .UnicastBus(); // <- Here
    

    As for the third parties posting messages to the queue: keep in mind that they need to respect how NServiceBus handles serialization and deserialization. Here is how NServiceBus does it (the most important part is that the BrokeredMessage is initialized with a raw message, which is the result of serializing with the BinaryFormatter):

        private void Send(Byte[] rawMessage, QueueClient sender)
        {
            var numRetries = 0;
            var sent = false;
    
            while(!sent)
            {
                try
                {
                    var brokeredMessage = new BrokeredMessage(rawMessage);
    
                    sender.Send(brokeredMessage);
    
                    sent = true;
                }
                // back off when we're being throttled
                catch (ServerBusyException)
                {
                    numRetries++;
    
                    if (numRetries >= MaxDeliveryCount) throw;
    
                    Thread.Sleep(TimeSpan.FromSeconds(numRetries * DefaultBackoffTimeInSeconds));
                }
            }
    
        }
    
        private static byte[] SerializeMessage(TransportMessage message)
        {
            if (message.Headers == null)
                message.Headers = new Dictionary<string, string>();
    
            if (!message.Headers.ContainsKey(Idforcorrelation))
                message.Headers.Add(Idforcorrelation, null);
    
            if (String.IsNullOrEmpty(message.Headers[Idforcorrelation]))
                message.Headers[Idforcorrelation] = message.IdForCorrelation;
    
            using (var stream = new MemoryStream())
            {
                var formatter = new BinaryFormatter();
                formatter.Serialize(stream, message);
                return stream.ToArray();
            }
        }
    

    If you want NServiceBus to deserialize the message correctly, make sure your third parties serialize it the same way.
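
A third-party sender will also want the same throttling behaviour as the Send method above: after the n-th ServerBusyException it sleeps for n * DefaultBackoffTimeInSeconds, and it rethrows once MaxDeliveryCount failures have occurred. Below is a minimal, self-contained sketch of that linear back-off pattern. The names `ThrottledException` and `LinearBackoff.Run` are hypothetical and introduced here only for illustration (they are not NServiceBus or Azure SDK APIs); `ThrottledException` stands in for ServerBusyException so the snippet compiles without the Azure SDK.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for ServerBusyException (assumption, not an SDK type).
public class ThrottledException : Exception { }

public static class LinearBackoff
{
    // Runs `attempt` until it succeeds. After the n-th throttled failure it
    // waits n * baseDelay; once maxAttempts failures have occurred it rethrows.
    // Returns the number of throttled failures seen before the success.
    public static int Run(Action attempt, int maxAttempts, TimeSpan baseDelay)
    {
        var failures = 0;

        while (true)
        {
            try
            {
                attempt();
                return failures;
            }
            catch (ThrottledException)
            {
                failures++;

                // Give up and surface the original exception to the caller.
                if (failures >= maxAttempts) throw;

                // Linear back-off: 1x, 2x, 3x ... the base delay.
                Thread.Sleep(TimeSpan.FromTicks(baseDelay.Ticks * failures));
            }
        }
    }
}
```

In a real sender you would catch ServerBusyException instead of the stand-in and pass a delegate that creates the BrokeredMessage and calls Send on the QueueClient, as the code above does inside its loop.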