Search code examples
javacqrsevent-sourcingaxon

Axon - stuck in 1 event after error occurred


I recently learned Axon and CQRS & Event Sourcing.

CMIIW event store is like an application state, so in Axon we have org.axonframework.eventhandling.GlobalSequenceTrackingToken which means we can know the current state of our apps at a certain time from this record right?

processor_name   |   segment owner| timestamp                      | token             | token_type
myapps.projection|       0    far |  2021-02-03T02:22:46.6003318Z  | {"globalIndex":32}|  org.axonframework.eventhandling.GlobalSequenceTrackingToken

So everything works fine until I forget to do some validation and then my apps stuck in 1 event ({"globalIndex":32}) trying to publish this event until successfully.

this happens since my @Eventhandler throw an error.

Explanation: I have 2 events, with the same userName and the @EventHandler cant resolve this and throw error since the userName must be unique.

token:31,payload: {"id":"d6554811-ee5a-46d5-973d-1d61ee71c7fe","fullName":"xasxsax","userName":"xasxsaxsa","email":"xasxsa@g.com","password":"adminadmin"}

token:32,payload:{"id":"b41efb71-14ee-4a5f-bdef-93b36e2e6a84","fullName":"xasxsax","userName":"xasxsaxsa","email":"xasxsa@g.com","password":"adminadmin"}

my question is there any best practices to handle errors so other events will work well without waiting to fix this single event? or is there any solution to fix this ?


Solution

  • Assuming you're using Axon 4, and the default TrackingEventProcessor. The events form a stream, so you have to handle this event before processing the next; throwing an exception from the @EventHandler tells Axon that you are not currently able to process the event. In this specific case you probably should simply log an error, but not throw, as the second event simply doesn't make sense - it's asking to create something that already exists.

    For other types of failing events, it might be interesting to again only log the error and allow the stream to continue, and replay the events afterwards, once you have remedied the cause of the failure. To replay specific events for a single Aggregate, you can do something like this:

    @Autowired
    EventStore eventStore;
        
    @Test
    public void testReplay() {
        
        String aggregateIdentifier = "1234";
        long startFrom = 0;
        String eventType = "org.sample.model.events.MyEvent";
        Class projectorClass = MyProjector.class;
        
        AnnotationEventHandlerAdapter eventHandlerAdapter = new AnnotationEventHandlerAdapter(projectorClass);
        DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier, startFrom);
        eventStream.asStream()
            .filter(event -> {
                // add any type of filtering based on the event here
                return event.getType() == eventType;
            })
            .forEach(event1 -> {
                try {
                    eventHandlerAdapter.handle(event1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
    }
    

    You could expose this method in an operational interface to allow your operations team to replay specific events after analysis.