Search code examples
java, spring-boot, amqp, spring-amqp, axon

Axon: How to configure amqp publishing for single events?


I have a simple spring-driven service which publishes events via amqp. The configuration is based on bootiful-axon.

Now I want the service to maintain some private state. It is a simple use case that can be realized with 3 extra events. Those events have no meaning outside the service's scope so I don't want them to "leave".

How can I specify which events should be published via amqp and which not?


Solution

  • This is how I solved it:

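    A custom SpringAMQPPublisher subclass that overrides the send method and forwards only events whose payload type is a PublicEvent (a marker type you define yourself for events that may leave the service):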

    /**
     * AMQP publisher that forwards only events whose payload type is
     * assignable to {@code PublicEvent}. Every other event is filtered out
     * before the parent publisher's {@code send} is invoked.
     */
    public class SelectiveAmqpPublisher extends SpringAMQPPublisher {

        /**
         * Decides whether an event with the given payload type may be published.
         *
         * @param pt payload type of the event
         * @return {@code true} if {@code pt} is {@code PublicEvent} or a subtype of it
         */
        static boolean shouldSend (Class<?> pt) {
            return PublicEvent.class.isAssignableFrom(pt);
        }

        /**
         * Creates a publisher that reads events from the given source.
         *
         * @param messageSource source of event messages, passed straight to the parent constructor
         */
        public SelectiveAmqpPublisher (
                SubscribableMessageSource<EventMessage<?>> messageSource) {

            super(messageSource);

        }

        /**
         * Passes on only the events accepted by {@link #shouldSend(Class)}.
         * When no event in the batch qualifies, the parent is not called at all.
         *
         * @param events batch of events handed to this publisher
         */
        @Override
        protected void send (List<? extends EventMessage<?>> events) {

            List<? extends EventMessage<?>> publicEvents = events.stream()
                    .filter(e -> shouldSend(e.getPayloadType()))
                    .collect(Collectors.toList());

            // NOTE(review): the parent send presumably acquires an AMQP channel
            // (and possibly a transaction) per call -- confirm. Skipping empty
            // batches avoids that work when only private events were emitted.
            if (publicEvents.isEmpty()) {
                return;
            }

            super.send(publicEvents);

        }

    }
    

    Configuration:

    // AMQP settings; amqpBridge reads getExchange() and getTransactionMode() from it.
    @Autowired
    private AMQPProperties amqpProperties;
    
    // NOTE(review): not referenced anywhere in the visible snippet -- confirm it is still needed.
    @Autowired 
    private RoutingKeyResolver routingKeyResolver;
    
    // NOTE(review): amqpBridge uses its own amqpMessageConverter parameter rather than
    // this field -- confirm this injection is still needed.
    @Autowired
    private AMQPMessageConverter aMQPMessageConverter;
    
    
    /**
     * Builds the AMQP publisher bean as a {@code SelectiveAmqpPublisher}
     * and configures it from {@code amqpProperties}. The bean declaration
     * names {@code start} as its init method and {@code shutDown} as its
     * destroy method.
     *
     * @param eventBus             event source handed to the publisher
     * @param connectionFactory    connection factory set on the publisher
     * @param amqpMessageConverter message converter set on the publisher
     * @return the configured publisher
     * @throws IllegalStateException if the configured transaction mode is not
     *                               one of the modes handled below
     */
    @Bean(initMethod = "start", destroyMethod = "shutDown")
    public SpringAMQPPublisher amqpBridge(
                 EventBus eventBus, 
                 ConnectionFactory connectionFactory,
                 AMQPMessageConverter amqpMessageConverter) {

        SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);

        // The rest is from axon-spring-autoconfigure...

        publisher.setExchangeName(amqpProperties.getExchange());
        publisher.setConnectionFactory(connectionFactory);
        publisher.setMessageConverter(amqpMessageConverter);
        switch (amqpProperties.getTransactionMode()) {

            case TRANSACTIONAL:
                publisher.setTransactional(true);
                break;
            case PUBLISHER_ACK:
                publisher.setWaitForPublisherAck(true);
                break;
            case NONE:
                // Nothing extra to configure for this mode.
                break;
            default:
                // Name the offending value so a misconfiguration can be diagnosed.
                throw new IllegalStateException(
                        "Unknown transaction mode: " + amqpProperties.getTransactionMode());
        }

        return publisher;

    }