I'm trying to test a Spring Kafka listener in a Spring Boot test using @EmbeddedKafka. However, I keep encountering the following exception:
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
@Component
@Slf4j
public class CancelAuthorizationLinkageListener {

    private final CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
    private final CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService;
    private final KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate;
    private final String retryTopic;

    public CancelAuthorizationLinkageListener(CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor,
            CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService,
            KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate,
            @Value("${spring.kafka.producer.retry-topic}") String retryTopic) {
        this.cancelAuthorizationLinkageProcessor = cancelAuthorizationLinkageProcessor;
        this.cancelAuthorizationLinkageService = cancelAuthorizationLinkageService;
        this.kafkaTemplate = kafkaTemplate;
        this.retryTopic = retryTopic;
    }

    @Bean
    public RecordMessageConverter converter() {
        return new JsonMessageConverter();
    }

    @Bean
    public BatchMessagingMessageConverter batchConverter() {
        return new BatchMessagingMessageConverter(converter());
    }

    @KafkaListener(id = "${spring.kafka.consumer.properties.cancel-authorization-linkage-listener-id}",
            topics = "${spring.kafka.consumer.linkage-topic}", autoStartup = "false",
            batch = "true",
            groupId = "group1", concurrency = "2")
    public void listen(List<CancelAuthorizationLinkageResource> cancelAuthorizationLinkageResources) {
        for (CancelAuthorizationLinkageResource cancelAuthorizationLinkageResource : cancelAuthorizationLinkageResources) {
            try {
                CancelAuthorizationLinkageWriterResource cancelAuthorizationLinkageWriterResource =
                        cancelAuthorizationLinkageProcessor.process(cancelAuthorizationLinkageResource);
                if (cancelAuthorizationLinkageWriterResource != null) {
                    cancelAuthorizationLinkageService.linkageAuthorization(
                            cancelAuthorizationLinkageWriterResource.getApiResource());
                }
            } catch (Exception e) {
                // Processing failed: log and forward the record to the retry topic.
                log.error("listener error: {}", e.getMessage());
                kafkaTemplate.send(retryTopic, cancelAuthorizationLinkageResource.getAuthorizationId(),
                        cancelAuthorizationLinkageResource);
            }
        }
    }
}
My listener accepts messages, and if any of them fail to process, I push them back to the same topic for retry. The listener therefore both consumes and produces messages. The whole process must be transactional: otherwise a failed message might not reach the retry topic while the offset of the consumed message is still committed, and the failed message could never be retried.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
        properties = {"spring.batch.job.name=cancelAuthorizationLinkageJob",
                "bootstrap-servers: ${spring.embedded.kafka.brokers}"})
@DirtiesContext
@EmbeddedKafka(
        partitions = 5, topics = {"${spring.kafka.consumer.linkage-topic}", "ppcd.cushion.cancel.auth.retry"},
        count = 3)
class CancelAuthorizationLinkageListenerTest {

    @Autowired
    private CancelAuthorizationLinkageListener cancelAuthorizationLinkageListener;

    @Mock
    private CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private ConsumerFactory<String, CancelAuthorizationLinkageResource> consumerFactory;

    @Value("${spring.kafka.consumer.linkage-topic}")
    private String linkageTopic;

    @Value("${spring.kafka.producer.retry-topic}")
    private String retryTopic;

    private Consumer<String, CancelAuthorizationLinkageResource> consumer;

    @BeforeEach
    public void setUp() {
        consumer = consumerFactory.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, retryTopic);
    }

    @Test
    @DisplayName("OK - when an error occurs during cancel-authorization processing, send to the retry topic")
    void of_ok_1() throws Exception {
        // init
        int ngNumber = 1;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // mock
        doThrow(new InvalidValueException("test")).when(cancelAuthorizationLinkageProcessor).process(any());
        // verify: the listener method is invoked directly, not through the listener container
        cancelAuthorizationLinkageListener.listen(List.of(createCancelAuthorizationLinkageResource(true)));
        await()
                .atMost(2, SECONDS)
                .pollInterval(1, SECONDS)
                .untilAsserted(() -> {
                    KafkaTestUtils.getRecords(consumer).records(retryTopic)
                            .forEach(x -> atomicInteger.incrementAndGet());
                    assertEquals(ngNumber, atomicInteger.get());
                });
    }
}
The listener appears to be correctly configured for transactions in the production code (though I'm not entirely sure), but in the test context, even though I inject it with @Autowired, the listener does not run within a transaction.
Can someone explain why this happens and how to ensure the transactional behavior in the test context as well?
spring:
  profiles:
    active: "local"
  application:
    name:
  batch:
    initialize-schema: ALWAYS
    job:
      names:
      #enable: false
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: -1
      transaction-id-prefix: cushion-kafka-tx-${random.uuid}
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retry-topic: ppcd.cushion.cancel.auth.retry
      # retries: 5
    consumer:
      group-id: groupid-Dev
      auto-offset-reset: earliest
      max-poll-records: 20
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      properties:
        cancel-authorization-linkage-listener-id: cancel-authorization-linkage-listener
        test-cancel-authorization-linkage-listener-id: test-cancel-authorization-linkage-listener
        spring.json.trusted.packages: '*'
        isolation.level: read_committed
      linkage-topic: ppcd.matching.credit.auth.cancel.auto.matched.result.cushion
With Spring Boot, it is only necessary to set the spring.kafka.producer.transaction-id-prefix property - Spring Boot will automatically configure a KafkaTransactionManager bean and wire it into the listener container.
According to the Spring documentation, I believe I have configured the transactions correctly.
So either follow the recommendations in that error message, or don't set the transaction-id-prefix property in your producer config.
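To illustrate why the direct call fails: the test invokes listen() itself, so nothing on that thread has started a transaction before the template's send() runs. Below is a minimal plain-Java sketch of that check. ToyTransactionalTemplate and ToyListener are hypothetical stand-ins invented for this illustration; they are not Spring Kafka classes and only model the behaviour the error message describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a transactional template (NOT Spring Kafka's KafkaTemplate). */
class ToyTransactionalTemplate {
    // Whether the current thread is inside a transaction started by this template.
    private final ThreadLocal<Boolean> inTransaction = ThreadLocal.withInitial(() -> false);
    private final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    /** Rejects sends made outside a transaction, as the error in the question does. */
    void send(String topic, String value) {
        if (!inTransaction.get()) {
            throw new IllegalStateException("No transaction is in process");
        }
        pending.add(topic + ":" + value);
    }

    /** Runs the work inside a transaction; commits on success, discards on failure. */
    <T> T executeInTransaction(Supplier<T> work) {
        inTransaction.set(true);
        try {
            T result = work.get();
            committed.addAll(pending); // "commit" the sends made by the work
            return result;
        } finally {
            pending.clear();           // nothing stays pending after commit or abort
            inTransaction.set(false);
        }
    }
}

/** Hypothetical listener that forwards every record to a retry topic. */
class ToyListener {
    private final ToyTransactionalTemplate template;

    ToyListener(ToyTransactionalTemplate template) {
        this.template = template;
    }

    void listen(List<String> records) {
        for (String record : records) {
            template.send("retry", record);
        }
    }
}

class ToyDemo {
    public static void main(String[] args) {
        ToyTransactionalTemplate template = new ToyTransactionalTemplate();
        ToyListener listener = new ToyListener(template);

        // Direct call, as in the test: no transaction on this thread, so send() throws.
        try {
            listener.listen(List.of("a"));
        } catch (IllegalStateException e) {
            System.out.println("direct call failed: " + e.getMessage());
        }

        // Same call wrapped in a transaction: the send is accepted and committed.
        template.executeInTransaction(() -> {
            listener.listen(List.of("a"));
            return null;
        });
        System.out.println("committed: " + template.committed);
    }
}
```

In the real test, the equivalent of the second call would be to wrap the listen() invocation in executeInTransaction() on the same KafkaTemplate bean the listener uses, or to publish a record to the embedded topic and let a started listener container invoke the listener. Which of the two fits depends on whether the test is meant to cover the container's transaction handling as well.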