Search code examples
javakubernetesrabbitmqrabbitmq-stream

RabbitMQ Java Stream Client and RabbitMQ Kubernetes Operator


I deployed a Kubernetes cluster using the RabbitMQ Operator and activated the rabbitmq_stream plugin. This is my yaml:

apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq-deployment
  namespace: rabbitmq-namespace
spec:
  replicas: 2
  image: rabbitmq:3.11.13
  persistence:
    storage: 20Gi
  service:
    type: LoadBalancer
  rabbitmq:
    additionalPlugins:
      - rabbitmq_stream
      - rabbitmq_stream_management

Also i use the RabbitMQ Java Stream Client and i'm connecting to the cluster like this:

EnvironmentBuilder environmentBuilder = Environment.builder();
environmentBuilder.host(System.getenv("RABBITMQ_HOST"));
environmentBuilder.port(Integer.parseInt(System.getenv("RABBITMQ_STREAM_PORT")));
environmentBuilder.username(System.getenv("RABBITMQ_USERNAME"));
environmentBuilder.password(System.getenv("RABBITMQ_PASSWORD"));
mainConnection = environmentBuilder.build();

Now when i use this client so create the stream, it's working flawlessy and no error is reported:

mainConnection.streamCreator().stream("mystream").maxAge(Duration.of(1, ChronoUnit.DAYS)).create()

Now when i try to produce messages like this:

Producer producer = RabbitMQStreamConnection.mainConnection.producerBuilder().stream("mystream").build();
byte[] messagePayload = "hello".getBytes(StandardCharsets.UTF_8);
producer.send(
    producer.messageBuilder().addData(messagePayload).build(),
    confirmationStatus -> {
        if (confirmationStatus.isConfirmed()) {
            // the message made it to the broker
        } else {
            // the message did not make it to the broker
        }
});

It throws this exception:

com.rabbitmq.stream.StreamException
Error while creating stream connection to rabbitmq-deployment-server-0.rabbitmq-deployment-nodes.rabbitmq-namespace:5552

Of course, because there are two nodes (replicas = 2) and it seems like traffic gets redirected directly.

What i want is that i can produce & consume messages from the stream.

Right now, i have no clue what i could do next to solve this problem.


Solution

  • You should use the load balancer configuration.

    See: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#when-a-load-balancer-is-in-use

    A load balancer can misguide the client when it tries to connect to nodes that host stream leaders and replicas. The "Connecting to Streams" blog post covers why client applications must connect to the appropriate nodes in a cluster and how a load balancer can make things complicated for them.

    The EnvironmentBuilder#addressResolver(AddressResolver) method allows intercepting the node resolution after metadata hints and before connection. Applications can use this hook to ignore metadata hints and always use the load balancer, as illustrated in the following snippet:

    Using a custom address resolver to always use a load balancer

    Address entryPoint = new Address("my-load-balancer", 5552);  
    Environment environment = Environment.builder()
        .host(entryPoint.host())  
        .port(entryPoint.port())  
        .addressResolver(address -> entryPoint)  
        .build();