Search code examples
python.netrabbitmqmasstransitpika

Publish a message to the right queue of RabbitMQ from Python (Pika) for consumption in .NET (MassTransit)


I want to publish a message from a Python-based microservice (using Pika) to a queue of RabbitMQ consumed by a .NET-based microservice (using MassTransit as event bus).

A sample code of the (Python-based) publisher is as follows:

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@rabbitmq:5672/%2F'))

channel = connection.channel()
channel.queue_declare(queue='queue_net', durable=True)

channel.basic_publish(exchange='', routing_key='queue_net', body='message',
properties=pika.BasicProperties(
    content_type = 'text/plain',
    delivery_mode = 2 # make messages persistent
))

Instead of being sent to the queue_net queue, the message goes to a queue_net_error queue (from the RabbitMQ Management screen).

For completeness, a sample code of the (.NET-based) consumer is as follows:

In Startup, ConfigureServices:

services.AddMassTransit(config =>
{
    config.AddConsumer<SampleConsumer>();
    config.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("amqp://rabbitmq:5672", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        cfg.ReceiveEndpoint("queue_net", ep =>
        {
            ep.ConfigureConsumer<SampleConsumer>(ctx);
        });
    });
});

where SampleConsumer is defined as:

public class SampleConsumer : IConsumer<string>
{
    public Task Consume(ConsumeContext<string> context)
    {
        Console.WriteLine(context);
        return Task.CompletedTask;
    }
}

What am I doing wrong? I am able to do the opposite, i.e. publish from .NET and consume in Python.

Many thanks.


Solution

  • You can either use a supported message format and content type so that the message can be consumed by MassTransit, or you can use the raw JSON message deserializer in MassTransit to consume messages from other languages that send raw JSON.

    Messages end up in the _error queue when the message is unable to be processed, which could be a serialization error or an exception thrown by a consumer. Messages end up in the _skipped queue when they are able to be deserialized but no consumer actually consumed the message (due to a mismatched message type, etc.).