
How are consumers implemented in NATS?


The definition of a consumer in NATS is:

A consumer is a stateful view of a stream. It acts as interface for clients to consume a subset of messages stored in a stream and will keep track of which messages were delivered and acknowledged by clients.
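To picture what "stateful view" means, here is a deliberately simplified, hypothetical model of the bookkeeping the definition describes. It is not the NATS server's implementation, and the names (`toyConsumer`, `next`, `ack`) are made up for illustration. It keeps a subject filter (an exact match here, no wildcards), the highest stream sequence delivered so far, the set of delivered but unacknowledged sequences, and an ack floor below which every matching message has been acknowledged.

```go
package main

import "fmt"

// storedMsg is a message as stored in a (toy) stream: a sequence number
// assigned by the stream plus the subject it was published to.
type storedMsg struct {
	seq     uint64
	subject string
}

// toyConsumer is a hypothetical, simplified model of the state a consumer
// keeps about a stream. It is NOT the NATS server implementation.
type toyConsumer struct {
	filter    string          // exact subject of interest (no wildcards here)
	delivered uint64          // highest stream sequence handed to a client
	ackFloor  uint64          // every matching sequence <= ackFloor is acked
	pending   map[uint64]bool // delivered but not yet acknowledged
}

func newToyConsumer(filter string) *toyConsumer {
	return &toyConsumer{filter: filter, pending: map[uint64]bool{}}
}

// next returns the first message after the last delivered one that matches
// the filter, and records it as delivered but unacknowledged.
func (c *toyConsumer) next(stream []storedMsg) (storedMsg, bool) {
	for _, m := range stream {
		if m.seq <= c.delivered || m.subject != c.filter {
			continue
		}
		c.delivered = m.seq
		c.pending[m.seq] = true
		return m, true
	}
	return storedMsg{}, false
}

// ack removes seq from the pending set and recomputes the ack floor: one
// below the oldest still-pending sequence, or the last delivered sequence
// if nothing is pending.
func (c *toyConsumer) ack(seq uint64) {
	delete(c.pending, seq)
	c.ackFloor = c.delivered
	for s := range c.pending {
		if s-1 < c.ackFloor {
			c.ackFloor = s - 1
		}
	}
}

func main() {
	stream := []storedMsg{{1, "test.a"}, {2, "test.b"}, {3, "test.a"}}
	c := newToyConsumer("test.a")
	for {
		m, ok := c.next(stream)
		if !ok {
			break
		}
		fmt.Printf("delivered seq=%d subject=%s\n", m.seq, m.subject)
	}
	c.ack(3)
	fmt.Println("ack floor after acking 3:", c.ackFloor) // 0, seq 1 is still pending
	c.ack(1)
	fmt.Println("ack floor after acking 1:", c.ackFloor) // 3
}
```

In this model the consumer is nothing but data plus a couple of operations on it; how much that costs per consumer in a real server depends on the server's own implementation.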

I'm more interested in understanding what exactly a consumer is under the hood. Is it a process, a thread, some kind of socket (i.e. a network entity), or something else?

I'm designing a system with NATS 2.9, in which a consumer can't have more than one subject filter. I want to create one consumer per subject filter, and I'm concerned about the performance and resource implications of creating about 10,000 consumers in the worst case.

Is a consumer an expensive resource to create in NATS ?

Creating a single consumer with a very generic subject filter is an alternative I'm considering, but it doesn't fit neatly into my design: several goroutines would have to share this one global consumer. With one consumer per goroutine, each goroutine consumes and processes only the messages for its own subject filter, which gives me a kind of logical isolation.


Solution

  • They are extremely cheap. Basically, a connection to NATS is a multiplexer:

    • It connects to a NATS server.
    • Then it expresses its interest in topics via one or more subscriptions.
    • The connection then receives new messages and dispatches each one to the appropriate consumer you set up.
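
The multiplexing described in these bullets can be modelled in a few lines. This is a hypothetical, stdlib-only toy (`toyConn`, `subscribe`, `deliver` and `subjectMatches` are made up and are not the nats.go API): one connection object keeps a table of subscriptions and hands each incoming message to every subscription whose pattern matches, using NATS-style wildcards where `*` matches exactly one token and `>` matches one or more trailing tokens. In real NATS the matching happens on the server, which tags each delivered message with the ID of the subscription it is for; the client then only looks up the handler by that ID.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style pattern.
// Tokens are separated by ".", "*" matches exactly one token and ">"
// (only valid as the last token) matches one or more remaining tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

// toyConn is a hypothetical model of one connection multiplexing many
// subscriptions. It is NOT how nats.go is implemented internally.
type toyConn struct {
	nextID int
	subs   map[int]toySub
}

type toySub struct {
	pattern string
	handler func(subject string, data []byte)
}

func newToyConn() *toyConn { return &toyConn{subs: map[int]toySub{}} }

// subscribe registers interest in a pattern and returns a subscription ID.
func (c *toyConn) subscribe(pattern string, h func(subject string, data []byte)) int {
	c.nextID++
	c.subs[c.nextID] = toySub{pattern: pattern, handler: h}
	return c.nextID
}

// deliver hands one incoming message to every matching subscription and
// returns how many handlers received it.
func (c *toyConn) deliver(subject string, data []byte) int {
	n := 0
	for _, sub := range c.subs {
		if subjectMatches(sub.pattern, subject) {
			sub.handler(subject, data)
			n++
		}
	}
	return n
}

func main() {
	conn := newToyConn()
	counts := make([]int, 3)
	// Three subscriptions sharing one connection.
	conn.subscribe("test.>", func(string, []byte) { counts[0]++ })
	conn.subscribe("test.1", func(string, []byte) { counts[1]++ })
	conn.subscribe("test.*", func(string, []byte) { counts[2]++ })

	conn.deliver("test.1", []byte("Hello, World!"))
	conn.deliver("test.2", []byte("Hello, World!"))
	fmt.Println(counts) // [2 1 2]
}
```

Adding a subscription here is just one more map entry on the same connection, which is the intuition behind subscriptions being cheap on the client side.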

    I created a test setup on a single machine (my laptop) that easily ran 1500 consumers alongside a three-node NATS cluster and a producer, with the whole setup still able to send and receive several million "Hello, World!"s per second. Note that the consumers in this test are plain core NATS subscriptions sharing one connection (`conn.Subscribe`), not JetStream consumers. I actually had to scale up the producers considerably to get any meaningful data at all.

    The environment

    Dockerfile

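    We build one Docker image per service with a pretty much standard multi-stage build: the first stage compiles the service selected by the `BINARY_NAME` build argument from the sources below, the second copies the resulting binary into a plain Alpine image.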

    FROM golang:1.22-alpine3.19 AS builder
    ARG BINARY_NAME="producer"
    WORKDIR /tmp/cmds
    COPY . .
    RUN go build -o ${BINARY_NAME} ./cmds/${BINARY_NAME}
    
    FROM alpine:3.19
    ARG BINARY_NAME="producer"
    ENV BINARY_NAME=${BINARY_NAME}
    COPY --from=builder /tmp/cmds/${BINARY_NAME} /usr/local/bin/${BINARY_NAME}
    

    docker-compose.yaml

    version: "3.8"
    
    services:
      # A nats cluster with 3 nodes
      nats:
        image: nats:2.10.14-alpine
        command:
          - "--debug"
          - "-m"
          - "8222"
          - "--cluster"
          - "nats://0.0.0.0:6222"
          - "--routes"
          # Note that this needs to be prefixed with the Compose project name,
          # which defaults to the name of the directory the docker-compose file is in.
          # In this case it's "nats-consumers-78214263" (a mnemonic plus the question ID).
          - "nats://nats-consumers-78214263-nats-1:6222"
        hostname: nats
        deploy:
          replicas: 3
        healthcheck:
          test: ["CMD", "wget", "--spider", "-q", "http://localhost:8222/varz"]
          interval: 10s
          timeout: 5s
          retries: 3
    
      # The producer
      # You can scale this up via `docker compose scale producer=n`
      # to see how the consumers handle the load
      producer:
        deploy:
          replicas: 1
        build:
          context: .
          args:
            - BINARY_NAME=producer
        command: ["/usr/local/bin/producer"]
        environment:
          - PRODUCER_NATS_URL=nats://nats:4222
          - PRODUCER_PRODUCERS=1
        depends_on:
          nats:
            condition: service_healthy
      # The consumer
      # You can scale this up via `docker compose scale consumer=n`
      # to see how the consumers handle the load
      consumer:
        deploy:
          replicas: 1
        build:
          context: .
          args:
            - BINARY_NAME=consumer
        command: ["/usr/local/bin/consumer"]
        environment:
          - CONSUMER_NATS_URL=nats://nats:4222
          - CONSUMER_TOPIC=test.>
          - CONSUMER_CONSUMERS=15000
        depends_on:
          nats:
            condition: service_healthy
    

    The Services

    Producer

    package main
    
    import (
        "context"
        "fmt"
        "net/url"
        "os/signal"
        "sync"
        "sync/atomic"
        "syscall"
        "time"
    
        "github.com/alecthomas/kong"
        "github.com/nats-io/nats.go"
    )
    
    var producercfg struct {
        NatsURL   url.URL `kong:"name='nats-url',help='NATS server URL',default='nats://nats:4222'"`
        Producers int     `kong:"name='producers',help='Number of producers to start',default='1'"`
    }
    
    func main() {
        ctx := kong.Parse(&producercfg, kong.DefaultEnvars("PRODUCER"))
    
        // Run the configured number of producers in goroutines.
        // Note that all producers share the same NATS connection.
        // Each producer publishes messages in a tight loop, as fast as it can.
    
        nc, err := nats.Connect(producercfg.NatsURL.String())
        ctx.FatalIfErrorf(err, "Could not connect to NATS server: %s", producercfg.NatsURL.String())
        defer nc.Close()
    
        // Handle SIGINT and SIGTERM to shut down gracefully
        // We use a context here because that makes it easy for us to shut down
        // all goroutines in one fell swoop, but gracefully so.
        sigs, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
        defer cancel()
    
        var wg sync.WaitGroup
        var sent atomic.Int64
    
        for i := 0; i < producercfg.Producers; i++ {
            wg.Add(1)
            go func(producerContext context.Context, conn *nats.Conn, id int) {
                ctx.Printf("Starting publisher to %s", fmt.Sprintf("test.%d", id))
                defer wg.Done()
    
                for {
                    // We have...
                    select {
                    // either received a signal to shut down...
                    case <-producerContext.Done():
                        ctx.Printf("Producer %d shutting down", id)
                        // ... so we return from the goroutine.
                        return
                    default:
                        // or we send a message.
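                        err := conn.Publish(fmt.Sprintf("test.%d", id), []byte("Hello, World!"))
                        // FatalIfErrorf already appends the error itself to the message.
                        ctx.FatalIfErrorf(err, "Could not publish message")
                        // Only count messages that were actually handed to the connection.
                        sent.Add(1)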
                    }
                }
            }(sigs, nc, i)
        }
    
        tick := time.NewTicker(time.Second)
        defer tick.Stop()
    
    evt:
        for {
            // Either we receive a signal to shut down...
            select {
            case <-sigs.Done():
                cancel()
                break evt
            // ... or we print out the number of messages sent so far.
            case <-tick.C:
                ctx.Printf("Sent %d messages", sent.Load())
            }
        }
        ctx.Printf("Received signal, shutting down producers...")
        wg.Wait()
        ctx.Printf("All producers shut down. Exiting.")
    }
    

    Note that you might need to scale up the producer (via `PRODUCER_PRODUCERS` or `docker compose scale producer=n`) to get meaningful data.

    Consumer

    Also, pretty much standard:

    package main
    
    import (
        "context"
        "net/url"
        "os/signal"
        "sync"
        "sync/atomic"
        "syscall"
        "time"
    
        "github.com/alecthomas/kong"
        "github.com/nats-io/nats.go"
    )
    
    var consumercfg struct {
        NatsURL   url.URL `kong:"name='nats-url',help='NATS server URL',default='nats://nats:4222'"`
    
        // Note that with this topic, ALL consumers we create here
        // will receive ALL messages by ALL producers.
        Topic     string  `kong:"name='topic',help='NATS topic to subscribe to',default='test.>'"`
        Consumers int     `kong:"name='consumers',help='Number of consumers to start',default='1'"`
    }
    
    func main() {
        ctx := kong.Parse(&consumercfg, kong.DefaultEnvars("CONSUMER"))
        ctx.Printf("Starting consumer on %s, subscribing to %s", consumercfg.NatsURL.String(), consumercfg.Topic)
    
        nc, err := nats.Connect(consumercfg.NatsURL.String())
        ctx.FatalIfErrorf(err, "Could not connect to NATS server: %s", consumercfg.NatsURL.String())
        defer nc.Close()
    
        // Run the configured number of consumers in goroutines.
        // Note that all consumers share the same NATS connection.
        // Each consumer subscribes to the configured topic
        // and counts the number of messages received, printing the count every second.
        // The consumers stop when SIGINT or SIGTERM is received.
    
        sigs, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
        defer cancel()
    
        // Used to wait for all consumers to print their final result before exiting.
        var wg sync.WaitGroup
    
        for i := 0; i < consumercfg.Consumers; i++ {
            wg.Add(1)
            go func(sigs context.Context, conn *nats.Conn, topic string, id int) {
                defer wg.Done()
    
                count := atomic.Int64{}
    
                // We use the same connection!
                sub, err := conn.Subscribe(topic, func(msg *nats.Msg) {
                    // Callback for processing a new message.
                    count.Add(1)
                })
                // FatalIfErrorf already appends the error itself to the message.
                ctx.FatalIfErrorf(err, "Could not subscribe to topic %s", topic)
                defer sub.Unsubscribe()
    
                tick := time.NewTicker(time.Second)
                defer tick.Stop()
                for {
                    select {
                    case <-sigs.Done():
                        ctx.Printf("%6d Received shutdown signal. Final result: received %d messages", id, count.Load())
                        return
                    case <-tick.C:
                        ctx.Printf("%6d Received %d messages", id, count.Load())
                    }
                }
            }(sigs, nc, consumercfg.Topic, i)
        }
    
        <-sigs.Done()
        wg.Wait()
    }
    

    The code above is available at https://github.com/mwmahlberg/nats-consumers-78214263