Search code examples
rebus

Rebus Windsor container and inlinemessage handlers


I am attempting to perform request / reply with Rebus but also using the Windsor container NuGet package: https://github.com/rebus-org/Rebus.CastleWindsor

After looking at the following samples:

https://github.com/rebus-org/RebusSamples/tree/master/RequestReply

https://github.com/rebus-org/RebusSamples/tree/master/Integration

I have cobbled this example below together:

public class ContactModule : NancyModule
{
    public ContactModule(IBus bus)
    {
        Get["/v1/contact"] = parameters =>
        {
            var contacts = new List<Contact>();

            using (var activator = new BuiltinHandlerActivator())
            {
                var settings = new Settings();

                activator.Handle<GetContactsResponse>(response =>
                {
                    contacts = response.Contacts;

                    return Task.FromResult(0);
                });

                Configure.With(activator)
                    .Logging(l => l.ColoredConsole(LogLevel.Warn))
                    .Transport(t => t.UseRabbitMq(
                        settings.RabbitMQConnectionString,
                        settings.OutputQueueName)) // we listen for messages in the output queue
                    .Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName)) // but we publish to the input queue
                    .Options(o =>
                    {
                        o.EnableCompression();
                        o.EnableEncryption(settings.MessageEncryptionKey);
                    })
                    .Start();

                activator.Bus.Publish(new GetContactsRequest``()).Wait();
            }

            return Response.AsJson(contacts);
        };
    }
}

I know it should probably look more like the below when using the Windsor container approach, but I do not know how to listen for the reply:

public class ContactModule : NancyModule
{
    public ContactModule(IBus bus)
    {
        Get["/v1/contact"] = parameters =>
        {
            var contacts = new List<Contact>();

            bus.Publish(new GetContactsRequest()).Wait();

            // How do I listen for the reply?

            return Response.AsJson(contacts);
        };
    }
}

My message handler:

public class GetContactsHandler : IHandleMessages<GetContactsRequest>
{
    private readonly IBus _bus;
    private readonly Settings _settings;

    public GetContactsHandler(IBus bus, Settings settings)
    {
        _bus = bus;
        _settings = settings;
    }

    public async Task Handle(GetContactsRequest request)
    {
        // Fetch contacts from db...
        var contacts = new List<Contact>();
        await _bus.Reply(new GetContactsResponse {Contacts = contacts});
    }
} 

My Rebus Windsor installer:

public class RebusInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        var settings = new Settings();

        Configure.With(new CastleWindsorContainerAdapter(container))
            .Logging(l => l.ColoredConsole(LogLevel.Warn))
            .Transport(t => t.UseRabbitMqAsOneWayClient(
                settings.RabbitMQConnectionString))
            .Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName)) 
            .Options(o =>
            {
                o.EnableCompression();
                o.EnableEncryption(settings.MessageEncryptionKey);
            })
            .Start();
    }
}

The problem I am facing is that I want to use the request / reply pattern in my web api to request a list of contacts, wait for the reply containing the retrieved contacts and return them to the api caller.

But If I want to use the Windsor container adapter for Rebus the IHandlerActivator interface does not expose the .Handle method which allows for registering inline message handlers, in which I am grabbing the contacts from the reply and then sending them back to the api caller.

Is there a way to do this, or am I approaching the problem incorrectly?

Edit: As you can see in the first example I am injecting in the IBus interface from the Windsor container. But if I use the injected bus how do I tell it to listen for the reply comming back from the message handler?

Update: Rebus.Async was exactly what I was looking for: https://github.com/rebus-org/Rebus.Async


Solution

  • When you are using a real IoC container, you should create a class that implements IHandleMessages<TMessage>, e.g. like this:

    public class GetContactsRequestHandler : IHandleMessages<GetContactsRequest>
    {
        readonly IBus _bus;
    
        public GetContactsRequestHandler(IBus bus)
        {
            _bus = bus;
        }   
    
        public async Task Handle(GetContactsRequest request)
        {
            var reply = new GetContactsReply(...); 
    
            await _bus.Reply(reply);
        }
    }
    

    which scales much better as you add handlers and assisting application and domain services to your program.

    This of course means that you must bring in the Rebus.CastleWindsor NuGet package and pass a WindsorContainerAdapter to Rebus' .With(...) method.

    When you need to register handlers in Windsor, you can take advantage of the extension methods from Rebus.CastleWindsor which allows for doing these things:

    // register all handler types from this assembly
    windsorContainer.AutoRegisterHandlersFromThisAssembly();
    
    // register all handler types from typeof(SomeHandler).Assembly
    windsorContainer.AutoRegisterHandlersFromAssemblyOf<SomeHandler>();
    
    // register one particular type as a message handler
    windsorContainer.RegisterHandler<GetContactsRequestHandler>();
    

    UPDATE after you added some more info – you said:

    (...) or am I approaching the problem incorrectly?

    and I would say "yes", because a service bus is not very good at synchronous request/reply – you should use something "request/reply-oriented" for that, e.g. HTTP(*).

    Also – I know almost nothing about Nancy, but I am pretty sure that

    bus.Publish(...).Wait();
    

    will deadlock, at least if you are hosting Nancy in ASP.NET.

    You should always

    await bus.Publish(...);
    

    when you can, and in Nancy you can just do this:

    Get["/v1/contact"] = async (parameters, ct) =>
    {
        (...)
    
        await bus.Publish(...);
    
        (...)
    };
    

    PS: Last thing: Remember to keep the container around as long as you want your endpoint to process messages.

    In the code you posted, you dispose the BuiltinHandlerActivator right after publishing the event, which means that you will most likely not get to process any messages at all.

    If you use Windsor, the bus will be disposed when you dispose the Windsor container.


    (*) Although Rebus does actually have Rebus.Async which is a request/response API that could be used in your scenario.

    I would not recommend going down this road though. You should probably only do this if you intend to do it this once, and then never again :)