Search code examples
aeron

Aeron cluster communication and recovery


I have been following Aeron BasicAuctionClusterClient and BasicAuctionClusteredServiceNode samples. I am currenctly running single cluster node with custom clustered service that reads PING messages and sends back PONG responses via client session.

Cluster setup

// clustered service
@Override
public void onSessionMessage(ClientSession session, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
    LOGGER.info("Session message [session={},timestamp={},initialTermId={}]", session, timestamp, header.initialTermId());

    headerDecoder.wrap(buffer, offset);
    int blockLength = headerDecoder.blockLength();
    int templateId = headerDecoder.templateId();
    int version = headerDecoder.version();
    int bufferOffset = HeaderDecoder.ENCODED_LENGTH;

    switch (templateId) {
        case PingDecoder.TEMPLATE_ID:
            pingDecoder.wrap(buffer, bufferOffset, blockLength, version);

            LOGGER.info("Ping");

            long correlationId = pingDecoder.correlationId();

            MutableDirectBuffer directBuffer = new ExpandableArrayBuffer();
            pongEncoder.wrapAndApplyHeader(directBuffer, 0, headerEncoder)
                .correlationId(correlationId);

            int encodedLength = headerEncoder.encodedLength() + pongEncoder.encodedLength();

            IdleStrategy idleStrategy = cluster.idleStrategy();
            idleStrategy.reset();

            LOGGER.info("Sending response [encodedLength={}]", encodedLength);

            while (session.offer(directBuffer, 0, encodedLength) < 0) {
                idleStrategy.idle();
            }

            LOGGER.info("Response sent");
        break;
        default:
            throw new UnsupportedOperationException();
    }
}

// cluster node
MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
    .aeronDirectoryName(aeronDirName)
    .threadingMode(ThreadingMode.SHARED)
    .errorHandler(e -> System.err.println("Error " + e))
    .multicastFlowControlSupplier(new MinMulticastFlowControlSupplier())
    .terminationHook(barrier::signal);

AeronArchive.Context replicationArchiveContext = new AeronArchive.Context()
    .controlResponseChannel("aeron:udp?endpoint=" + hostName + ":0");

Archive.Context archiveContext = new Archive.Context()
    .aeronDirectoryName(aeronDirName)
    .archiveDir(new File(baseDir, "archive"))
    .controlChannel(udpChannel(nodeId, hostName, ARCHIVE_CONTROL_PORT_OFFSET))
    .archiveClientContext(replicationArchiveContext)
    .localControlChannel("aeron:ipc?term-length=64k")
    .recordingEventsEnabled(false)
    .threadingMode(ArchiveThreadingMode.SHARED)
    .replicationChannel("aeron:udp?endpoint=" + hostName + ":0");

AeronArchive.Context aeronArchiveContext = new AeronArchive.Context()
    .aeronDirectoryName(aeronDirName)
    .lock(NoOpLock.INSTANCE)
    .controlRequestChannel(archiveContext.localControlChannel())
    .controlResponseChannel(archiveContext.localControlChannel());

ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
    .errorHandler(e -> System.err.println("Consensus module error: " + e))
    .clusterMemberId(nodeId)
    .clusterMembers(clusterMembers(Arrays.asList(hostName)))
    .clusterDir(new File(baseDir, "cluster"))
    .ingressChannel("aeron:ipc?term-length=64k")
    .logChannel(logControlChannel(nodeId, hostName, LOG_CONTROL_PORT_OFFSET))
    .replicationChannel(logReplicationChannel(hostName))
    .archiveContext(aeronArchiveContext.clone());

ClusteredServiceContainer.Context clusteredServiceContext = new ClusteredServiceContainer.Context()
    .aeronDirectoryName(aeronDirName)
    .archiveContext(aeronArchiveContext.clone())
    .clusterDir(new File(baseDir, "cluster"))
    .clusteredService(clusteredService)
    .errorHandler(e -> System.err.println("Clustered service error: " + e));

try (ClusteredMediaDriver clusteredMediaDriver = ClusteredMediaDriver.launch(mediaDriverContext, archiveContext, consensusModuleContext);
     ClusteredServiceContainer clusteredServiceCOntainer = ClusteredServiceContainer.launch(clusteredServiceContext)) {
    System.out.println("[" + nodeId + "] Started Cluster Node on " + hostName + "...");
    barrier.await();
    System.out.println("[" + nodeId + "] Exiting");
}

Client setup

class ClientListener implements EgressListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientListener.class);

    @Override
    public void onMessage(long clusterSessionId, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
        LOGGER.info("Message [clusterSessionId={},timestamp={}]", clusterSessionId, timestamp);
    }

    @Override
    public void onSessionEvent(long correlationId, long clusterSessionId, long leadershipTermId, int leaderMemberId, EventCode code, String detail) {
        LOGGER.info("Session event [correlationId={},clusterSessionId={}]", correlationId, clusterSessionId);
    }

    @Override
    public void onNewLeader(long clusterSessionId, long leadershipTermId, int leaderMemberId, String ingressEndpoints) {
        LOGGER.info("New event [clusterSessionId={},leadershipTermId={}]", clusterSessionId, leadershipTermId);
    }

    @Override
    public void onAdminResponse(long clusterSessionId, long correlationId, AdminRequestType requestType, AdminResponseCode responseCode, String message, DirectBuffer payload, int payloadOffset, int payloadLength) {
        LOGGER.info("Admin response [clusterSessionId={},correlationId={}]", clusterSessionId, correlationId);
    }
}

MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
    .threadingMode(ThreadingMode.SHARED)
    .dirDeleteOnStart(true)
    .dirDeleteOnShutdown(true);

MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverContext);

AeronCluster.Context aeronClusterContext = new AeronCluster.Context()
    .aeronDirectoryName("abc")
    .egressListener(new ClientListener())
    .egressChannel("aeron:udp?endpoint=localhost:0")
    .aeronDirectoryName(mediaDriver.aeronDirectoryName())
    .ingressChannel("aeron:udp")
    .ingressEndpoints(ingressEndpoints(Arrays.asList("localhost")))
    .errorHandler(t -> LoggerFactory.getLogger("sdfsdf").error("sdfsdf", t));

AeronCluster clusterConnection = AeronCluster.connect(aeronClusterContext);
MutableDirectBuffer buffer = new ExpandableArrayBuffer();

HeaderEncoder headerEncoder = new HeaderEncoder();

PingEncoder pingEncoder = new PingEncoder();    
pingEncoder.wrapAndApplyHeader(buffer, 0, headerEncoder)
    .correlationId(5);

int encodedLength = headerEncoder.encodedLength() + pingEncoder.encodedLength();

IdleStrategy idleStrategy = new YieldingIdleStrategy(); 
LOGGER.info("Sending PING [encodedLength={}]", encodedLength);

while (clusterConnection.offer(buffer, 0, encodedLength) < 0) {
    idleStrategy.idle();
}

LOGGER.info("PING sent");

while (true) {
    Thread.sleep(1_000);
}

Now the questions.

  1. When the client sends PING message to the cluster I can see the cluster node is receiving it. It then tries to send a PONG response and it seems like it succeds, but the client listener doesn't see anything. How do I even go about investigating/debugging it?
2023-11-20 09:51:59.682 [clustered-service] INFO  ClusteredService - Session message [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468126017,initialTermId=0]
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Ping
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Sending response [encodedLength=16]
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Response sent
  1. When the client is down and the cluster node is restarted it seems to replay all session messages.

    2a) Why is this happening? I thought that snapshot recovery (io.aeron.cluster.service.ClusteredService#onStart`)on start should prevent those.

    2b) How do I distinguish these replays from the actual "running" client messages. I only want to send the response PONG to currently running clients.

2023-11-20 09:51:59.638 [clustered-service] INFO  ClusteredService - Start [cluster=io.aeron.cluster.service.ClusteredServiceAgent@6f6c6c9a,snapshotImage=null]
2023-11-20 09:51:59.682 [clustered-service] INFO  ClusteredService - Session open [session=ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50409', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700420302079]
2023-11-20 09:51:59.682 [clustered-service] INFO  ClusteredService - Session close [session=ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50409', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700420312088,closeReason=TIMEOUT]
2023-11-20 09:51:59.682 [clustered-service] INFO  ClusteredService - Session open [session=ClientSession{id=2, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50070', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700464176130]
2023-11-20 09:51:59.682 [clustered-service] INFO  ClusteredService - Session close [session=ClientSession{id=2, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50070', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700464221815,closeReason=TIMEOUT]
2023-11-20 09:51:59.682 [clustered-service] INFO  ClusteredService - Session open [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468126002]
2023-11-20 09:51:59.682 [clustered-service] INFO  ClusteredService - Session message [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468126017,initialTermId=0]
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Ping
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Sending response [encodedLength=16]
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Response sent
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Session close [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468136027,closeReason=TIMEOUT]
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Session open [session=ClientSession{id=4, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:56645', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468290044]
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Session message [session=ClientSession{id=4, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:56645', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468290054,initialTermId=0]
2023-11-20 09:51:59.683 [clustered-service] INFO  ClusteredService - Ping

I tried following the samples and documentation, but still this behaviour is not clear for me.


Solution

  • You don't need to worry about sending messages back to the client in io.aeron.samples.cluster.tutorial.BasicAuctionClusteredService#onSessionMessage

    When sending messages back to the client, the message is only send when the current node is a leader. This means that followers will not send anything. Also a node can only be elected a leader after the log has been rebuilt.

    https://github.com/real-logic/aeron/blob/3eea9411abd32b24128ec2e4ce1b09b7f426ab7f/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java#L611