Search code examples
asp.net-corerabbitmqrpc

RPC using Rabbitmq in ASP.Net Core 3


I want to implement multiple queues using single request

RPC Server.cs

How can I send multiple messages from the server For ex. I created Instances : `

  RPCServer rpcServer1 = new RPCServer();
         rpcServer1.PublishMessage("customerContactPersonsList","customerContactPersons");`

`

RPCServer rpcServer2 = new RPCServer();
     rpcServer2.PublishMessage("ProductInfoList", "projects");



  public void PublishMessage(string message, string rpcQueueName)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: rpcQueueName, durable: false,
              exclusive: true, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: rpcQueueName,
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
                {
                    string response = null;

                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        Console.WriteLine(" [.] Response message is)", message);
                        response = message;
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "There was a error";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange:"topic", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                };

                Console.WriteLine("Press [enter] to exit.");
                Console.ReadLine();
            }

Receiver.cs ** The Receiver side is where the response via queue is retrieved and it also calls the client which basically sends a request and a queue name as a parameter to get json data from the server side where the same queue is declared and when both the names of the queues are matched the response is sent to the receiver end.** so for ex. here projects as a parameter is the queue name given and the same is also mentioned for the server side.

var rpcClient = new RpcClient();
            var customerContactPersons = await rpcClient.CallAsync("", "customerContactPersons");
            var response = await rpcClient.CallAsync("","projects");

var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs))
                return;
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);
            tcs.TrySetResult(response);
        };
    }

    public Task<string> CallAsync(string message, string rpcQueueName, CancellationToken cancellationToken = default(CancellationToken))
    {
        IBasicProperties props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;
        var messageBytes = Encoding.UTF8.GetBytes(message);
        var tcs = new TaskCompletionSource<string>();
        callbackMapper.TryAdd(correlationId, tcs);

        channel.BasicPublish(
            exchange: "",
            routingKey: rpcQueueName,
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out var tmp));
        return tcs.Task;
    }`

Solution

  • I solved it by adding connection block for each queue and at the end console.readline(); keeps the connection open for the queues to be consumed.

    using (var connection = rpcServer.PublishMessage(customercontact, "customercontact_rpc_queue"))
                    using (var connectionObject = rpcServer.PublishMessage(result, "project_rpc_queue"))
                    using (var customersObject = rpcServer.PublishMessage(customersFromByD, "customer_rpc_queue"))
                    {
                        Console.WriteLine("Press [enter] to exit.");
                        Console.ReadLine();
                    }