I have difficulty understanding some Kafka concepts in Java Spring Boot. I’d like to test a consumer against a real Kafka broker running on a server, which has some producers that write / have already written data to various topics. I would like to establish a connection with the server, consume the data, and verify or process its content in a test.
An enormous majority of examples (actually all I have seen so far) in the internet refer to embedded kafka, EmbeddedKafkaBroker, and show both a producer and a consumer implemented on one machine, locally. I haven’t found any example that would explain how to make a connection with a remote kafka server and read data from a particular topic. I've written some code and I've printed the broker address with:
What I got is, which means that it is local, so the connection with the remote server has not been established.
On the other hand, when I run the SpringBootApplication I get the payload from the remote broker.
public class Receiver {
private static final String TOPIC_NAME = "X";
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
@KafkaListener(topics = TOPIC_NAME)
public void receive(final byte[] payload) {
LOGGER.info("received the following payload: '{}'", payload);
public class ByteReceiverConfig {
EmbeddedKafkaBroker kafkaEmbeded;
private String bootstrapServers;
private String groupIdConfig;
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
return factory;
ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
Map<String, Object> consumerProperties() {
final Map<String, Object> properties =
KafkaTestUtils.consumerProps("junit-test", "true", this.kafkaEmbeded);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
return properties;
@SpringBootTest(classes = {ByteReceiverConfig.class, Receiver.class})
@ContextConfiguration(classes = ByteReceiverConfig.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
public class KafkaTest {
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
EmbeddedKafkaBroker embeddedKafkaBroker;
Receiver receiver;
void waitForAssignment() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
public void testReceive() {
I would like somebody to shed some light on the following issues:
1.Can an instance of the class EmbeddedKafkaBroker be used to test data that comes from a remote broker, or is it only used for local tests, in which I would procude i.e send data to a topic that I created and consume data myself?
2.Is it possible to write a test class for a real kafka server? For instance to verify if a connection has been establish, or if a data has been read from a specific topic. What annotations, configurations, and classes would be needed in such case?
3.If I only want to consume data, do I have to provide the producer configuration in a config file (it would be strange, but all examples I have encountered so far did it)?
4.Do you know any resources (books, websites etc.) that show real examples of using kafka i.e. with a remote kafka server, with a procuder or a consumer only?
You don't need an embedded broker at all if you want to talk to an external broker only.
Yes, just set the bootstrap servers property appropriately.
No, you don't need producer configuration.
public class So56044105Application {
public static void main(String[] args) {
SpringApplication.run(So56044105Application.class, args);
public NewTopic topic() {
return new NewTopic("so56044105", 1, (short) 1);
@SpringBootTest(classes = { So56044105Application.class, So56044105ApplicationTests.Config.class })
public class So56044105ApplicationTests {
public Config config;
public void test() throws InterruptedException {
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
public static class Config implements ConsumerSeekAware {
List<String> received = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(3);
@KafkaListener(id = "so56044105", topics = "so56044105")
public void listen(String in) {
public void registerSeekCallback(ConsumerSeekCallback callback) {
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
System.out.println("Seeking to beginning");
assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {