
Initial token position in axon kafka extension


I'm using the Axon Kafka extension (4.5.4) to receive events from multiple sources (MultiStreamableMessageSource). How can I set the initial token position (head) for a StreamableMessageSource?

With this configuration

val config = TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
        .andInitialTrackingToken { it.createHeadToken() }

I get an UnsupportedOperationException.

As far as I understand, I need to write custom code that creates a KafkaTrackingToken in the andInitialTrackingToken lambda. Is there an example? What if I don't know the specific partition offsets? Also, I have multiple MessageSources, so it would look ugly.

PS. I know I could use a SubscribableMessageSource, but that doesn't work for me because I need to combine several MessageSources into one (as MultiStreamableMessageSource does).


Solution

  • It's not supported, not even in the latest version, 4.7.0. In theory it should be possible to support at least head, tail, and timestamp-based positions. The issue to support those has been addressed and the change is now merged, so this will be supported from the 4.8.0 release, which should happen in a few months.

    Currently, if there is no known offset, it always defaults to offset 0. It's the ConsumerSeekUtil that does this. With the PR, other tokens should be supported, which can move to a timestamp, or to the end, when there is no offset yet.
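
To make the difference concrete, here is a minimal sketch of the two fallback rules described above. It is a plain Kotlin model, not Axon or Kafka client code: `StartPosition`, `LogRecord`, `seekOffsetToday` and `seekOffsetPlanned` are hypothetical names invented for illustration, and the real 4.8.0 API may look quite different. It also assumes a stored offset means "the next offset to read".

```kotlin
// Hypothetical model for illustration; these types are not part of Axon or the Kafka client.
sealed class StartPosition {
    object Tail : StartPosition()                                   // oldest record still in the partition
    object Head : StartPosition()                                   // end of the partition, only new records
    data class AtTimestamp(val epochMillis: Long) : StartPosition() // first record at or after this time
}

data class LogRecord(val offset: Long, val timestampMillis: Long)

// Behaviour described for 4.7.0: a missing offset always falls back to 0.
fun seekOffsetToday(knownOffset: Long?): Long = knownOffset ?: 0L

// Behaviour the merged change is meant to allow (assumed): the fallback
// depends on the requested start position instead of always being 0.
fun seekOffsetPlanned(knownOffset: Long?, start: StartPosition, partition: List<LogRecord>): Long {
    if (knownOffset != null) return knownOffset // a stored offset always wins
    val endOffset = (partition.lastOrNull()?.offset ?: -1L) + 1
    return when (start) {
        StartPosition.Tail -> partition.firstOrNull()?.offset ?: 0L
        StartPosition.Head -> endOffset
        is StartPosition.AtTimestamp ->
            partition.firstOrNull { it.timestampMillis >= start.epochMillis }?.offset ?: endOffset
    }
}

fun main() {
    val partition = listOf(LogRecord(0, 1_000), LogRecord(1, 2_000), LogRecord(2, 3_000))
    println(seekOffsetToday(null))                                                // 0
    println(seekOffsetPlanned(null, StartPosition.Head, partition))               // 3
    println(seekOffsetPlanned(null, StartPosition.AtTimestamp(1_500), partition)) // 1
}
```

In this model a head start on a fresh consumer skips the three existing records, whereas the current fallback replays them from offset 0. With several sources, the same rule would be applied per topic partition.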