kotlin, axon, axon-framework

Axon Framework: how to use a MessageDispatchInterceptor with a reactive repository


I have read the set-based consistency validation blog post and I want to perform the validation in a dispatch interceptor. I followed the example, but I use a reactive repository and it doesn't work for me. I have tried both with and without block(): with block() it throws an error, and without it nothing is executed. Here is my code.

class SubnetCommandInterceptor : MessageDispatchInterceptor<CommandMessage<*>> {

  @Autowired
  private lateinit var privateNetworkRepository: PrivateNetworkRepository

  override fun handle(messages: List<CommandMessage<*>?>): BiFunction<Int, CommandMessage<*>, CommandMessage<*>> {
    return BiFunction<Int, CommandMessage<*>, CommandMessage<*>> { index: Int?, command: CommandMessage<*> ->
      if (CreateSubnetCommand::class.simpleName == (command.payloadType.simpleName)){
        val interceptCommand = command.payload as CreateSubnetCommand
        privateNetworkRepository
          .findById(interceptCommand.privateNetworkId)
          // ..some validation logic here ex.
          // .filter { network -> network.isSubnetOverlap() }
          .switchIfEmpty(Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet.")))
          // .block() also doesn't work here; it throws this error:
          // block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-
      }
      command
    }
  }
}

Solution

  • Subscribing to a reactive repository inside a message dispatch interceptor is not recommended and can lead to unexpected behavior, because the underlying ThreadLocal state (used by Axon) is not designed for reactive programming.

    Instead, check out Axon's Reactor extension and the reactive interceptors section of its documentation.

    For example, you might do something like this:

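    reactiveCommandGateway.registerDispatchInterceptor { cmdMono ->
        cmdMono.flatMap { cmd ->
            // Only CreateSubnetCommand payloads need the lookup; pass every other command through.
            val payload = cmd.payload as? CreateSubnetCommand ?: return@flatMap Mono.just(cmd)
            privateNetworkRepository
                .findById(payload.privateNetworkId)
                .switchIfEmpty(Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet.")))
                .thenReturn(cmd) // re-emit the original command once the check has passed
        }
    }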
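
To illustrate the ThreadLocal point from the start of this answer, here is a minimal sketch using only the JDK (no Axon or Reactor). The names `currentContext` and `readOnWorkerThread` are made up for the illustration and are not Axon APIs. The single-thread executor simply stands in for a reactive scheduler that runs a continuation on a different thread than the caller.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for thread-bound state, such as a "current unit of work".
val currentContext = ThreadLocal<String?>()

// Reads the thread-bound value from a different thread, similar to how a
// reactive scheduler may run a continuation away from the caller's thread.
fun readOnWorkerThread(): String? {
    val worker = Executors.newSingleThreadExecutor()
    try {
        return worker.submit(Callable { currentContext.get() }).get(1, TimeUnit.SECONDS)
    } finally {
        worker.shutdown()
    }
}

fun main() {
    currentContext.set("unit-of-work-1")
    println("caller thread sees: ${currentContext.get()}")
    println("worker thread sees: ${readOnWorkerThread()}")
}
```

The value set on the caller thread is not visible on the worker thread. That is the kind of surprise you can run into when thread-bound state meets a pipeline that hops between threads.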