MassTransit can't connect to a queue in RabbitMQ from a C# app

I have a queue in RabbitMQ. I can't reconfigure this queue; I only have to consume messages from it. The publisher doesn't use MassTransit for publishing. I am using MassTransit to consume messages from the queue.

When I try to configure the connection to the queue, I receive this error:

The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'my_vhost': received 'fanout' but current is 'direct'', classId=40, methodId=10

My configuration looks like:

Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("", "my_virtual_host", credentials =>
    {
        // Credential setup is not shown in the original post.
    });

    cfg.ReceiveEndpoint("my_queue", e =>
    {
        e.Consumer(() => _messageConsumer);
    });
});

The queue is configured with Durable = true and nothing else.

When I connect to the queue directly via RabbitMQ.Client, it connects without problems, and consuming works too.
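For comparison, a plain RabbitMQ.Client consumer might look roughly like the sketch below. It assumes the synchronous RabbitMQ.Client 6.x API, and the host name and credentials are placeholders, not values from this setup. The point of the comparison is that `BasicConsume` only attaches to the existing queue and never declares an exchange, whereas a MassTransit receive endpoint by default declares a fanout exchange named after the queue, which is what the broker rejects in the error above.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class PlainConsumerSketch
{
    public static void Main()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",          // placeholder host
            VirtualHost = "my_virtual_host",
            UserName = "guest",              // placeholder credentials
            Password = "guest"
        };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (_, ea) =>
        {
            // The body is treated as UTF-8 text here; that is an assumption.
            var text = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine(text);
            channel.BasicAck(ea.DeliveryTag, multiple: false);
        };

        // Attaches to the existing queue; no exchange or queue is declared.
        channel.BasicConsume(queue: "my_queue", autoAck: false, consumer: consumer);

        Console.ReadLine(); // keep the process alive while messages arrive
    }
}
```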

How can I solve the problem?


  • I solved my problem. The MassTransit configuration now looks like this:

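    private async Task InitMasstransitBusAsync(CancellationToken cancellationToken)
    {
        await Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri(_rabbitMqConfig.HostName), credentials =>
            {
                // Credential setup is not shown in the original post.
            });
            cfg.ReceiveEndpoint(_rabbitMqConfig.QueueName, e =>
            {
                e.PrefetchCount = 20;
                e.ExchangeType = "direct";           // match the existing exchange type
                e.ConfigureConsumeTopology = false;  // don't declare message-type exchanges
                e.AddMessageDeserializer(new ContentType("text/plain"),
                    () => new CustomMessageDeserializer("text/plain"));
                e.Consumer(() => _messageConsumer);
            });
        // The end of this statement was lost; StartAsync is assumed from the await and the token.
        }).StartAsync(cancellationToken);
    }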


        public class CustomMessageDeserializer : IMessageDeserializer
        {
            private readonly string _contentType;
            private readonly JsonSerializer _serializer = JsonSerializer.Create();

            public CustomMessageDeserializer(string contentType)
            {
                _contentType = contentType;
            }

            public ContentType ContentType => new(_contentType);
            public ConsumeContext Deserialize(ReceiveContext receiveContext)
            {
                try
                {
                    var messageEncoding = GetMessageEncoding(receiveContext);
                    using var body = receiveContext.GetBodyStream();
                    using var reader = new StreamReader(body, messageEncoding, false, 1024, true);
                    using var jsonReader = new JsonTextReader(reader);
                    var messageToken = _serializer.Deserialize<JToken>(jsonReader);
                    return new RawJsonConsumeContext(_serializer, receiveContext, messageToken);
                }
                catch (JsonSerializationException ex)
                {
                    throw new SerializationException("A JSON serialization exception occurred while deserializing the message", ex);
                }
                catch (SerializationException)
                {
                    throw; // already the right exception type, so rethrow unchanged
                }
                catch (Exception ex)
                {
                    throw new SerializationException("An exception occurred while deserializing the message", ex);
                }
            }

            public void Probe(ProbeContext context) { }

            public static Encoding GetMessageEncoding(ReceiveContext receiveContext)
            {
                var contentEncoding = receiveContext.TransportHeaders.Get("Content-Encoding", default(string));
                return string.IsNullOrWhiteSpace(contentEncoding) ? Encoding.UTF8 : Encoding.GetEncoding(contentEncoding);
            }
        }
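The core of the deserializer is small: pick an encoding from the header (UTF-8 if none is given), decode the raw bytes, and parse them as JSON regardless of the `text/plain` label. Below is a minimal standalone sketch of that idea. It swaps Newtonsoft.Json for `System.Text.Json` so that it runs without extra packages. `SampleMessage` and `RawBodyDecoder` are hypothetical names for illustration, not part of MassTransit or of the code above.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical message shape; the real contract is not shown in the post.
public sealed class SampleMessage
{
    public string Text { get; set; }
}

public static class RawBodyDecoder
{
    // Same fallback as GetMessageEncoding: UTF-8 unless the header names another encoding.
    public static Encoding ResolveEncoding(string contentEncoding) =>
        string.IsNullOrWhiteSpace(contentEncoding)
            ? Encoding.UTF8
            : Encoding.GetEncoding(contentEncoding);

    // Decodes the raw bytes and parses them as JSON, ignoring the declared content type.
    public static T Decode<T>(byte[] body, string contentEncoding)
    {
        var json = ResolveEncoding(contentEncoding).GetString(body);
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        return JsonSerializer.Deserialize<T>(json, options);
    }
}
```

One thing to watch: `Encoding.GetEncoding` expects a character-set name such as `utf-16`. If the publisher sets the header to something like `gzip`, it throws an `ArgumentException`, which the deserializer above would surface as a `SerializationException` through its catch-all handler.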