spring-boot axon

Axon framework retry logic


I am trying to use the eventProcessingModule in Axon 4.1+ to replay events on a 2-node (2 JVM) K8s cluster. I have it set up so that it cleans up events, but the reset is only picked up by one of the nodes; the other keeps running, since its tracking event processor is still live.

How do I go about getting it disabled across all the JVMs at the same time so it can properly replay, and then enabled on all of them again to continue handling the commands?

I have tried upping the threads via the code below, which leads to another issue: existing tokens never increase to the new initialSegmentsCount unless I completely delete the token from the DB.

    public void config(final EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessorConfiguration(c ->
                TrackingEventProcessorConfiguration
                        .forParallelProcessing(2)
                        .andInitialSegmentsCount(2));
        // .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
    }

My current setup:

    # Application.yml
    axon:
      distributed:
        enabled: true

A service component with an autowired EventProcessingConfiguration eventProcessingModule, containing the following:

    eventProcessingModule
            .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
            .ifPresent(trackingEventProcessor -> {
                trackingEventProcessor.shutDown();
                trackingEventProcessor.resetTokens();
            });

    // Thread.sleep() to verify with

    eventProcessingModule
            .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
            .ifPresent(trackingEventProcessor -> {
                trackingEventProcessor.start();
            });

The sample code had the above in a single shutDown/reset/start sequence, but I split it up just to see how it works (I believe the @ResetHandler is called after the reset, so the start waits, but I am not fully sure).

The SkuProjection component has a @ResetHandler void method that cleans up the table to be replayed.

Depending on which JVM currently has the token, the other will return this error: SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'

I would expect the owner in the token_entry table to go to "null" while the processor is "stopped", but since there are 2 JVMs, the one that currently doesn't have the token takes it over while the other replays. I confirmed that the 2nd node is running with the token after the first node stops its processor.


Solution

  • I think I can give you some guidance here.

    You are looking at how to delegate operations to a specific set of Tracking Event Processors across several JVMs, correct? The API Axon Framework provides for this is quite intentionally kept on the TrackingEventProcessor itself.

    Thus a start(), shutDown(), processingStatus() or resetTokens() is a call you'd perform on a specific Tracking Event Processor instance.

    Note: the resetTokens() method is what effectively issues a replay for the Event Handlers which the Tracking Event Processor is in charge of.

    Additionally, you're seeing the "Unable to claim token" exception because the framework requires that the TrackingEventProcessor performing the reset owns all the tokens of the given processing group. The reason for this is that it needs to adjust the tokens to ReplayTokens to support nice functions like the @ResetHandler.

    What you are now asking for, if I am correct, is:

    Does Axon Framework provide a means to delegate a specific Tracking Event Processor operation to all instances which are dealing with the same processing group?

    I think I will sadly have to disappoint you here @sherring: Axon Framework does not provide the means to delegate such an operation. The quickest way to achieve these delegated start/stop/reset operations is to set up Axon Server, as the Standard Edition (free) already provides this functionality.

    If for one reason or another this free piece of software is a no-go, you'll have to create such a delegation system for the start/stop/reset/{insert-any-TrackingEventProcessor-operation} yourself. Or you could deal with this more from an operations perspective: shut down the second instance of the given Axon Framework application, ensuring there is only a single instance of the TrackingEventProcessor you want to reset.

    Hope this helps you out @sherring!
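
The do-it-yourself delegation mentioned above could be sketched roughly as follows. This is only an illustration of the ordering (stop everywhere, reset on one node, start everywhere), not something Axon Framework ships. ProcessorHandle, ReplayCoordinator and RecordingHandle are hypothetical names; in a real setup each handle would forward the call to the TrackingEventProcessor on one JVM (for example through an admin endpoint you expose yourself), and shutDown() is assumed to return only once that node has released its tokens.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-node handle; NOT an Axon type. A real implementation would
// forward each call to the TrackingEventProcessor running on one JVM.
interface ProcessorHandle {
    void shutDown();
    void resetTokens();
    void start();
}

// Hypothetical coordinator that applies the operations in a replay-safe order.
final class ReplayCoordinator {
    private final List<ProcessorHandle> nodes;

    ReplayCoordinator(List<ProcessorHandle> nodes) {
        this.nodes = new ArrayList<>(nodes);
    }

    void replay() {
        if (nodes.isEmpty()) {
            return;
        }
        // 1. Stop the processor on every node, so no other node still owns a token.
        for (ProcessorHandle node : nodes) {
            node.shutDown();
        }
        try {
            // 2. Reset on exactly one node; it can now claim all tokens of the group.
            nodes.get(0).resetTokens();
        } finally {
            // 3. Start every node again, even if the reset failed.
            for (ProcessorHandle node : nodes) {
                node.start();
            }
        }
    }
}

// Stand-in handle that only records the calls, to show the resulting order.
final class RecordingHandle implements ProcessorHandle {
    private final String name;
    private final List<String> log;

    RecordingHandle(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void shutDown() {
        log.add(name + ".shutDown");
    }

    @Override
    public void resetTokens() {
        log.add(name + ".resetTokens");
    }

    @Override
    public void start() {
        log.add(name + ".start");
    }
}
```

Calling replay() on a coordinator built from two RecordingHandle instances named "a" and "b" records both shutDown calls first, then a single resetTokens on "a", then both start calls. The reset is deliberately issued on one node only, since the processor doing the reset has to own every token of the processing group.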