Search code examples
rabbitmqmicroservicesasp.net-core-3.1rabbitmq-exchange

Microservices Not responding after subscribing to RabbitMQ fanout exchange


My .net core3.1 web application has say 4 microservices (MasterMS, PartyMS, ProductMS, PurchaseMS) and uses Rabbitmq as message broker.

In one specific scenario, the MasterMS publishes an event (insert/update in Company table) to Rabbitmq exchange (xAlexa), from where it is fanned-out to the respective queues of all subscribing MSs (PartyMS, ProductMS).

PartyMS get the event from CompanyEventPartyMS queue and ProductMS gets it from CompanyEventProductMS queue. Thereby both Party and Product updates their respective Company table and everything is in Sync and perfect. Btw, PurchaseMS is not subscribing and so not bothered.

Now comes the real problem.. The subscribing MSs (Consumers) does not respond when their web page is requested. PartyMS and ProductMS webpages throws SocketException, while the non-subscriber PurchaseMS works fine. Now if i Comment out the line where PartyMS subscribes, it starts working again though it no longer gets the CompanyEvent and goes out-of-sync. Any insights friends ?

SocketException: No connection could be made because the target machine actively refused it. System.Net.Http.ConnectHelper.ConnectAsync(string host, int port, CancellationToken cancellationToken)

public void Publish<T>(T @event) where T : Event
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "xAlexa", type: ExchangeType.Fanout);

                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);
                var eventName = @event.GetType().Name;

                channel.BasicPublish(exchange: "xAlexa",
                                 routingKey: eventName, //string.Empty,
                                 basicProperties: null,
                                 body: body);
            }
        }

StartBasicConsume

private void StartBasicConsume<T>() where T : Event
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                DispatchConsumersAsync = true
            };

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            var eventName = typeof(T).Name;
            var msName = typeof(T).FullName;
            string[] str = { };
            str = msName.Split('.');
            eventName += str[1];
            
            channel.ExchangeDeclare(exchange: "xAlexa",
                                        type: ExchangeType.Fanout);
                        
            channel.QueueDeclare(eventName, true, false, false, null);  //channel.QueueDeclare().QueueName;

            channel.QueueBind(queue: eventName,
                              exchange: "xAlexa",
                              routingKey: string.Empty);

            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += Consumer_Received;

            channel.BasicConsume(eventName, true, consumer);
            Console.WriteLine("Consumer Started");
            Console.ReadLine();
        }

     private async Task Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var eventName = e.RoutingKey;
            var body = e.Body.ToArray();
            //var body = e.Body.Span;
            var message = Encoding.UTF8.GetString(body);
            //var message = Encoding.UTF8.GetString(e.Body);
            Console.WriteLine(message);

            try
            {
                await ProcessEvent(eventName, message).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
            }
        }

The call to ProductsMS Api from MVC app (here is where it fails if subscribed and works if not subscribed to CompanyEvent !)

public class ProductService:IProductService
    {
        private readonly HttpClient _apiCLient;

        public ProductService(HttpClient apiCLient)
        {
            _apiCLient = apiCLient;
        }

       public async Task<List<Product>> GetProducts()
        {
            var uri = "https://localhost:5005/api/ProductApi";
            List<Product> userList = new List<Product>();
            HttpResponseMessage response = await _apiCLient.GetAsync(uri);
            if (response.IsSuccessStatusCode)
            {
                var readTask = response.Content.ReadAsStringAsync().Result;
                userList = JsonConvert.DeserializeObject<List<Product>>(readTask);
            }
            return userList;
        }
}

Find ProductsMS Api Startup.cs below:

namespace Alexa.ProductMS.Api
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            var connectionString = Configuration["DbContextSettings:ConnectionString"];
            var dbPassword = Configuration["DbContextSettings:DbPassword"];
            var builder = new NpgsqlConnectionStringBuilder(connectionString)
            {
                Password = dbPassword
            };
            services.AddDbContext<ProductsDBContext>(opts => opts.UseNpgsql(builder.ConnectionString));

            services.AddMediatR(typeof(Startup));

            RegisterServices(services);
        }

        private void RegisterServices(IServiceCollection services)
        {
            DependencyContainer.RegisterServices(services);
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            });
            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            ConfigureEventBus(app); //WORKS IF COMMENTED; FAILS OTHERWISE <---
        }

        private void ConfigureEventBus(IApplicationBuilder app)
        {
            var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();            
            eventBus.Subscribe<CompanyEvent, CompanyEventHandler>();
            eventBus.Subscribe<PartyEvent, PartyEventHandler>();
        }

    }
}

Also see the images:

RabbitMQ fanout exchange

RabbitMQ Queues

exchange

queues


Solution

  • Remove the last line Console.ReadLine(); of the method StartBasicConsume(). When we are using that line in the function it is waiting for any key press or any input.