Search code examples
axonreplay

Axon Partialy replay, how do i get a TrackingToken for the startPosition for the replay?


I want my Axon replay events, not all but partially. A full replay is up and running but when i want a partially replay i need a TrackingToken startPosition for the method resetTokens(), my problem is how to get this token for the partial replay?

I tried with GapAwareTracingToken but this does not work.

    public void resetTokensWithRestartIndexFor(String trackingEventProcessorName, Long restartIndex) {
        eventProcessingConfiguration
                .eventProcessorByProcessingGroup(trackingEventProcessorName, TrackingEventProcessor.class)
                .filter(trackingEventProcessor -> !trackingEventProcessor.isReplaying())
                .ifPresent(trackingEventProcessor -> {
                    // shutdown this streaming processor
                    trackingEventProcessor.shutDown();
                    
                    // reset the tokens to prepare the processor with start index for replay
                   trackingEventProcessor.resetTokens(GapAwareTrackingToken.newInstance(restartIndex - 1, Collections.emptySortedSet()));
                    
                    // start the processor to initiate the replay
                    trackingEventProcessor.start();
                });
    }

When i use the GapAwareTrackingToken then i get the exception:

[] - Resolved [java.lang.IllegalArgumentException: Incompatible token type provided.]

I see that there is also a GlobalSequenceTrackingToken i can use, but i don't see any documentatieon about when these can/should be used.


Solution

  • The main "challenge" when doing a partial reset, is that you need to be able to tell where to reset to. In Axon, the position in a stream is defined with a TrackingToken.

    The source that you read from will provide you with such a token with each event that it provides. However, when you're doing a reset, you probably didn't store the relevant token while you were consuming those events.

    You can also create tokens using any StreamableMessageSource. Generally, this is your Event Store, but if you read from other sources, it could be something else, too.

    The StreamableMessageSource provides 4 methods to create a token:

    • createHeadToken - the position at the most recent edge of the stream, where only new events will be read
    • createTailToken - the position at the very beginning of the stream, allowing you to replay all events.
    • createTokenAt(Instant) - the most recent position in the stream that will return all events created on or after the given Instant. Note that some events may still have a timestamp earlier than this timestamp, as event creation and event storage isn't guaranteed to be the same.
    • createTokenSince(Duration) - similar to createTokenAt, but accepting an amount of time to go back.

    So in your case, createTokenAt should do the trick.