
MassTransit can't connect to a queue in RabbitMQ from a C# app


I have a queue in RabbitMQ. I can't change the queue's configuration; I only have to consume messages from it. The publisher doesn't use MassTransit to publish. I am using MassTransit to consume messages from the queue.

When I try to configure the connection to the queue, I receive this error:

The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'my_vhost': received 'fanout' but current is 'direct'', classId=40, methodId=10

My configuration looks like this:

Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("127.0.0.1", "my_virtual_host", credentials =>
    {
        credentials.Username("myuser");
        credentials.Password("mypassword");
    });

    cfg.ReceiveEndpoint("my_queue", e =>
    {
        e.UseRawJsonSerializer();
        e.Consumer(() => _messageConsumer);
    });
}).Start();

The queue is declared with Durable = true and nothing else special.

When I connect to the queue via RabbitMQ.Client, it connects without problems, and consuming works well too.
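
For comparison, a plain RabbitMQ.Client consumer can be sketched as follows. This is not the original code: it assumes the synchronous RabbitMQ.Client 6.x API and reuses the placeholder host, credentials, and queue name from the configuration above. Note that it only consumes from the existing queue and never declares an exchange or a queue, so there is nothing for the broker to compare against the existing topology.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Placeholder connection settings, copied from the MassTransit configuration above.
var factory = new ConnectionFactory
{
    HostName = "127.0.0.1",
    VirtualHost = "my_virtual_host",
    UserName = "myuser",
    Password = "mypassword"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// No ExchangeDeclare or QueueDeclare call: the existing topology is left untouched.
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
    // Assumes the body is UTF-8 text; print it and acknowledge the delivery.
    var body = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine(body);
    channel.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: "my_queue", autoAck: false, consumer: consumer);

// Keep the process alive while messages are delivered.
Console.ReadLine();
```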

How can I solve the problem?


Solution

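  • I solved my problem. By default, MassTransit declares a fanout exchange named after the receive endpoint, but the existing exchange with that name is direct, hence the 406 error. Setting ExchangeType to "direct" makes the declaration match, and ConfigureConsumeTopology = false stops MassTransit from creating and binding exchanges for the consumer's message types. A custom deserializer is registered for the text/plain content type so that those messages are parsed as raw JSON. The MassTransit configuration looks like this: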

    private async Task InitMasstransitBusAsync(CancellationToken cancellationToken)
    {
        await Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri(_rabbitMqConfig.HostName), credentials =>
            {
                credentials.Username(_rabbitMqConfig.UserName);
                credentials.Password(_rabbitMqConfig.Password);
            });
            cfg.ReceiveEndpoint(_rabbitMqConfig.QueueName, e =>
            {
                e.PrefetchCount = 20;
                // Match the type of the existing exchange instead of the default fanout.
                e.ExchangeType = "direct";
                // Do not create or bind exchanges for the consumer's message types.
                e.ConfigureConsumeTopology = false;
                // Parse text/plain messages as raw JSON.
                e.AddMessageDeserializer(new ContentType("text/plain"),
                    () => new CustomMessageDeserializer("text/plain"));
                e.Consumer(() => _messageConsumer);
            });
        }).StartAsync(cancellationToken);
    }
    

    CustomMessageDeserializer:

        // Reads the message body as raw JSON (no MassTransit envelope) for the given content type.
        public class CustomMessageDeserializer : IMessageDeserializer
        {
            private readonly string _contentType;
            private readonly JsonSerializer _serializer = JsonSerializer.Create();
    
            public CustomMessageDeserializer(string contentType)
            {
                _contentType = contentType;
            }
    
            public ContentType ContentType => new(_contentType);
    
            public ConsumeContext Deserialize(ReceiveContext receiveContext)
            {
                try
                {
                    var messageEncoding = GetMessageEncoding(receiveContext);
                    using var body = receiveContext.GetBodyStream();
                    using var reader = new StreamReader(body, messageEncoding, false, 1024, true);
                    using var jsonReader = new JsonTextReader(reader);
                    var messageToken = _serializer.Deserialize<JToken>(jsonReader);
                    
                    return new RawJsonConsumeContext(_serializer, receiveContext, messageToken);
                }
                catch (JsonSerializationException ex)
                {
                    throw new SerializationException("A JSON serialization exception occurred while deserializing the message", ex);
                }
                catch (SerializationException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    throw new SerializationException("An exception occurred while deserializing the message", ex);
                }
            }
    
            public void Probe(ProbeContext context) { }
    
            // Uses the Content-Encoding transport header when present, otherwise falls back to UTF-8.
            public static Encoding GetMessageEncoding(ReceiveContext receiveContext)
            {
                var contentEncoding = receiveContext.TransportHeaders.Get("Content-Encoding", default(string));
                return string.IsNullOrWhiteSpace(contentEncoding) ? Encoding.UTF8 : Encoding.GetEncoding(contentEncoding);
            }
        }
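
    The _messageConsumer instance passed to e.Consumer is not shown above. A minimal sketch of what such a consumer could look like is given below; MyMessage, its Text property, and MessageConsumer are hypothetical names chosen for illustration, and the sketch assumes the JSON body has a matching "Text" field.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using MassTransit;

    // Hypothetical message contract; its properties should mirror the raw JSON payload.
    public class MyMessage
    {
        public string Text { get; set; }
    }

    // Hypothetical consumer; an instance of this class would play the role of _messageConsumer.
    public class MessageConsumer : IConsumer<MyMessage>
    {
        public Task Consume(ConsumeContext<MyMessage> context)
        {
            // context.Message is the payload deserialized from the raw JSON body.
            Console.WriteLine(context.Message.Text);
            return Task.CompletedTask;
        }
    }
    ```

    Because the endpoint is configured with ConfigureConsumeTopology = false, MassTransit does not create or bind an exchange for MyMessage; the consumer only receives what already arrives in the existing queue.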