Tags: apache-kafka, spring-kafka, project-reactor, reactor-kafka

Mock SenderResult in ReactiveKafkaProducerTemplate send method


I am trying to mock the send method of ReactiveKafkaProducerTemplate.

@Mock
private ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
@Mock
private ReactiveKafkaProducerTemplate<String, List<Object>> reactiveKafkaProducerTemplate;

Mockito.when(reactiveKafkaConsumerTemplate.receiveAutoAck())
        .thenReturn(createConsumerRecords(2));

Mockito.when(reactiveKafkaProducerTemplate
        .send(Mockito.anyString(), Mockito.anyString(), Mockito.anyList()))
        .thenReturn(???);

I want the mocked send method of reactiveKafkaProducerTemplate to return a SenderResult. Is this possible? If so, can someone point me to documentation or a sample showing how? I have spent a lot of time looking for a solution but couldn't find one.

Update: I tried the following, based on Gary's suggestion:

ProducerRecord<String, List<Object>> record =
        new ProducerRecord<String, List<Object>>(topic, "key", objectSetup.setup());
RecordMetadata meta =
        new RecordMetadata(new TopicPartition("topic", 0), 0, 0, 0, (long) 1, 2, 1);

Mockito.when(reactiveKafkaProducerTemplate.send(topic, "key", objectSetup.setup())
        .thenReturn(Mono.just(new SendResult<>(record, meta))));

I get the following exception at the line .thenReturn(Mono.just(new SendResult<>(record, meta)))). The exception does not say what is null, and I don't see anything that is null.

java.lang.NullPointerException
    at com.ServiceTests.cTestMethod(ServiceTests.java:69)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
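
One possible reading of this trace, offered as an assumption rather than a confirmed diagnosis: in the snippet above, the closing parenthesis of Mockito.when(...) comes after .thenReturn(...), so thenReturn is called on whatever the mock's send(...) returns instead of on the stubbing object. An unstubbed Mockito mock returns null for a Mono return type, and calling a method on that null would give a NullPointerException with no further detail. The sketch below reproduces only the mechanism, using plain JDK classes. The Sender interface, the Supplier return type, and the java.lang.reflect.Proxy standing in for the mock are hypothetical illustrations, not Mockito or reactor-kafka APIs.

```java
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

public class UnstubbedMockDemo {

    /** Hypothetical stand-in for a template whose send(...) returns an object you chain on. */
    public interface Sender {
        Supplier<String> send(String topic, String key, String value);
    }

    /** Builds a proxy that, like an unstubbed mock method with an object return type, returns null. */
    public static Sender unstubbedSender() {
        return (Sender) Proxy.newProxyInstance(
                Sender.class.getClassLoader(),
                new Class<?>[] {Sender.class},
                (proxy, method, args) -> null);
    }

    /** Returns true when chaining a call onto the unstubbed result throws NullPointerException. */
    public static boolean chainingThrowsNpe() {
        Sender sender = unstubbedSender();
        try {
            // Same shape as when(template.send(...).thenReturn(...)):
            // the chained call runs on the null that send(...) returned.
            sender.send("topic", "key", "value").get();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE on chained call: " + chainingThrowsNpe());
    }
}
```

If this reading is right, closing Mockito.when(...) before .thenReturn(...) would make the stubbing call run on Mockito's stubbing object instead of on null.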

Update 2: I was able to create the mock with the code snippet from Gary. Here is the code that I am trying to test:

public void sendToKafka(ConsumerRecord<String, String> consumerRecord) {
    log.info("sending to topic={}, {}={},", destinationTopic, Metric.class.getSimpleName(), consumerRecord);
    List<Object> metrics = transformRecord(consumerRecord);
    kafkaProducerTemplate.send(destinationTopic, consumerRecord.key(), metrics)
            .doOnSuccess(senderResult -> log.info("sent {} offset : {}", metrics, senderResult.recordMetadata().offset()))
            .doOnError(throwable -> log.error("Error while sending message to destination topic : {}", throwable.getMessage()))
            .subscribe();
}

When I call this method from my test, I can see that the template is the mock. However, I get a java.lang.NullPointerException on the line .doOnSuccess(senderResult -> log.info("sent {} offset : {}", metrics, senderResult.recordMetadata().offset()))

The exception doesn't give any details about what is null. I confirmed that consumerRecord and metrics are not null.

The issue turned out to be in the test setup. The code under test calls send with three arguments, but in the setup I had stubbed send with only two. I updated the stub to:

when(reactiveKafkaProducerTemplate.send(Mockito.anyString(), Mockito.anyString(), Mockito.anyList()))
        .thenReturn(Mono.just(result));
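
The mismatch described above can be sketched without Mockito. In this illustration a java.lang.reflect.Proxy plays the role of the mock and answers only the overload that was "stubbed"; every other call falls through to null, much as an unstubbed mock method with an object return type would. The Template interface, its String return type, and the stubOnly helper are hypothetical names used only for this sketch.

```java
import java.lang.reflect.Proxy;

public class OverloadStubDemo {

    /** Hypothetical stand-in with a two-argument and a three-argument send overload. */
    public interface Template {
        String send(String topic, String value);
        String send(String topic, String key, String value);
    }

    /** Returns a proxy that answers "stubbed" only for the overload with the given parameter count. */
    public static Template stubOnly(int stubbedParameterCount) {
        return (Template) Proxy.newProxyInstance(
                Template.class.getClassLoader(),
                new Class<?>[] {Template.class},
                (proxy, method, args) ->
                        method.getParameterCount() == stubbedParameterCount ? "stubbed" : null);
    }

    public static void main(String[] args) {
        // Only the two-argument overload is stubbed, but the caller passes three arguments.
        System.out.println(stubOnly(2).send("topic", "key", "value")); // prints "null"
        // Stubbing the three-argument overload makes the same call return a value.
        System.out.println(stubOnly(3).send("topic", "key", "value")); // prints "stubbed"
    }
}
```

In the real test the null result is what .doOnSuccess(...) ends up being called on, which matches the NullPointerException reported at that line.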

Solution

    @Test
    void test() {
        ReactiveKafkaProducerTemplate<String, String> template = mock(ReactiveKafkaProducerTemplate.class);
        RecordMetadata meta = new RecordMetadata(new TopicPartition("foo", 0), 0L, 0L, 0L, 0L, 0, 2);
        SenderResult result = mock(SenderResult.class);
        when(result.recordMetadata()).thenReturn(meta);
        when(template.send("foo", "bar")).thenReturn(Mono.just(result));
        template.send("foo", "bar")
                .doOnNext(sr -> {
                    assertThat(sr.recordMetadata().toString()).isEqualTo("foo-0@0");
                })
                .subscribe();
    }