
Aeron Archive - extend existing recording


I have an Aeron Archive and want to extend an existing recording, i.e. have it continue to append messages after the service restarts.

I wasn't able to find an actual example of how to do that, so I came up with my own code based on the Aeron javadoc and the Aeron Cookbook.

What I have so far

I'm trying to get the lastRecordingId, the position etc. from the archive itself first.

private void findLatestRecording() {
        final RecordingDescriptorConsumer consumer =
                (controlSessionId, correlationId, recordingId,
                        startTimestamp, stopTimestamp, startPosition,
                        stopPosition, initialTermId, segmentFileLength,
                        termBufferLength, mtuLength, sessionId,
                        streamId, strippedChannel, originalChannel,
                        sourceIdentity) ->  {
                    AeronArchiveJournal.this.lastRecordingId = recordingId;
                    AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
                    AeronArchiveJournal.this.initialTermId = initialTermId;
                    AeronArchiveJournal.this.termBufferLength = termBufferLength;
                };

        final long fromRecordingId = 0L;
        final int recordCount = 100;
        final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);

        if (foundCount == 0) {
            LOG.info("No previous recording found, will start a new one");
        }
    }

Then I try to extend the recording:

private void extendExistingRecording() {
        publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);

        String channelUri = new ChannelUriStringBuilder()
                .media(CommonContext.IPC_MEDIA)
                .initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
                .build();

        LOG.info("Extending existing recording");
        LOG.info("Recording id: {}", lastRecordingId);
        LOG.info("Channel URI: {}", channelUri);


        archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);

        LOG.info("Waiting for recording to start for session {}", publication.sessionId());
        final CountersReader counters = aeron.countersReader();
        int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());

        while (CountersReader.NULL_COUNTER_ID == counterId) {
            journalIdleStrategy.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
        }
        lastRecordingId = RecordingPos.getRecordingId(counters, counterId);

    }

However, the CountersReader loop never finishes and Aeron prints the following warning:

io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312

Clearly I'm missing something, but so far I'm not able to figure out what.


Solution

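  • To extend an existing recording, the publication used for the extension must itself be created with its initial position set to the recording's stop position, so that the stream is a genuine continuation. In the code above the initial position is only set on the channel passed to extendRecording, while the publication is added on a plain IPC channel and therefore starts at position 0, which is what the warning reports (image.joinPosition=0 != rec.stopPosition=25312). This test shows how it is done: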

    https://github.com/real-logic/aeron/blob/master/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java
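To make "setting the initial position" more concrete, here is a minimal plain-Java sketch of the arithmetic that turns a stop position into the term id and term offset carried by the publication's channel URI. It does not use Aeron itself: the class name InitialPositionSketch, its methods, the initial term id of 7 and the 64 KiB term length are all hypothetical, and the parameter order in the printed URI is not necessarily what ChannelUriStringBuilder emits. In real code you would let ChannelUriStringBuilder.initialPosition(...) build the URI and pass the result to addExclusivePublication.

```java
// Sketch only: mirrors the position arithmetic, it is not Aeron's own code.
final class InitialPositionSketch {
    // Aeron frames are aligned to 32 bytes, so a valid stop position is a multiple of 32.
    static final int FRAME_ALIGNMENT = 32;

    // Term that contains the given stream position.
    static int termId(long position, int initialTermId, int termLength) {
        int bitsToShift = Integer.numberOfTrailingZeros(termLength);
        return (int) (position >> bitsToShift) + initialTermId;
    }

    // Offset of the position inside that term.
    static int termOffset(long position, int termLength) {
        return (int) (position & (termLength - 1));
    }

    // IPC channel URI for a publication that should start at the given position.
    static String ipcChannelAt(long position, int initialTermId, int termLength) {
        if (position < 0 || (position & (FRAME_ALIGNMENT - 1)) != 0) {
            throw new IllegalArgumentException("position not frame aligned: " + position);
        }
        if (Integer.bitCount(termLength) != 1) {
            throw new IllegalArgumentException("term length not a power of 2: " + termLength);
        }
        return "aeron:ipc?init-term-id=" + initialTermId
            + "|term-id=" + termId(position, initialTermId, termLength)
            + "|term-offset=" + termOffset(position, termLength)
            + "|term-length=" + termLength;
    }

    public static void main(String[] args) {
        // Stop position taken from the warning in the question; 7 and 64 KiB are assumed values.
        System.out.println(ipcChannelAt(25312L, 7, 64 * 1024));
        // prints aeron:ipc?init-term-id=7|term-id=7|term-offset=25312|term-length=65536
    }
}
```

A publication added on a channel like this joins at position 25312 instead of 0, which is the condition the archive checks before it agrees to extend the recording.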