Why can I see only every other message in this Kafka example?


I'm trying to modify one of the Spring Cloud Stream samples, and the results are confusing: even though I registered only a single stream listener for my channel, it receives only every second message. I suspect this is caused by default load balancing across consumers of a single Kafka partition, but I can't figure out how to confirm this.

docker ps shows only a single Kafka broker instance running:

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
e058697a3bb2        wurstmeister/kafka       "start-kafka.sh"         5 minutes ago       Up 5 minutes        0.0.0.0:9092->9092/tcp                               kafka-uppercase-tx
d001389ddfa4        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   5 minutes ago       Up 5 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   uppercasetransformer_zookeeper_1

The Kafka console consumer on the output topic also shows messages of a single type only, although this time it's BAR:

/opt/kafka_2.12-2.1.0/bin # ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
BAR
BAR
BAR
BAR
BAR

Describing the consumer group and its members doesn't show any additional consumers, so my load-balancing theory doesn't hold up:

/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
input           0          0               0               0               consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1     consumer-2
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3 --members

CONSUMER-ID                                     HOST            CLIENT-ID       #PARTITIONS
consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1     consumer-2      1

I can't see anything wrong with the topic description either:

/opt/kafka_2.12-2.1.0/bin # ./kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic output
Topic:output    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: output   Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Why is only every second message delivered to the listener on my output channel, and how can I check this on my own?

kafka-demo.java:

package demo;

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(Processor.class)
public class UppercaseTransformer {

    private static Logger logger = LoggerFactory.getLogger(UppercaseTransformer.class);

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transform(String payload) {
        logger.info("transforming payload {}", payload);
        return payload.toUpperCase();
    }

    static class TestSource {
        private AtomicLong longSemaphore = new AtomicLong(0L);

        @Bean
        @InboundChannelAdapter(channel = "input", poller = @Poller(fixedDelay = "1000"))
        public MessageSource<String> sendTestData() {
            return () ->
            {
                final long semaphoreValue = longSemaphore.getAndIncrement();
                final boolean condition = semaphoreValue % 2 == 0;
                final String foobar = condition ? "foo" : "bar";
                logger.info("semaphoreValue: {}, condition: {}, foobar: {}", semaphoreValue, condition, foobar);
                return new GenericMessage<>(foobar);
            };

        }

        @StreamListener(Processor.OUTPUT)
        public void receive(String payload) {
            logger.info("Data received: {}", payload);
        }
    }
}

logs:

2019-08-05 22:48:02.971  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : semaphoreValue: 2, condition: true, foobar: foo
2019-08-05 22:48:02.972  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : transforming payload foo
2019-08-05 22:48:02.972  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : Data received: FOO
2019-08-05 22:48:03.973  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : semaphoreValue: 3, condition: false, foobar: bar
2019-08-05 22:48:03.974  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : transforming payload bar
2019-08-05 22:48:04.976  INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer                : semaphoreValue: 4, condition: true, foobar: foo
2019-08-05 22:48:04.977  INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer                : transforming payload foo
2019-08-05 22:48:04.977  INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer                : Data received: FOO
2019-08-05 22:48:05.978  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : semaphoreValue: 5, condition: false, foobar: bar
2019-08-05 22:48:05.978  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : transforming payload bar
2019-08-05 22:48:06.980  INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer                : semaphoreValue: 6, condition: true, foobar: foo
2019-08-05 22:48:06.981  INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer                : transforming payload foo
2019-08-05 22:48:06.982  INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer                : Data received: FOO
2019-08-05 22:48:07.982  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : semaphoreValue: 7, condition: false, foobar: bar
2019-08-05 22:48:07.983  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : transforming payload bar

application-local.yml:

spring:
  cloud:
    stream:
#      bindings:
#        output:
#          destination: xformed
#        test-sink:
#          destination: xformed
#        input:
#          destination: testtock
#        test-source:
#          destination: testtock
      default-binder: kafka

Solution

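  • You have two consumers on the output channel: the binding to the topic and your receive() method (the @StreamListener on Processor.OUTPUT).

    The channel's default round-robin dispatching sends messages alternately to receive() and to the topic. That is why receive() logs only FOO while the console consumer on the output topic shows only BAR.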
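
    The alternation can be reproduced without Spring or Kafka. The sketch below is only an illustration: RoundRobinChannel and RoundRobinDemo are hypothetical names, not Spring classes, and the subscriber order (listener first, topic binding second) is an assumption chosen to match the logs in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a channel that round-robins between its subscribers. */
final class RoundRobinChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Hands each message to exactly one subscriber, rotating through them in order. */
    void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        subscribers.get(next).accept(message);
        next = (next + 1) % subscribers.size();
    }
}

final class RoundRobinDemo {
    /** Sends foo/bar alternately and returns [seen by listener, sent to topic]. */
    static List<List<String>> run(int messageCount) {
        List<String> listener = new ArrayList<>(); // stands in for receive()
        List<String> topic = new ArrayList<>();    // stands in for the binding to the topic
        RoundRobinChannel output = new RoundRobinChannel();
        output.subscribe(listener::add); // assumed to be the first subscriber
        output.subscribe(topic::add);    // assumed to be the second subscriber
        for (int i = 0; i < messageCount; i++) {
            String payload = (i % 2 == 0) ? "foo" : "bar"; // same pattern as sendTestData()
            output.send(payload.toUpperCase());            // same transformation as transform()
        }
        return Arrays.asList(listener, topic);
    }

    public static void main(String[] args) {
        List<List<String>> result = run(6);
        System.out.println("listener: " + result.get(0)); // listener: [FOO, FOO, FOO]
        System.out.println("topic:    " + result.get(1)); // topic:    [BAR, BAR, BAR]
    }
}
```

    With two subscribers and strictly alternating payloads, each subscriber always gets the same payload type, which matches what the question observed. If receive() should see every message, one option (an assumption, not something the answer states) is to bind it to its own input channel that reads from the same destination as output, as the commented-out test-sink binding in application-local.yml suggests.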