I'm trying to modify one of the Spring Cloud Stream samples and the results I'm getting are confusing: even though I registered only a single stream listener for my channel, I'm receiving only every second message. I suspect this is caused by default load balancing for a single Kafka partition, but I can't figure out how to confirm this.
docker ps shows only a single Kafka broker instance running:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e058697a3bb2 wurstmeister/kafka "start-kafka.sh" 5 minutes ago Up 5 minutes 0.0.0.0:9092->9092/tcp kafka-uppercase-tx
d001389ddfa4 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 5 minutes ago Up 5 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp uppercasetransformer_zookeeper_1
The Kafka console consumer also shows messages of only one type, but this time it's only BAR:
/opt/kafka_2.12-2.1.0/bin # ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
BAR
BAR
BAR
BAR
BAR
Checking the consumer group description and its members doesn't show any additional consumers, though, so my load-balancing theory doesn't hold up here:
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
input 0 0 0 0 consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3 --members
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2 1
I can't see anything wrong with the topic description either:
/opt/kafka_2.12-2.1.0/bin # ./kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic output
Topic:output PartitionCount:1 ReplicationFactor:1 Configs:
Topic: output Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Why is only every second message delivered to my output channel, and how can I check this on my own?
kafka-demo.java:
package demo;

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(Processor.class)
public class UppercaseTransformer {

    private static Logger logger = LoggerFactory.getLogger(UppercaseTransformer.class);

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transform(String payload) {
        logger.info("transforming payload {}", payload);
        return payload.toUpperCase();
    }

    static class TestSource {

        private AtomicLong longSemaphore = new AtomicLong(0L);

        @Bean
        @InboundChannelAdapter(channel = "input", poller = @Poller(fixedDelay = "1000"))
        public MessageSource<String> sendTestData() {
            return () -> {
                final long semaphoreValue = longSemaphore.getAndIncrement();
                final boolean condition = semaphoreValue % 2 == 0;
                final String foobar = condition ? "foo" : "bar";
                logger.info("semaphoreValue: {}, condition: {}, foobar: {}", semaphoreValue, condition, foobar);
                return new GenericMessage<>(foobar);
            };
        }

        @StreamListener(Processor.OUTPUT)
        public void receive(String payload) {
            logger.info("Data received: {}", payload);
        }
    }
}
logs:
2019-08-05 22:48:02.971 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : semaphoreValue: 2, condition: true, foobar: foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:03.973 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 3, condition: false, foobar: bar
2019-08-05 22:48:03.974 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:04.976 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : semaphoreValue: 4, condition: true, foobar: foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : semaphoreValue: 5, condition: false, foobar: bar
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:06.980 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : semaphoreValue: 6, condition: true, foobar: foo
2019-08-05 22:48:06.981 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:06.982 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:07.982 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 7, condition: false, foobar: bar
2019-08-05 22:48:07.983 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar
application-local.yml:
spring:
  cloud:
    stream:
#      bindings:
#        output:
#          destination: xformed
#        test-sink:
#          destination: xformed
#        input:
#          destination: testtock
#        test-source:
#          destination: testtock
      default-binder: kafka
You have two consumers on the output channel: the binding to the topic and your receive() listener.
The channel's default round-robin dispatching sends messages alternately to your listener and to the topic, so each of them sees only every second message.
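
The alternation can be reproduced without Spring or Kafka. The following is only a minimal plain-Java sketch of round-robin dispatch between two subscribers of one point-to-point channel. The class names RoundRobinSketch and UnicastChannel are hypothetical, this is not the framework's actual implementation, and the subscription order (listener first, topic binding second) is an assumption chosen to match the logs in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RoundRobinSketch {

    /**
     * Hypothetical, simplified stand-in for a point-to-point channel:
     * every message is handed to exactly one subscriber, chosen in rotation.
     */
    static class UnicastChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            // Pick the next subscriber in rotation and deliver only to it.
            Consumer<String> target = subscribers.get(next % subscribers.size());
            next++;
            target.accept(payload);
        }
    }

    /** Returns two lists: what the listener saw, then what reached the topic. */
    public static List<List<String>> simulate(int messageCount) {
        List<String> listenerLog = new ArrayList<>();
        List<String> topicLog = new ArrayList<>();

        UnicastChannel output = new UnicastChannel();
        output.subscribe(listenerLog::add); // stands in for receive() (assumed to subscribe first)
        output.subscribe(topicLog::add);    // stands in for the binding to the topic

        for (int i = 0; i < messageCount; i++) {
            // Same alternation as the test source in the question: foo, bar, foo, ...
            String payload = (i % 2 == 0) ? "foo" : "bar";
            output.send(payload.toUpperCase());
        }
        return List.of(listenerLog, topicLog);
    }

    public static void main(String[] args) {
        List<List<String>> result = simulate(6);
        System.out.println("listener: " + result.get(0));
        System.out.println("topic:    " + result.get(1));
    }
}
```

With six messages, the listener list holds only FOO and the topic list holds only BAR, which mirrors the application log (only FOO is "received") and the console consumer output (only BAR) shown in the question.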