I have a simple spring-driven service which publishes events via amqp. The configuration is based on bootiful-axon.
Now I want the service to maintain some private state. It is a simple use case that can be realized with 3 extra events. Those events have no meaning outside the service's scope so I don't want them to "leave".
How can I specify which events should be published via amqp and which not?
This is how I solved it:
Custom SpringAMQPPublisher
that intercepts the send
method:
public class SelectiveAmqpPublisher extends SpringAMQPPublisher {
static boolean shouldSend (Class<?> pt) {
return PublicEvent.class.isAssignableFrom(pt);
}
public SelectiveAmqpPublisher (
SubscribableMessageSource<EventMessage<?>> messageSource) {
super(messageSource);
}
@Override
protected void send (List<? extends EventMessage<?>> events) {
super.send(events.stream()
.filter(e -> shouldSend(e.getPayloadType()))
.collect(Collectors.toList()));
}
}
Configuration:
@Autowired
private AMQPProperties amqpProperties;
@Autowired
private RoutingKeyResolver routingKeyResolver;
@Autowired
private AMQPMessageConverter aMQPMessageConverter;
@Bean(initMethod = "start", destroyMethod = "shutDown")
public SpringAMQPPublisher amqpBridge(
EventBus eventBus,
ConnectionFactory connectionFactory,
AMQPMessageConverter amqpMessageConverter) {
SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);
// The rest is from axon-spring-autoconfigure...
publisher.setExchangeName(amqpProperties.getExchange());
publisher.setConnectionFactory(connectionFactory);
publisher.setMessageConverter(amqpMessageConverter);
switch (amqpProperties.getTransactionMode()) {
case TRANSACTIONAL:
publisher.setTransactional(true);
break;
case PUBLISHER_ACK:
publisher.setWaitForPublisherAck(true);
break;
case NONE:
break;
default:
throw new IllegalStateException("....");
}
return publisher;
}