Search code examples
nservicebusrequest-responsenservicebus7

NServiceBus 7 - How to send multiple consecutive requests?


I am trying to make multiple consecutive requests using NServiceBus and RabbitMQ. Here is my code

class Program
    {
        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }
        static async Task MainAsync()
        {
            ConfigureLog();

            var ec = new EndpointConfiguration("NServiceBus.RequestResponse");
            ec.UsePersistence<InMemoryPersistence>();
            ec.EnableInstallers();
            ec.EnableCallbacks();
            ec.MakeInstanceUniquelyAddressable(Guid.NewGuid().ToString());
            ec.LimitMessageProcessingConcurrencyTo(1);

            var transport = ec.UseTransport<RabbitMQTransport>();
            transport.UseConventionalRoutingTopology();
            transport.ConnectionString("host=localhost;username=guest;password=guest");
            transport.PrefetchCount(100);

            var endpoint = await Endpoint.Start(ec)
                .ConfigureAwait(false);

            try
            {
                var options = new SendOptions();
                options.SetDestination("NServiceBus.RequestResponse");

                Log($"Sending message: 1");
                var response1 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello World!!" }, options);
                Log($"Response 1 : {response1.Text}");

                Log($"Sending message: 2");
                var response2 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello from second request!!" }, options);
                Log($"Response 1 : {response2.Text}");

            }
            catch (Exception e)
            {
                Log("####  ERROR  ####");
                Log(e.Message);
                Log(e.StackTrace);
            }
            finally
            {
                await endpoint.Stop()
                    .ConfigureAwait(false);
            }

            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
        }

        private static void Log(string msg)
        {
            LogManager.GetLogger("NServiceBus.RequestResponse").Debug(msg);
            Console.WriteLine(msg);
        }

        private static void ConfigureLog()
        {
            var layout = new PatternLayout
            {
                ConversionPattern = "%d [%t] %-5p %c [%x] - %m%n"
            };
            layout.ActivateOptions();
            var consoleAppender = new ConsoleAppender
            {
                Threshold = Level.All,
                Layout = layout
            };
            consoleAppender.ActivateOptions();

            var fileAppender = new FileAppender()
            {
                Threshold = Level.All,
                Layout = layout,
                File = "nsb_log.txt"
            };
            fileAppender.ActivateOptions();

            var executingAssembly = Assembly.GetExecutingAssembly();
            var repository = log4net.LogManager.GetRepository(executingAssembly);
            BasicConfigurator.Configure(repository, consoleAppender, fileAppender);

            LogManager.Use<Log4NetFactory>();
        }
    }

    public class MyMessage : IMessage
    {
        public string Text { get; set; }
    }

    public class MyMessageHandler : IHandleMessages<MyMessage>
    {
        public async Task Handle(MyMessage message, IMessageHandlerContext context)
        {
            await Console.Out.WriteLineAsync($"Message [{context.MessageId}] received: {message.Text} ");
            await context.Reply(new MyResponse() { Text = "Hi There!!" });
        }
    }

    public class MyResponse : IMessage
    {
        public string Text { get; set; }
    }

When I run the program, I get the following exception for the second request attempt:

2019-02-25 11:03:22,800 [9] DEBUG NServiceBus.RequestResponse [(null)] - Already specified reply routing option for this message: RouteReplyToThisInstance
2019-02-25 11:03:22,811 [9] DEBUG NServiceBus.RequestResponse [(null)] -    at NServiceBus.ApplyReplyToAddressBehavior.State.set_Option(RouteOption value)
   at NServiceBus.RoutingOptionExtensions.RouteReplyToThisInstance(SendOptions options)
   at NServiceBus.RequestResponseExtensions.<Request>d__3`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at NServiceBus.RequestResponse.Program.<MainAsync>d__1.MoveNext() in C:\Users\cruzedu\source\repos\NServiceBus.RequestResponse\NServiceBus.RequestResponse\Program.cs:line 47

I know I can change the code to call the Send method and have a handler for responses, but I still don't understand why it doesn't work with requests. What do I need to do to get it to work?

I've tried setting the message id, not passing the send options on the second call, etc. Nothing seems to work. I'm thinking this may be a bug.

I created a sample project to reproduce the behavior and shared it on GitHub. Here is the link. It is important to point out that it requires a RabbitMQ instance. In my case, I had it set up locally (http://localhost:15672) with the default Guest user/password.


Solution

  • You need to initialize a new copy of SendOptions every single time you send a message, as you cannot reuse the previous copy.

    var options = new SendOptions();
    options.SetDestination("NServiceBus.RequestResponse");
    var response1 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello World!!" }, options);
    
    options = new SendOptions();
    options.SetDestination("NServiceBus.RequestResponse");
    var response2 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello from second request!!" }, options);