Search code examples
c#asp.net.net-coredapr

Dapr PubSub with dotnet SDK


I'm trying to run a basic Dapr setup with dotnet. I followed documentation and sample projects but with no luck for now.

I created a simple dotnet web API application with net5.0. API has one controller with three pairs of get/post endpoints. Each pair is for a specific pub-sub provider (nats, rabbit, Redis).

using System.Runtime.Serialization;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Request.Body.Peeker;
namespace live
{
    [ApiController]
    [Route("/")]
    public class HomeController : ControllerBase
    {
        private readonly ILogger<HomeController> logger;
        private readonly DaprClient dapr;
        public HomeController(ILogger<HomeController> logger, DaprClient dapr)
        {
            this.dapr = dapr;
            this.logger = logger;

        }
        [HttpGet]
        public async Task<ActionResult> Produce()
        {
            var message = new Message() { Payload = "Nats Jetstream poruka" };
            await this.dapr.PublishEventAsync<Message>("nats-pubsub", "orders.new", message);

            return Ok("Sent!");
        }


        [HttpPost("nats/subscribe")]        
        [Topic("nats-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeAsync(Message message)
        {          
            
            this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        }


        [HttpGet("rabbit")]
        public async Task<ActionResult> ProduceRabbit()
        {
            var message = new Message() { Payload = "Rabbit MQ poruka" };
            await this.dapr.PublishEventAsync<Message>("rabbit-pubsub", "orders.new", message);

            return Ok("Sent!");
        }


        //[HttpPost("rabbit/subscribe")]
        [Route("rabbit/subscribe")]
        [HttpPost()]
        [Topic("rabbit-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeRabbitAsync(Message message)
        {            
            this.logger.LogInformation("ssage received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        }


        [HttpGet("redis")]
        public async Task<ActionResult> ProduceRedis()
        {
            var message = new Message() { Payload = "Redis poruka" };
            await this.dapr.PublishEventAsync<Message>("redis-pubsub", "orders.new", message);

            return Ok("Sent!");
        }


        [HttpPost("redis/subscribe")]
        [Topic("redis-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeRedisAsync(Message message)
        {
            this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        }
    }

    
    public class Message
    {
        public string Payload { get; set; }
    }
}

Startup.cs for application looks like

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Live
{
    public class Startup
    {        
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers()
                .AddNewtonsoftJson()
                .AddDapr();
            services.AddHttpClient();
            services.AddDaprClient(); //Really no need for this
        }
                
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();
            //app.UseCloudEvents();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapSubscribeHandler();
                endpoints.MapControllers();
            });
        }
    }
}

Dapr configurations

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6380
  - name: redisPassword
    value: ""

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: nats-pubsub
  namespace: default
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: name
    value: "alan"
  - name: durableName
    value: "conversation-durable"
  - name: queueGroupName
    value: "conversation-group"
  # - name: startSequence
  #   value: 1
  # - name: startTime # in Unix format
  #   value: 1630349391
  # - name: deliverAll
  #   value: false
  # - name: flowControl
  #   value: false

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbit-pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"

My docker-compose file

version: '3.4'

services: 
  nats:
    container_name: "Nats"
    image: nats
    command: [ "-js", "-m", "8222", "-D", "-V" ]
    ports:
      - "4222:4222"
      - "8222:8222"
      - "6222:6222"

  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"

  postgres:
    container_name: "PostgreSQL"
    image: postgres
    environment:
      - POSTGRES_PASSWORD=rotring123
      - PGDATA=/var/lib/postgresql/data/pgdata
    # volumes:
    #   - .\\docker-volumes\\postgreSQL:/var/lib/postgresql/data
    ports:
      - "8081:8080"
      - "5432:5432"

  redis:
    container_name: Redis
    image: redis
    ports:
      - "6380:6379"
    # volumes:
    #   - .\\docker-volumes\\redis:/usr/local/etc/redis

  # dapr-placement:
  #   container_name: Dapr-service-descovery
  #   image: "daprio/dapr:1.0.0"
  #   command: ["./placement", "-port", "50000", "-log-level", "debug"]
  #   ports:
  #     - "50000:50000"

  # zipkin:
  #   image: openzipkin/zipkin-slim
  #   ports:
  #     - "5411:9411"

I'm starting app from CLI with the command dapr run -a live -p 5226 dotnet run

Application is started and when I go to get endpoint message is sent. I can confirm that message is sent to the message broker and payload is ok. Also, Dapr calls my post endpoint (everyone rabbit, nats and redis) but in method parameter I receive null value for the Payload property of the Message class.

I followed the TrafficControll example and It seems to me that everything is set up correctly.

Dapr runtime version: 1.4.3 Here is a screenshot of logs: https://prnt.sc/1xa8s14

Any help much appreciated!


Solution

  • Add the [FromBody] attribute to the action method parameter.

    For example:

    public async Task<ActionResult> SubscribeAsync([FromBody] Message message)