c# windows-services servicebus masstransit

MassTransit Respond going to same queue and causing loop


We're currently switching out NServiceBus for MassTransit and I'm having a little difficulty with the request/response pattern.

In NServiceBus, I can call Reply in the handler and the message goes back to the client that sent it.

In MassTransit, it appears as though the response is being sent back to the queue that it was consumed from, thus creating a loop...

Oddly, if I create the bus using InMemory, with both client and consumer on the same machine, the issue does not occur.

I expect my client to receive the response, but instead my consumer picks it up, which is also odd, since it is not set up to receive that message type.

Am I missing something in the client's Request setup?

Client:

....
IRequestClient<IWorklistRequest, IWorklistResponse> client = CreateRequestClient(busControl, WorklistEndpointUri);

Console.Write("Sending Request");

Task.Run(async () =>
{
    IWorklistRequest request = new WorklistRequest
    {
        CurrentDateFrom = new DateTime(2016, 11, 07)
    };

    var response = await client.Request(request);

    Console.WriteLine("Worklist Items retrieved: {0}", response.ExamItemList.Length);

}).Wait();
....



static IRequestClient<IWorklistRequest, IWorklistResponse> CreateRequestClient(IBusControl busControl, string endpointAddress)
{
    Console.WriteLine("Creating Request client...");

    var serviceAddress = new Uri(endpointAddress);
    IRequestClient<IWorklistRequest, IWorklistResponse> client =
        busControl.CreateRequestClient<IWorklistRequest, IWorklistResponse>(serviceAddress, TimeSpan.FromSeconds(10));

    return client;
}

Consumer:

    public Task Consume(ConsumeContext<IWorklistRequest> context)
    {
        _log.InfoFormat("Received Worklist Request with Id: {0}", context.RequestId);


        try
        {
            var result = _provider.GetAllWorklistsByStartDate(context.Message.CurrentDateFrom);

            IWorklistResponse response = new WorklistResponse
            {
                ExamItemList = result.ToArray()
            };

            // The call below sends the response right back to the original queue, where this same consumer picks it up again.
            context.Respond(response);
        }
        catch (Exception ex)
        {
            _log.Info(ex.Message);
        }

        return Task.FromResult(0);
    }

Solution

  • If you are using RabbitMQ, and you are using the request client, you should not see this behavior.

    There is a sample that demonstrates how to use the request client on the MassTransit GitHub repository: https://github.com/MassTransit/Sample-RequestResponse

    The code above appears to be correct, and the Respond() call should use the response address from the request message, which is a direct endpoint send to the temporary bus address.

    There is fairly extensive unit test coverage around this area, and the sample above was updated and verified with the latest version of MassTransit. You might consider deleting and recreating your RabbitMQ virtual host and running your application from scratch (start the response service first, so that the endpoints are set up).
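
    One way to check where Respond() will send the reply is to log the addresses carried on the consume context before responding. The sketch below is only a diagnostic aid, not part of the original code: the class name AddressLoggingConsumer is hypothetical, and the IWorklistRequest interface shown here is a minimal stand-in for the contract in the question. It assumes the MassTransit package is referenced and relies on the SourceAddress, DestinationAddress, ResponseAddress and RequestId properties of the consume context.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Minimal stand-in for the request contract from the question (assumption).
public interface IWorklistRequest
{
    DateTime CurrentDateFrom { get; }
}

// Hypothetical diagnostic consumer: it only prints the addressing headers
// of each incoming request and sends no response.
public class AddressLoggingConsumer : IConsumer<IWorklistRequest>
{
    public Task Consume(ConsumeContext<IWorklistRequest> context)
    {
        // Correlates the request with its eventual response.
        Console.WriteLine("RequestId:          {0}", context.RequestId);

        // Address of the bus that sent the request.
        Console.WriteLine("SourceAddress:      {0}", context.SourceAddress);

        // Address the request was sent to (the service queue).
        Console.WriteLine("DestinationAddress: {0}", context.DestinationAddress);

        // Address a response to this request would be sent to.
        Console.WriteLine("ResponseAddress:    {0}", context.ResponseAddress);

        return Task.FromResult(0);
    }
}
```

    If ResponseAddress prints the same URI as DestinationAddress, the response is being addressed to the service queue itself, which would match the loop described in the question. In that case the client-side bus configuration is probably the place to look, rather than the consumer.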