Search code examples
rebus

How implement Rebus subscription / publication between two microservices with copies of DTO located in different assemblies?


I am using Rebus for the following generic situation, natural for WCF or WebAPI. There are two micro service, having identical set of DTO in their assemblies, for an exchange integration events. DTO classes are similar, but they reside in different assemblies with different .Net Core and .Net Framework frameworks. To make a DTO in a separate shared Assembly I would not like. How can you implement subscription / publication to exchange integration events between these two microservices based on two copies of DTO located in different assemblies?

Example of first microservice:

public class Message
{
    public int MessageNumber { get; set; }
}

class Program
{
    static void Main()
    {
        using (var adapter = new BuiltinHandlerActivator())
        {
            Configure.With(adapter)
                .Logging(l => l.ColoredConsole(LogLevel.Warn))
                .Transport(t => t.UseRabbitMqAsOneWayClient("amqp://myRebbitMQ"))
                .Start();

            var keepRunning = true;
            while (keepRunning)
            {
                var key = char.ToLower(Console.ReadKey(true).KeyChar);
                switch (key)
                {
                    case 'f':
                        Console.WriteLine("Publishing {0} messages", 1000);
                        var jobs = Enumerable.Range(0, 1000)
                            .Select(i => new Message { MessageNumber = i })
                            .Select(m => adapter.Bus.Publish(m))
                            .ToArray();
                        Task.WaitAll(jobs);
                        break;
                    case 'q':
                        Console.WriteLine("Quitting");
                        keepRunning = false;
                        break;
                }
            }
        }
    }
}

Example of second microservice:

public class Message
{
    public int MessageNumber { get; set; }
}

public class MessageHandler : IHandleMessages<Message>
{
    public async Task Handle(Message message)
    {
        Console.WriteLine("Processing message {0}", message.MessageNumber);

        await Task.Delay(200);
    }
}

class Program
{
    private static IContainer _container;
    static void Main()
    {
        var builder = new ContainerBuilder();
        builder.RegisterType<MessageHandler>().As(typeof(IHandleMessages<>).MakeGenericType(typeof(Message)));
        builder.RegisterRebus((configurer, context) => configurer
            .Logging(l => l.ColoredConsole(LogLevel.Warn))
            .Transport(t => t.UseRabbitMq("amqp://myRebbitMQ", "consumer2"))
            //.Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects })) ???
            .Options(o => o.SetMaxParallelism(5))
        );
        using (_container = builder.Build())
        {
            var eventBus = _container.Resolve<IBus>();
            eventBus.Subscribe<Message>().Wait();
            Console.WriteLine("Consumer listening - press ENTER to quit");
            Console.ReadLine();
        }
    }
}

Solution

  • While Rebus has prominent APIs for type-based pub/sub, the underlying implementation works with ordinary string-based topics.

    This means that e.g.

    await bus.Subscribe<YourMessage>();
    

    and

    await bus.Publish(new YourMessage());
    

    are simply translated into subscribe and publish operations on the topic "YourMessages.YourMessage, YourMessages" (here pretending that YourMessage is located in the YourMessages assembly.

    You can access the raw topic-based API by using the topics API, which you can get here:

    var topics = bus.Advanced.Topics;
    

    and then you can

    await topics.Subscribe("to anything");
    

    and

    await topics.Publish("to anything", new YourMessage());
    

    So, to fix your issue, I suggest you take advantage of this and decide on a common topic for each event type.