Tags: python, asp.net-core, rabbitmq, microservices

Python publish to RabbitMQ exchange/queue consumed by ASP.NET Core Service


I'm running RabbitMQ in a Docker container (rabbitmq:3-management image) as part of a Docker Compose application. The application contains several ASP.NET Core WebApi microservices, which exchange messages via this broker. That works fine and hasn't given me any problems so far.

Now I need to publish messages from a Python application to an exchange/queue that was created by one of the ASP.NET Core microservices. The microservice contains a consumer for this queue. For publishing from Python I'm using pika. The problem is that I can't get the publishing right. Whenever I execute my Python script, I can see in the RabbitMQ management UI that a new exchange and queue with the suffix "_skipped" were created; it seems my message was routed there instead of the actual queue. When I try to publish directly from the management UI instead, the message does reach the microservice, but there I get an exception that the message could not be deserialized into a MassTransit envelope object, and a new exchange and queue with the "_error" suffix appear.

I have no idea where the problem is. I think the exchange/queue themselves are fine, since the other queues/consumers/publishers used for microservice-to-microservice communication in this project work. So the problem is probably either how I address the exchange/queue from Python, or something wrong with my message body.

This page gives some info about how the messages need to be structured, but it isn't very detailed, and here I found most of the info about how to publish with Python.
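As far as I understand it, MassTransit wraps every message in a JSON envelope that carries at least the message payload and a messageType list, where each entry is a URN of the form urn:message:&lt;namespace&gt;:&lt;class&gt;. A minimal sketch of that shape as I understand it (the namespace MyApp.Messages below is just a placeholder, not the actual namespace from my project):

# Rough shape of a MassTransit JSON envelope as I understand it from the docs.
# "MyApp.Messages" is a placeholder namespace used for illustration only.
envelope = {
    "messageId": "11111111-2222-3333-4444-555555555555",  # any GUID
    "messageType": [
        # URN format: urn:message:<dotnet-namespace>:<class-name>
        "urn:message:MyApp.Messages:MappingUpdateMessage"
    ],
    "message": {
        "message": "Hello World"  # maps to MappingUpdateMessage.Message
    }
}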

Below is the relevant code for the host/queue configuration in the microservice, as well as the Python script. Any help or tips on how to get this working would be greatly appreciated.

ASP.NET Core:

// Declaring the host, queue "mappingQueue", consumer in Startup.ConfigureServices of microservice
...
    services.AddMassTransit(x =>
    {
        x.AddConsumer<MappingUpdateConsumer>();
        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config => 
        { 
            config.Host(new Uri(RabbitMqConst.RabbitMqRootUri), h =>
            {
                h.Username(RabbitMqConst.RabbitMqUsername);
                h.Password(RabbitMqConst.RabbitMqPassword);
            });         
            config.ReceiveEndpoint("mappingQueue", e =>
            {
                e.ConfigureConsumer<MappingUpdateConsumer>(provider);                        
            });
        }));
    });
    services.AddMassTransitHostedService();
...
// Consumer
public class MappingUpdateConsumer : IConsumer<MappingUpdateMessage>
{
    ...
    public async Task Consume(ConsumeContext<MappingUpdateMessage> context)
    {
        await Task.Run(async () =>
        {
            if (context.Message == null)
            {
                return;
            }
            ...
        });
    }
}
// Message class (will have more properties in the future, thus not just using a string consumer)
public class MappingUpdateMessage
{
    public string Message { get; set; }
}

Python:

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='mappingQueue', exchange_type='fanout', durable=True)

message = {
  "message"    : {
    "message": "Hello World"
  },
  "messageType": [
    "urn:message:MassTransit.Tests:ValueMessage"
  ]
}

channel.basic_publish(exchange='mappingQueue',
                      routing_key='mappingQueue',
                      body=json.dumps(message))

connection.close()
print("sent")

Solution

  • For those with the same problem, I figured it out eventually:

    ...
    config.ReceiveEndpoint("mappingQueue", e =>
    {
        e.ClearMessageDeserializers();
        e.UseRawJsonSerializer();
        e.ConfigureConsumer<MappingUpdateConsumer>(provider);
    });
    ...
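
    With the raw JSON deserializer in place, the Python side can then publish the plain message object instead of a full MassTransit envelope. A minimal sketch of how the publisher could look, assuming the same fanout exchange "mappingQueue" from my original script; setting content_type to application/json is my assumption here and may not be strictly required:

    import json
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Same exchange as before; with UseRawJsonSerializer() the body is just the
    # message object itself, with no MassTransit envelope around it.
    channel.exchange_declare(exchange='mappingQueue', exchange_type='fanout', durable=True)

    body = json.dumps({"message": "Hello World"})  # maps to MappingUpdateMessage.Message

    channel.basic_publish(
        exchange='mappingQueue',
        routing_key='',  # ignored by a fanout exchange
        body=body,
        properties=pika.BasicProperties(content_type='application/json'))

    connection.close()
    print("sent")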