Search code examples
c#.net-corecommandevent-sourcingbrighter

Brighter Consumer Messages Not Routed to Handler


I'm trying to use Brighter for command/event sourcing. I have a solution containing a .NET Core Web Api service for putting messages into a queue and another solution containing a .NET Core console project for getting messages out of the queue. The services are isolated and are not in the same solution.

The messages dispatcher does pull the messages out of Rabbit and routes it to the MessageMapper, but the message does not find its way to the handler to be processed.

Visual Studio 2015, .NET Core 1.1, Paramore.Brighter.MessagingGateway.RMQ 7.1.5, Paramore.Brighter.ServiceActivator 7.1.5, StructureMap.Microsoft.DependencyInjection 1.4.0.

The configuration in the console app:

public static void Main(string[] args)
{
    RetryPolicy retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new List<TimeSpan>()
    {
        TimeSpan.FromMilliseconds(50),
        TimeSpan.FromMilliseconds(100),
        TimeSpan.FromMilliseconds(150)
    });

    CircuitBreakerPolicy circuitBreakerPolicy = Policy.Handle<Exception>().CircuitBreaker(1, TimeSpan.FromMilliseconds(500));
    PolicyRegistry policyRegistry = new PolicyRegistry() { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } };

    var subscriberRegistry = new SubscriberRegistry();
    subscriberRegistry.Register<ApplicationUpdateCommand, ApplicationUpdateCommandHandler>();

    var rmqConnnection = new RmqMessagingGatewayConnection
    {
        AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")),
        Exchange = new Exchange("api.coverage.exchange"),
    };

    var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnnection);
    var rmqMessageProducerFactory = new RmqMessageProducerFactory(rmqConnnection);

    Dispatcher dispatcher = null;
    var container = new Container();
    container.Configure(config =>
    {
        config.For<IHandleRequests<ApplicationUpdateCommand>>().Use<ApplicationUpdateCommandHandler>();
        var servicesMessageMapperFactory = new ServicesMessageMapperFactory(container);
        var messageMapperRegistry = new MessageMapperRegistry(servicesMessageMapperFactory)
        {
            {typeof(ApplicationUpdateCommand), typeof(ApplicationUpdateCommandMessageMapper) }
        };

        var servicesHandlerFactory = new ServicesHandlerFactory(container);

        var commandProcessor = CommandProcessorBuilder.With()
            .Handlers(new HandlerConfiguration(subscriberRegistry, servicesHandlerFactory))
            .Policies(policyRegistry)
            .NoTaskQueues()
            .RequestContextFactory(new InMemoryRequestContextFactory())
            .Build();

            dispatcher = DispatchBuilder.With()
                                        .CommandProcessor(commandProcessor)
                                       .MessageMappers(messageMapperRegistry)
                                        .DefaultChannelFactory(new InputChannelFactory(rmqMessageConsumerFactory, rmqMessageProducerFactory))
                                        .Connections(new List<Connection>()
                                        {
                                            new Connection<ApplicationUpdateCommand>
                                            (
                                                new ConnectionName("Application.Update"), 
                                                new ChannelName("Application.Update"), 
                                                new RoutingKey("Application.Update")
                                            )
                                        }).Build();
        });

        dispatcher.Receive();

        Console.WriteLine("Press enter to stop ...");
        Console.ReadLine();

        dispatcher.End().Wait();
    }

Code for the MessageMapper, Command, and Handler:

public class ApplicationUpdateCommandMessageMapper : IAmAMessageMapper<ApplicationUpdateCommand>
{
    public Message MapToMessage(ApplicationUpdateCommand request)
    {
        var header = new MessageHeader(messageId: request.Id, topic: "Application.Update", messageType: MessageType.MT_EVENT);
        var body = new MessageBody(JsonConvert.SerializeObject(request));
        var message = new Message(header, body);
        return message;
    }

    public ApplicationUpdateCommand MapToRequest(Message message)
    {
        // dispatcher will route message here but that is it
        ApplicationUpdateCommand command = JsonConvert.DeserializeObject<ApplicationUpdateCommand>(message.Body.Value);
        return command;
    }
}

public class ApplicationUpdateCommand : Command
{
    public int ApplicationId { get; private set; }
    public string ApplicantName { get; private set; }

    public ApplicationUpdateCommand(Guid id, int applicationId, string applicantName)
        : base(id)
    {
        ApplicationId = applicationId;
        ApplicantName = applicantName;
    }
}

public class ApplicationUpdateCommandHandler : RequestHandler<ApplicationUpdateCommand>
{
    private readonly IAmACommandProcessor _commandProcessor;

    public ApplicationUpdateCommandHandler(IAmACommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
    }

    public override ApplicationUpdateCommand Handle(ApplicationUpdateCommand command)
    {
        // would like to get here to handle command

        return base.Handle(command);
    }
}

Solution

  • You identify as MessageType.MT_EVENt in the header but are deriving from Command. The two should agree, either derive from Event or use MT_COMMAND