I recently started learning Axon, CQRS, and Event Sourcing.
Correct me if I'm wrong: the event store is like the application state, and in Axon we have org.axonframework.eventhandling.GlobalSequenceTrackingToken,
which means we can tell the current state of our app at a given point in time from this record, right?
processor_name | segment | owner | timestamp | token | token_type
myapps.projection | 0 | far | 2021-02-03T02:22:46.6003318Z | {"globalIndex":32} | org.axonframework.eventhandling.GlobalSequenceTrackingToken
Everything worked fine until I forgot to add some validation, and now my app is stuck on one event ({"globalIndex":32}): it keeps retrying this event until it succeeds.
This happens because my @EventHandler throws an error.
Explanation:
I have 2 events with the same userName, and the @EventHandler can't resolve this and throws an error because userName must be unique.
token: 31, payload: {"id":"d6554811-ee5a-46d5-973d-1d61ee71c7fe","fullName":"xasxsax","userName":"xasxsaxsa","email":"xasxsa@g.com","password":"adminadmin"}
token: 32, payload: {"id":"b41efb71-14ee-4a5f-bdef-93b36e2e6a84","fullName":"xasxsax","userName":"xasxsaxsa","email":"xasxsa@g.com","password":"adminadmin"}
My question: is there a best practice for handling errors like this so that other events keep being processed without waiting for this single event to be fixed? Or is there a way to fix this?
Assuming you're using Axon 4 and the default TrackingEventProcessor: the events form a stream, so each event has to be handled before the next one is processed. Throwing an exception from the @EventHandler tells Axon that you are currently unable to process the event. In this specific case you should probably just log an error and not throw, as the second event doesn't make sense: it asks to create something that already exists.
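As a minimal sketch of the "log, but don't throw" idea, here is a plain-Java stand-in with no Axon dependency. The class names UserCreatedEvent and UserProjection, the in-memory map, and the ids are all hypothetical; in a real Axon application the on(...) method would carry @EventHandler and the map would be your projection table with its unique constraint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical event; the one in the question carries more fields (fullName, email, ...).
final class UserCreatedEvent {
    final String id;
    final String userName;

    UserCreatedEvent(String id, String userName) {
        this.id = id;
        this.userName = userName;
    }
}

// Hypothetical projection keyed by userName, standing in for a table with a unique constraint.
final class UserProjection {
    private static final Logger LOG = Logger.getLogger(UserProjection.class.getName());
    private final Map<String, String> idByUserName = new HashMap<>();

    // In an Axon application this method would carry @EventHandler.
    void on(UserCreatedEvent event) {
        String existingId = idByUserName.putIfAbsent(event.userName, event.id);
        if (existingId != null && !existingId.equals(event.id)) {
            // Log and return normally so the processor can move on to the next event.
            LOG.warning("Skipping event for user id " + event.id
                    + ": userName '" + event.userName + "' already belongs to " + existingId);
        }
    }

    int size() {
        return idByUserName.size();
    }
}

final class UserProjectionDemo {
    public static void main(String[] args) {
        UserProjection projection = new UserProjection();
        projection.on(new UserCreatedEvent("user-1", "xasxsaxsa")); // like token 31
        projection.on(new UserCreatedEvent("user-2", "xasxsaxsa")); // like token 32: duplicate userName
        projection.on(new UserCreatedEvent("user-3", "another"));   // later events are still handled
        System.out.println(projection.size()); // prints 2
    }
}
```

The duplicate is reported in the log but does not block the events behind it. The cleaner long-term fix is still to validate uniqueness on the command side, so that the second event is never stored in the first place.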
For other types of failing events, it can be useful to again only log the error and let the stream continue, then replay the events once you have remedied the cause of the failure. To replay specific events for a single aggregate, you can do something like this:
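import org.axonframework.eventhandling.AnnotationEventHandlerAdapter;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.junit.jupiter.api.Test; // or org.junit.Test if you are on JUnit 4
import org.springframework.beans.factory.annotation.Autowired;

// The members below belong inside a Spring-managed test class.
@Autowired
EventStore eventStore;

@Autowired
MyProjector projector; // the handler instance whose @EventHandler methods should be re-invoked

@Test
public void testReplay() {
    String aggregateIdentifier = "1234";
    long startFrom = 0; // first sequence number to read
    String eventType = "org.sample.model.events.MyEvent";

    // The adapter wraps an annotated handler instance, not its Class object.
    AnnotationEventHandlerAdapter eventHandlerAdapter = new AnnotationEventHandlerAdapter(projector);
    DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier, startFrom);
    eventStream.asStream()
            // Add any type of filtering based on the event here.
            // getPayloadType() is the event class; getType() would be the aggregate type.
            .filter(event -> eventType.equals(event.getPayloadType().getName()))
            .forEach(event -> {
                try {
                    eventHandlerAdapter.handle(event);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
}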
You could expose this method in an operational interface to allow your operations team to replay specific events after analysis.
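The overall flow described in this answer (log the failure, let the stream continue, replay once the cause is fixed) can also be sketched without Axon. This is only an illustration under stated assumptions: FailedEventLog and its methods are hypothetical names, the events are plain strings, and the failures are kept in memory rather than re-read from the event store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: handles events in order, remembers the ones that fail, and can retry them later.
final class FailedEventLog<E> {
    private final List<E> failed = new ArrayList<>();

    // Handle every event; a failure is logged and recorded instead of stopping the stream.
    void handleAll(List<E> events, Consumer<E> handler) {
        for (E event : events) {
            try {
                handler.accept(event);
            } catch (RuntimeException e) {
                System.err.println("Failed to handle " + event + ": " + e.getMessage());
                failed.add(event);
            }
        }
    }

    // Re-handle only the recorded failures; events that fail again stay recorded.
    void replayFailed(Consumer<E> handler) {
        List<E> toRetry = new ArrayList<>(failed);
        failed.clear();
        handleAll(toRetry, handler);
    }

    List<E> failedEvents() {
        return new ArrayList<>(failed);
    }
}

final class FailedEventLogDemo {
    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        boolean[] bugPresent = {true};
        Consumer<String> handler = event -> {
            if (bugPresent[0] && event.equals("event-32")) {
                throw new IllegalStateException("cannot handle yet");
            }
            handled.add(event);
        };

        FailedEventLog<String> log = new FailedEventLog<>();
        log.handleAll(Arrays.asList("event-31", "event-32", "event-33"), handler);
        System.out.println(handled); // prints [event-31, event-33]

        bugPresent[0] = false;       // the cause of the failure has been fixed
        log.replayFailed(handler);
        System.out.println(handled); // prints [event-31, event-33, event-32]
    }
}
```

Note that the replayed event is applied after events that originally came later, so this approach only suits handlers that tolerate that ordering.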