MassTransit pub/sub with MSMQ using


I am just starting with MassTransit and could not find any decent documentation for beginners. I did find some sample code at http://looselycoupledlabs.com/2014/06/masstransit-publish-subscribe-example/, which shows a simple MassTransit publish/subscribe example using RabbitMQ.

But for my company I need to be using MSMQ. So I removed the RabbitMQ references:

x.UseRabbitMq();
x.ReceiveFrom("rabbitmq://localhost/MtPubSubExample_" + queueName);

and changed them to use MSMQ instead:

x.UseMsmq();
x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);

I get no errors when starting the subscriber and the publisher, and I can enter messages at the publisher, but they never seem to arrive at the subscriber: the consume code is never called.

Configuration:

namespace Configuration
{
  public class BusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(x =>
      {
        x.UseMsmq();
        x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);
        moreInitialization(x);
      });

      return bus;
    }
  }
}

Publisher:

static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestPublisher", x => { });
      string text = "";

      while (text != "quit")
      {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();

        var message = new SomethingHappenedMessage() { What = text, When = DateTime.Now };
        bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
      }

      bus.Dispose();
    }

Subscriber:

static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestSubscriber", x =>
      {
        x.Subscribe(subs =>
        {
          subs.Consumer<SomethingHappenedConsumer>().Permanent();
        });
      });

      Console.ReadKey();

      bus.Dispose();
    }

Consumer code which is not being called:

class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Console.Write("TXT: " + message.Message.What);
      Console.Write("  SENT: " + message.Message.When.ToString());
      Console.Write("  PROCESSED: " + DateTime.Now.ToString());
      Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");
    }
  }

I also expected to see the messages stored in MSMQ, but the private queues are empty.

I have been banging my head against this for 2 days and must be missing something obvious; any help is greatly appreciated.

My environment: Windows 8.1 Prof. with VS 2013 Prof.


Solution

  • The missing piece of the puzzle is subscription distribution: the component that tracks which consumer is registered for which message type and routes the messages accordingly.

    As the docs say:

    Once a subscription is created on a local bus, this information then needs to be shared between all the different bus instances in your application network.

    Though the routing data is the same, how this information get to all of the nodes is different depending on your transport configuration.

    As the page goes on to explain, for MSMQ you have two options:

    1. MSMQ Multicast
    2. UseSubscriptionService (MassTransit.RuntimeServices)

    Multicast is really only meant for development, not for production. Note that it has no permanent subscriptions, and messages can be lost during startup.

    The subscription service (aka MassTransit.RuntimeServices) is not distributed with NuGet so you'll need to find a binary or compile from source. It runs as a Windows service, and requires a database to track subscriptions.

    I believe none of this is required when running RabbitMQ or Azure Service Bus, which are the transports supported in the upcoming version 3.0. (RabbitMQ is better supported than MSMQ in recent 2.x versions, and version 3 drops MSMQ support.)
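
    As a rough sketch, the two MSMQ options listed above might be wired into the BusInitializer from the question as shown here. This assumes a MassTransit 2.x release in which UseMulticastSubscriptionClient, UseSubscriptionService and VerifyMsmqConfiguration are available directly on the bus configurator; some 2.x releases expect them inside a UseMsmq(m => { ... }) callback instead, so check against your installed version. The subscription queue name mt_subscriptions is an assumed placeholder and must match whatever queue your RuntimeServices installation actually listens on. The Log4Net setup from the original is left out for brevity.

    ```csharp
    using System;
    using MassTransit;

    namespace Configuration
    {
      public class BusInitializer
      {
        public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
        {
          return ServiceBusFactory.New(x =>
          {
            x.UseMsmq();

            // Creates/validates the local MSMQ installation where possible
            // (assumption: this extension is exposed here in your 2.x version).
            x.VerifyMsmqConfiguration();

            // Option 1: MSMQ multicast. Development only; no permanent
            // subscriptions, and messages can be lost during startup.
            x.UseMulticastSubscriptionClient();

            // Option 2: central subscription service (MassTransit.RuntimeServices).
            // Use this INSTEAD of option 1. The queue name below is a placeholder.
            // x.UseSubscriptionService("msmq://localhost/mt_subscriptions");

            x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);
            moreInitialization(x);
          });
        }
      }
    }
    ```

    Both the publisher and the subscriber need the same option enabled, since each bus instance has to learn about the other's subscriptions before a published message can be routed.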