
How to test JPA adaptor steps in Spring Integration Flow


I have an integration flow (simplified version below) with multiple steps that use JPA adapters. The first one retrieves the existing record from the database by id, and the second one saves the updated entity back to the database.

@Autowired private EntityManager entityManager;

@Bean
public IntegrationFlow flow() {
  return IntegrationFlows
      .from(
          Kafka.messageDrivenChannelAdapter(consumerFactory(), "inTopic")
              .id("InboundKafkaAdapter"))
      .transform(transform())
      .filter(filter())
      .handle(
          Jpa.retrievingGateway(this.entityManager)
              .idExpression("headers['" + Headers.ID + "']")
              .entityClass(TestEntity.class),
          s -> s.advice(interceptForResult()).requiresReply(true))
      .filter(secondFilter())
      .transform(transformUpdatedEntity())
      .handle(jpaAdapter(), ConsumerEndpointSpec::transactional)
      .handle(
          Kafka.outboundChannelAdapter(kafkaTemplate())
                  .topic("outTopic"))
      .get();
}

private JpaUpdatingOutboundEndpointSpec jpaAdapter() {
  return Jpa.updatingGateway(this.entityManager)
      .entityClass(TestEntity.class)
      .flush(true)
      .persistMode(PersistMode.MERGE);
}

For testing, I would like to mock all the external parties and test the flow itself. I've used IntegrationFlowContext as below:

@SpringBootTest
@SpringIntegrationTest
public class SampleFlowTest {
 
 
  @Mock
  private SampleFlow sampleFlow;

  @Test
  public void testSampleFlow() throws IOException {
  
    IntegrationFlow originalFlow = sampleFlow.flow();
    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
      integrationFlowContext.registration(originalFlow).register();

    final Message<?> request =
      MessageBuilder.withPayload("somePayload")
        .setHeader(KafkaHeaders.TOPIC, "inTopic")
        .setHeader(KafkaHeaders.MESSAGE_KEY, "1")
        .setHeader(KafkaHeaders.RECEIVED_MESSAGE_KEY, "1")
        .build();

    Message<?> response =
      flowRegistration.getMessagingTemplate().sendAndReceive(request);


    flowRegistration.destroy();
  }

@Configuration
@EnableIntegration
public static class Config {
// Some beans for config
}
}

When I run the test, the flow runs until the first JPA step. The logic in interceptForResult() does not work there, which causes a failure. I removed the first JPA step to see how the second one behaves, and it also throws an exception, this time because the transactionManager bean is missing.

I also tried using substituteMessageHandlerFor to mock these two handlers (after setting ids on them), but I again get a missing bean exception for the ids I defined. I think this is because the original flow class is mocked.

So, could you please help to mock JPA steps in my test? Thanks!

UPDATED -- Test implementation

@DirtiesContext
@SpringIntegrationTest (noAutoStartup = {"InboundKafkaAdapter"})
@SpringBootTest(
  webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
  classes = Application.class)
@ExtendWith({SpringExtension.class})
public class FlowTest {

  @Autowired private ApplicationContext applicationContext;

  @Autowired private MockIntegrationContext mockIntegrationContext;

  @Autowired private MyIntegrationFlow integrationFlow;

  @Autowired private QueueChannel testChannel;

  @Autowired
  @Qualifier("flow.channel#0")
  private MessageChannel flow;

  @Test
  public void test() {

    Message request = generateMessageForEvent();

    MessageHandler mockMessageHandler = mockMessageHandler().handleNextAndReply(Function.identity());

    this.mockIntegrationContext.substituteMessageHandlerFor(
      "JpaRetrievingGateway", mockMessageHandler);
    this.mockIntegrationContext.substituteMessageHandlerFor(
      "JpaUpdatingGateway", mockMessageHandler);
    this.mockIntegrationContext.substituteMessageHandlerFor(
      "OutboundKafkaAdapter", mockMessageHandler);

    flow.send(request);

    Message<String> reply = (Message<String>) testChannel.receive(0);
    Assert.assertNotNull("reply should not be null", reply);
}


@Configuration
@EnableIntegration
public static class Config {

  @Bean
  public QueueChannel testChannel(){
    return new QueueChannel();
  }
}

Solution

  • Any IntegrationFlow starts with a MessageChannel: even if it is defined via from(Kafka.…), there is still an inputChannel. In your production code the initiator of the flow is a ConsumerRecord from Apache Kafka, but for testing it is perfectly fine to send a message directly to the first channel of the flow.

    So, step one: there is no need for any extra dynamic flow registration that tries to mock and patch everything in the main flow. You just need:

    @Autowired
    IntegrationFlow flow;
    
    ...
    
    flow.getInputChannel()
    

    Since you are not interested in Kafka at all, you can simply use substituteMessageHandlerFor() for that last .handle(Kafka.…).

    And since you are interested in the Jpa.updatingGateway(), you can mock that handler with a replying MockMessageHandler.

    In the main flow you do:

    .handle(
          Kafka.outboundChannelAdapter(kafkaTemplate())
                  .topic("outTopic")
                  .id("kafkaOutboundEndpoint"))
    

    In the test you do:

     MessageHandler mockMessageHandler =
                mockMessageHandler()
                        .handleNextAndReply(Function.identity());
    
        this.mockIntegrationContext
                .substituteMessageHandlerFor("kafkaOutboundEndpoint",
                        mockMessageHandler);
    

    And request-reply interaction:

    MessagingTemplate template = new MessagingTemplate();
    Message<?> reply = template.sendAndReceive(flow.getInputChannel(), testMessage);
    

    Or you can interact with that mockMessageHandler through a handleNext() callback for assertions. Then you don't need request-reply and you can just send to flow.getInputChannel().
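
To make the reasoning above concrete, here is a minimal plain-Java model of the idea. It is only a sketch and deliberately uses no Spring Integration classes: MiniFlow, handle(), substituteHandlerFor() and send() are hypothetical names invented for this illustration. It assumes a flow can be pictured as an ordered set of handlers registered under ids, that "sending to the input channel" means running them in order, and that a test replaces the external endpoint by id with a capturing callback (the role handleNext() plays above). It also shows why substitution can only work for ids that were actually registered in the flow under test.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy stand-in for a flow: endpoint id -> handler, kept in registration order. Hypothetical, not a Spring class. */
public class MiniFlow {

    private final Map<String, Function<String, String>> endpoints = new LinkedHashMap<>();

    /** Registers a step under an id, loosely like giving an endpoint an id in the DSL. */
    public MiniFlow handle(String id, Function<String, String> handler) {
        endpoints.put(id, handler);
        return this;
    }

    /** Replaces the handler registered under an existing id; unknown ids are rejected. */
    public void substituteHandlerFor(String id, Function<String, String> mock) {
        if (!endpoints.containsKey(id)) {
            throw new IllegalArgumentException("No endpoint with id: " + id);
        }
        // Re-putting an existing key keeps its original position in a LinkedHashMap.
        endpoints.put(id, mock);
    }

    /** Plays the role of sending to the input channel: runs each step in order; a null result ends the flow. */
    public void send(String payload) {
        String current = payload;
        for (Function<String, String> handler : endpoints.values()) {
            current = handler.apply(current);
            if (current == null) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        MiniFlow flow = new MiniFlow()
            .handle("transform", String::toUpperCase)
            // Stands in for the real outbound adapter, which must not run in a test.
            .handle("kafkaOutboundEndpoint", p -> {
                throw new IllegalStateException("no broker available in tests");
            });

        // Capture what reaches the last step instead of publishing it.
        List<String> captured = new ArrayList<>();
        flow.substituteHandlerFor("kafkaOutboundEndpoint", p -> {
            captured.add(p);
            return null; // one-way handler: nothing to reply
        });

        flow.send("somePayload");
        System.out.println(captured); // prints [SOMEPAYLOAD]
    }
}
```

In the real test the same shape applies: substitute the handler for the id you set on the endpoint, send a message to flow.getInputChannel(), and assert on what the mock handler received.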