While attempting to test a basic Spring Cloud Stream application, the EmbeddedKafka broker never gets the messages published when the tests run.
Here is a GitHub repo with my current code: https://github.com/gjobin/spring-cloud-stream-test
When running the application, I can see the messages published to a local or remote broker, and the Consumer output shows the Kafka headers:
------ Consumed Record ------
Person(firstname=fn, lastname=LN)
deliveryAttempt : 1
kafka_timestampType : CREATE_TIME
kafka_receivedTopic : sink-in-0
kafka_offset : 46
scst_nativeHeadersPresent : true
kafka_consumer : org.apache.kafka.clients.consumer.KafkaConsumer@2994db0
source-type : kafka
id : 97a186be-5fcb-cc47-26a1-46b9895f6b32
kafka_receivedPartitionId : 0
contentType : application/json
kafka_receivedTimestamp : 1694174130515
kafka_groupId : anonymous.754c77d9-b7a7-4221-9132-34b5c347e299
timestamp : 1694174130552
However, whenever I run the app through @SpringBootTest, the output becomes generic Spring Cloud Stream output and omits all Kafka headers:
------ Consumed Record ------
Person(firstname=fn, lastname=LN)
source-type : kafka
id : 420355ec-2875-92da-cd83-d0d577142ec3
contentType : application/json
timestamp : 1694122858406
Even a test suite without @EmbeddedKafka still starts the app and processes similar messages, whereas I would expect startup to fail with a "no broker found" error, just as it does when I run the application itself without starting a local Kafka first.
package com.example.stream;

import ...

/**
 * IT tests without any Kafka
 */
@SpringBootTest(classes = Application.class)
@ActiveProfiles(profiles = {"default", "test"})
@Slf4j
class PersonStreamWithoutKafka {

    @Test
    public void testAppStartsWithoutKafka() throws InterruptedException {
        Thread.sleep(5000);
        // THIS SHOULD NOT PROCESS MESSAGES, KAFKA IS NOT RUNNING.
    }
}
Here is my test class, which attempts to validate that a message was emitted by the producer, but it never receives any message:
package com.example.stream;

import ...

/**
 * IT tests using EmbeddedKafka
 */
@SpringBootTest(classes = Application.class)
@ActiveProfiles(profiles = {"default", "test"})
@EmbeddedKafka
@Slf4j
class PersonStreamIT {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void testAppProducer() {
        // Assert message
        Map<String, Object> consumerConfigs = new HashMap<>(KafkaTestUtils.consumerProps("test", "false", embeddedKafkaBroker));
        try (JsonDeserializer<Person> jsonDeserializer = new JsonDeserializer<Person>().trustedPackages("*");
             Consumer<String, Person> consumer = new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), jsonDeserializer).createConsumer()) {
            consumer.subscribe(List.of("source-out-0"));
            ConsumerRecord<String, Person> record = KafkaTestUtils.getSingleRecord(consumer, "source-out-0", Duration.ofSeconds(2));
            // source-out-0 carries the record before uppercase() runs, so the
            // expected values match the source() supplier exactly
            Assertions.assertThat(record.value()).isEqualTo(Person.builder().firstname("fn").lastname("ln").build());
        }
    }
}
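For what it's worth, here is a hedged sketch of an alternative: when the Spring Cloud Stream test binder is on the classpath (which appears to be what is intercepting the messages here), the same pipeline can be driven through its in-memory `OutputDestination` instead of a broker. This assumes the test binder dependency is available; `OutputDestination` and `TestChannelBinderConfiguration` come from `org.springframework.cloud.stream.binder.test`:

```java
package com.example.stream;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;

/**
 * Sketch only: IT test against the in-memory test binder, no Kafka at all.
 */
@SpringBootTest(classes = Application.class)
@Import(TestChannelBinderConfiguration.class)
class PersonStreamTestBinderIT {

    @Autowired
    private OutputDestination output;

    @Test
    void sourceEmitsToSourceOut0() {
        // The source() supplier is polled on a schedule, so a record should
        // appear on the source-out-0 destination within a few seconds.
        Message<byte[]> message = output.receive(5000, "source-out-0");
        Assertions.assertThat(message).isNotNull();
    }
}
```

This trades broker fidelity for speed: no embedded broker startup, but no Kafka headers either.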
For reference, here is the application.yml configuration and the implementation:
spring:
  config:
    activate:
      on-profile: default
  cloud:
    function:
      definition: source;uppercase;sink
    stream:
      bindings:
        uppercase-in-0:
          destination: source-out-0
        uppercase-out-0:
          destination: sink-in-0
      kafka:
        binder:
          brokers: localhost:9092
---
spring:
  config:
    activate:
      on-profile: test
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${spring.embedded.kafka.brokers}
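As a side note, a hedged alternative (an assumption about spring-kafka's @EmbeddedKafka annotation, not something the repo currently does): the annotation can publish the broker addresses directly under the binder's own property name, which removes the need for the placeholder in the test profile:

```java
// Sketch: expose the embedded broker list directly under the Kafka binder
// property instead of referencing ${spring.embedded.kafka.brokers} in YAML
@EmbeddedKafka(bootstrapServersProperty = "spring.cloud.stream.kafka.binder.brokers")
```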
package com.example.stream;

import ...

@Service
@Slf4j
@EnableAutoConfiguration
public class PersonStream {

    @Bean
    public Supplier<Person> source() {
        return () -> Person.builder()
                .firstname("fn")
                .lastname("ln")
                .build();
    }

    @Bean
    public Function<Person, Person> uppercase() {
        return p -> {
            p.setLastname(p.getLastname().toUpperCase());
            return p;
        };
    }

    @Bean
    public Consumer<Message<Person>> sink() {
        return m -> {
            log.info("------ Consumed Record ------");
            log.info(m.getPayload().toString());
            m.getHeaders().forEach((key, value) -> log.info("{} : {}", key, value.toString()));
        };
    }
}
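Incidentally, the functional beans themselves can be unit-tested with no binder at all, since they are plain `java.util.function` objects. A minimal self-contained sketch (the `Person` stand-in below is hypothetical; the real class in the repo uses Lombok):

```java
import java.util.function.Function;

public class UppercaseCheck {

    // Hypothetical stand-in for the repo's Lombok-generated Person class
    public static class Person {
        public String firstname;
        public String lastname;

        public Person(String firstname, String lastname) {
            this.firstname = firstname;
            this.lastname = lastname;
        }
    }

    // Same logic as the uppercase() bean, exercised without any Spring context
    public static final Function<Person, Person> uppercase = p -> {
        p.lastname = p.lastname.toUpperCase();
        return p;
    };

    public static void main(String[] args) {
        Person p = uppercase.apply(new Person("fn", "ln"));
        System.out.println(p.firstname + " " + p.lastname); // prints: fn LN
    }
}
```

This does not answer the binder question, but it separates "is the function logic right" from "is the binding wired right".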
Thank you very much for your help.
Not sure if this is what you expected given your various tests, but another solution is to suppress the test binder by making Kafka the default binder:
spring:
  cloud:
    stream:
      default-binder: kafka
With this I see in the logs:
2023-09-12T16:18:42.347-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : ------ Consumed Record ------
2023-09-12T16:18:42.347-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : Person(firstname=fn, lastname=LN)
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : deliveryAttempt : 1
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [quest-handler-0] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Group test with generation 2 is now empty (__consumer_offsets-3)
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : kafka_timestampType : CREATE_TIME
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : kafka_receivedTopic : sink-in-0
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : kafka_offset : 0
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : scst_nativeHeadersPresent : true
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : kafka_consumer : org.apache.kafka.clients.consumer.KafkaConsumer@2f8f1eb7
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : source-type : kafka
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : id : 6b38368c-392b-d808-51e8-45f70fa78ce6
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : kafka_receivedPartitionId : 0
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : contentType : application/json
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : kafka_receivedTimestamp : 1694549922341
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : kafka_groupId : anonymous.3690f709-b5c8-40b7-9a17-57ad6b71aadb
2023-09-12T16:18:42.348-04:00 INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream : timestamp : 1694549922347
Compare this with the output without that property:
2023-09-12T16:23:25.764-04:00 INFO 24564 --- [ scheduling-1] com.example.stream.PersonStream : ------ Consumed Record ------
2023-09-12T16:23:25.764-04:00 INFO 24564 --- [ scheduling-1] com.example.stream.PersonStream : Person(firstname=fn, lastname=LN)
2023-09-12T16:23:25.764-04:00 INFO 24564 --- [ scheduling-1] com.example.stream.PersonStream : source-type : kafka
2023-09-12T16:23:25.764-04:00 INFO 24564 --- [ scheduling-1] com.example.stream.PersonStream : id : 541bc466-613e-64f3-ff08-5badf8d35d87
2023-09-12T16:23:25.764-04:00 INFO 24564 --- [ scheduling-1] com.example.stream.PersonStream : contentType : application/json
2023-09-12T16:23:25.764-04:00 INFO 24564 --- [ scheduling-1] com.example.stream.PersonStream : timestamp : 1694550205764
Tests are still failing though, but that is a different story.