Search code examples

Azure ServiceBus & async - To be, or not to be?

I'm running Service Bus on Azure, pumping about 10-100 messages per second.

Recently I've switched to .net 4.5 and all excited refactored all the code to have 'async' and 'await' at least twice in each line to make sure it's done 'properly' :)

Now I'm wondering whether it's actually for better or for worse. If you could have a look at the code snippets and let me know what your thoughts are. I especially worried if the thread context switching is not giving me more grief than benefit, from all the asynchrony... (looking at !dumpheap it's definitely a factor)

Just a bit of description - I will be posting 2 methods - one that does a while loop on a ConcurrentQueue, waiting for new messages and the other method that sends one message at a time. I'm also using the Transient Fault Handling block exactly as Dr. Azure prescribed.

Sending loop (started at the beginning, waiting for new messages):

private async void SendingLoop()
            await this.RecreateMessageFactory();

            Buffer<SendMessage> message = null;

            while (true)
                if (this.cancel.Token.IsCancellationRequested)
                if (this.cancel.Token.IsCancellationRequested)

                while (this.queue.TryDequeue(out message))
                        using (message)
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                await this.SendMessage(message, this.cancel.Token);
                    catch (OperationCanceledException)
                    catch (Exception ex)
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
            if (this.loopSemaphore != null)

Sending a message:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);

            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
            }, cancellationToken);
        catch (MessagingEntityNotFoundException)
            entityNotFound = true;                
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)

        if (entityNotFound)
            if (!cancellationToken.IsCancellationRequested)
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);

The code above is from a 'Sender' class that sends 1 message/second. I have about 50-100 instances running at any given time, so it could be quite a number of threads.

Btw do not worry about EnsureMessageSender, RecreateMessageFactory, EnsureTopicExists too much, they are not called that often.

Would I not be better of just having one background thread working through the message queue and sending messages synchronously, provided all I need is send one message at a time, not worry about the async stuff and avoid the overheads coming with it.

Note that usually it's a matter of milliseconds to send one Message to Azure Service Bus, it's not really expensive. (Except at times when it's slow, times out or there is a problem with Service Bus backend, it could be hanging for a while trying to send stuff).

Thanks and sorry for the long post,


Proposed Solution

Would this example be a solution to my situation?

static void Main(string[] args)
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
                while (true)
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                        Console.WriteLine("Skipping " + val);
                        val = next;

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
            catch (Exception ex)

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
            Console.WriteLine("Broadcasting " + i);



  • You say:

    The code above is from a 'Sender' class that sends 1 message/second. I have about 50-100 instances running at any given time, so it could be quite a number of threads.

    This is a good case for async. You save lots of threads here. Async reduces context switching because it is not thread-based. It does not context-switch in case of something requiring a wait. Instead, the next work item is being processed on the same thread (if there is one).

    For that reason you async solution will definitely scale better than a synchronous one. Whether it actually uses less CPU at 50-100 instances of your workflow needs to be measured. The more instances there are the higher the probability of async being faster becomes.

    Now, there is one problem with the implementation: You're using a ConcurrentQueue which is not async-ready. So you actually do use 50-100 threads even in your async version. They will either block (which you wanted to avoid) or busy-wait burning 100% CPU (which seems to be the case in your implementation!). You need to get rid of this problem and make the queuing async, too. Maybe a SemaphoreSlim is of help here as it can be waited on asynchronously.