I have been following the Aeron `BasicAuctionClusterClient` and `BasicAuctionClusteredServiceNode` samples.
I am currently running a single-node cluster with a custom clustered service that reads PING messages and sends back PONG responses via the client session.
Cluster setup
// clustered service
/**
 * Handles a message sent by a client, as sequenced in the cluster log.
 * <p>
 * The message must start with an SBE message header at {@code offset}. A PING is answered with a
 * PONG carrying the same correlation id, offered back on the client's session. This callback is
 * also invoked while the log is replayed on restart. Aeron Cluster only actually publishes replies
 * while this node is the leader, so replayed messages do not reach clients.
 *
 * @param session   client session the message arrived on; the PONG is offered on it.
 * @param timestamp cluster timestamp of the message.
 * @param buffer    buffer holding the message.
 * @param offset    offset in {@code buffer} at which the message (its SBE header) begins.
 * @param length    length of the message in bytes.
 * @param header    Aeron header of the log fragment.
 * @throws UnsupportedOperationException if the SBE template id is not a PING.
 */
@Override
public void onSessionMessage(ClientSession session, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
    LOGGER.info("Session message [session={},timestamp={},initialTermId={}]", session, timestamp, header.initialTermId());
    headerDecoder.wrap(buffer, offset);
    int blockLength = headerDecoder.blockLength();
    int templateId = headerDecoder.templateId();
    int version = headerDecoder.version();
    // The body immediately follows the SBE header. It must be located relative to this message's
    // own offset; the buffer is generally the shared log buffer, so the message need not start at 0.
    int bufferOffset = offset + HeaderDecoder.ENCODED_LENGTH;
    switch (templateId) {
        case PingDecoder.TEMPLATE_ID:
            pingDecoder.wrap(buffer, bufferOffset, blockLength, version);
            LOGGER.info("Ping");
            long correlationId = pingDecoder.correlationId();
            MutableDirectBuffer directBuffer = new ExpandableArrayBuffer();
            pongEncoder.wrapAndApplyHeader(directBuffer, 0, headerEncoder)
                .correlationId(correlationId);
            int encodedLength = headerEncoder.encodedLength() + pongEncoder.encodedLength();
            LOGGER.info("Sending response [encodedLength={}]", encodedLength);
            if (offerResponse(session, directBuffer, encodedLength)) {
                LOGGER.info("Response sent");
            }
            break;
        default:
            throw new UnsupportedOperationException("Unsupported templateId=" + templateId);
    }
}

/**
 * Offers a response on a client session, retrying only while the failure is transient.
 *
 * @param session        session to reply on.
 * @param responseBuffer buffer holding the encoded response, starting at offset 0.
 * @param responseLength number of bytes to send.
 * @return true if the response was accepted; false if it was dropped because the session cannot
 *         accept it (not connected, closed or max position exceeded).
 */
private boolean offerResponse(ClientSession session, DirectBuffer responseBuffer, int responseLength) {
    IdleStrategy idleStrategy = cluster.idleStrategy();
    idleStrategy.reset();
    long result;
    while ((result = session.offer(responseBuffer, 0, responseLength)) < 0) {
        if (result != io.aeron.Publication.BACK_PRESSURED && result != io.aeron.Publication.ADMIN_ACTION) {
            // Results such as NOT_CONNECTED or CLOSED do not clear by retrying. An unconditional
            // retry loop would block this callback forever, e.g. once the client has gone away.
            LOGGER.warn("Response dropped [session={},result={}]", session, result);
            return false;
        }
        idleStrategy.idle();
    }
    return true;
}
// cluster node
// Launches one cluster node. A ClusteredMediaDriver bundles the media driver, archive and consensus
// module; a ClusteredServiceContainer hosts clusteredService. The node then blocks on barrier until
// it is signalled.
MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
    .aeronDirectoryName(aeronDirName)
    .threadingMode(ThreadingMode.SHARED)
    // Print the full stack trace: Throwable#toString() alone hides where the failure happened.
    .errorHandler(e -> {
        System.err.print("Error ");
        e.printStackTrace(System.err);
    })
    .multicastFlowControlSupplier(new MinMulticastFlowControlSupplier())
    // The driver's termination hook releases barrier.await() below so the node can exit.
    .terminationHook(barrier::signal);
AeronArchive.Context replicationArchiveContext = new AeronArchive.Context()
    .controlResponseChannel("aeron:udp?endpoint=" + hostName + ":0");
Archive.Context archiveContext = new Archive.Context()
    .aeronDirectoryName(aeronDirName)
    .archiveDir(new File(baseDir, "archive"))
    .controlChannel(udpChannel(nodeId, hostName, ARCHIVE_CONTROL_PORT_OFFSET))
    .archiveClientContext(replicationArchiveContext)
    .localControlChannel("aeron:ipc?term-length=64k")
    .recordingEventsEnabled(false)
    .threadingMode(ArchiveThreadingMode.SHARED)
    .replicationChannel("aeron:udp?endpoint=" + hostName + ":0");
// Archive client used by the consensus module and the service container (cloned for each), talking
// to the archive over its local IPC control channel.
AeronArchive.Context aeronArchiveContext = new AeronArchive.Context()
    .aeronDirectoryName(aeronDirName)
    .lock(NoOpLock.INSTANCE)
    .controlRequestChannel(archiveContext.localControlChannel())
    .controlResponseChannel(archiveContext.localControlChannel());
ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
    .errorHandler(e -> {
        System.err.print("Consensus module error: ");
        e.printStackTrace(System.err);
    })
    .clusterMemberId(nodeId)
    .clusterMembers(clusterMembers(Arrays.asList(hostName)))
    .clusterDir(new File(baseDir, "cluster"))
    // NOTE(review): the client setup connects with a UDP ingress channel plus ingress endpoints;
    // confirm that an IPC-only ingress channel is intended here.
    .ingressChannel("aeron:ipc?term-length=64k")
    .logChannel(logControlChannel(nodeId, hostName, LOG_CONTROL_PORT_OFFSET))
    .replicationChannel(logReplicationChannel(hostName))
    .archiveContext(aeronArchiveContext.clone());
ClusteredServiceContainer.Context clusteredServiceContext = new ClusteredServiceContainer.Context()
    .aeronDirectoryName(aeronDirName)
    .archiveContext(aeronArchiveContext.clone())
    .clusterDir(new File(baseDir, "cluster"))
    .clusteredService(clusteredService)
    .errorHandler(e -> {
        System.err.print("Clustered service error: ");
        e.printStackTrace(System.err);
    });
try (ClusteredMediaDriver clusteredMediaDriver = ClusteredMediaDriver.launch(mediaDriverContext, archiveContext, consensusModuleContext);
     ClusteredServiceContainer clusteredServiceContainer = ClusteredServiceContainer.launch(clusteredServiceContext)) {
    System.out.println("[" + nodeId + "] Started Cluster Node on " + hostName + "...");
    barrier.await();
    System.out.println("[" + nodeId + "] Exiting");
}
Client setup
/**
 * Egress listener for the cluster client that logs every callback received from the cluster.
 */
class ClientListener implements EgressListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientListener.class);

    /** Logs an application message (e.g. a PONG) sent by the clustered service to this session. */
    @Override
    public void onMessage(long clusterSessionId, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
        LOGGER.info("Message [clusterSessionId={},timestamp={}]", clusterSessionId, timestamp);
    }

    /**
     * Logs a session event. The event code and detail are included because they are the only
     * indication of why a session was rejected or errored.
     */
    @Override
    public void onSessionEvent(long correlationId, long clusterSessionId, long leadershipTermId, int leaderMemberId, EventCode code, String detail) {
        LOGGER.info("Session event [correlationId={},clusterSessionId={},code={},detail={}]",
            correlationId, clusterSessionId, code, detail);
    }

    /** Logs a change of cluster leader as reported to this session. */
    @Override
    public void onNewLeader(long clusterSessionId, long leadershipTermId, int leaderMemberId, String ingressEndpoints) {
        LOGGER.info("New leader [clusterSessionId={},leadershipTermId={},leaderMemberId={},ingressEndpoints={}]",
            clusterSessionId, leadershipTermId, leaderMemberId, ingressEndpoints);
    }

    /** Logs the outcome of an admin request, including its type, response code and message. */
    @Override
    public void onAdminResponse(long clusterSessionId, long correlationId, AdminRequestType requestType, AdminResponseCode responseCode, String message, DirectBuffer payload, int payloadOffset, int payloadLength) {
        LOGGER.info("Admin response [clusterSessionId={},correlationId={},requestType={},responseCode={},message={}]",
            clusterSessionId, correlationId, requestType, responseCode, message);
    }
}
// Embedded media driver for this client process; its directory is deleted on start and on shutdown.
MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
    .threadingMode(ThreadingMode.SHARED)
    .dirDeleteOnStart(true)
    .dirDeleteOnShutdown(true);
// try-with-resources closes the cluster session and then the embedded driver, in that order, when
// this block exits (including on failure).
try (MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverContext)) {
    AeronCluster.Context aeronClusterContext = new AeronCluster.Context()
        .egressListener(new ClientListener())
        .egressChannel("aeron:udp?endpoint=localhost:0")
        .aeronDirectoryName(mediaDriver.aeronDirectoryName())
        .ingressChannel("aeron:udp")
        .ingressEndpoints(ingressEndpoints(Arrays.asList("localhost")))
        .errorHandler(t -> LOGGER.error("Cluster client error", t));
    try (AeronCluster clusterConnection = AeronCluster.connect(aeronClusterContext)) {
        MutableDirectBuffer buffer = new ExpandableArrayBuffer();
        HeaderEncoder headerEncoder = new HeaderEncoder();
        PingEncoder pingEncoder = new PingEncoder();
        pingEncoder.wrapAndApplyHeader(buffer, 0, headerEncoder)
            .correlationId(5);
        int encodedLength = headerEncoder.encodedLength() + pingEncoder.encodedLength();
        // NOTE(review): YieldingIdleStrategy keeps a core busy; a backoff strategy may suit a
        // long-running client better.
        IdleStrategy idleStrategy = new YieldingIdleStrategy();
        LOGGER.info("Sending PING [encodedLength={}]", encodedLength);
        // Keep processing egress while retrying the offer.
        while (clusterConnection.offer(buffer, 0, encodedLength) < 0) {
            idleStrategy.idle(clusterConnection.pollEgress());
        }
        LOGGER.info("PING sent");
        // Egress (the PONG) only reaches ClientListener when it is polled. A session that sends
        // nothing is closed by the cluster with closeReason=TIMEOUT. So poll egress and send
        // periodic keep-alives, well inside the session timeout, instead of sleeping.
        long keepAliveIntervalMs = 250;
        long nextKeepAliveMs = System.currentTimeMillis() + keepAliveIntervalMs;
        while (!Thread.currentThread().isInterrupted()) {
            long nowMs = System.currentTimeMillis();
            // Only move the deadline once a keep-alive was accepted; otherwise retry on the next pass.
            if (nowMs >= nextKeepAliveMs && clusterConnection.sendKeepAlive()) {
                nextKeepAliveMs = nowMs + keepAliveIntervalMs;
            }
            idleStrategy.idle(clusterConnection.pollEgress());
        }
    }
}
Now the questions.
2023-11-20 09:51:59.682 [clustered-service] INFO ClusteredService - Session message [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468126017,initialTermId=0]
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Ping
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Sending response [encodedLength=16]
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Response sent
When the client is down and the cluster node is restarted, it seems to replay all session messages.
2a) Why is this happening? I thought that snapshot recovery on start (`io.aeron.cluster.service.ClusteredService#onStart`) would prevent this.
2b) How do I distinguish these replayed messages from the messages of actually "running" clients? I only want to send the PONG response to clients that are currently running.
2023-11-20 09:51:59.638 [clustered-service] INFO ClusteredService - Start [cluster=io.aeron.cluster.service.ClusteredServiceAgent@6f6c6c9a,snapshotImage=null]
2023-11-20 09:51:59.682 [clustered-service] INFO ClusteredService - Session open [session=ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50409', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700420302079]
2023-11-20 09:51:59.682 [clustered-service] INFO ClusteredService - Session close [session=ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50409', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700420312088,closeReason=TIMEOUT]
2023-11-20 09:51:59.682 [clustered-service] INFO ClusteredService - Session open [session=ClientSession{id=2, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50070', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700464176130]
2023-11-20 09:51:59.682 [clustered-service] INFO ClusteredService - Session close [session=ClientSession{id=2, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:50070', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700464221815,closeReason=TIMEOUT]
2023-11-20 09:51:59.682 [clustered-service] INFO ClusteredService - Session open [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468126002]
2023-11-20 09:51:59.682 [clustered-service] INFO ClusteredService - Session message [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468126017,initialTermId=0]
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Ping
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Sending response [encodedLength=16]
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Response sent
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Session close [session=ClientSession{id=3, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:54738', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468136027,closeReason=TIMEOUT]
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Session open [session=ClientSession{id=4, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:56645', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468290044]
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Session message [session=ClientSession{id=4, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:56645', encodedPrincipal=[], responsePublication=null, isClosing=false},timestamp=1700468290054,initialTermId=0]
2023-11-20 09:51:59.683 [clustered-service] INFO ClusteredService - Ping
I tried following the samples and documentation, but this behaviour is still not clear to me.
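You don't need to worry about sending messages back to the client during replay in `io.aeron.samples.cluster.tutorial.BasicAuctionClusteredService#onSessionMessage`.
A message sent back to the client is only actually sent when the current node is the leader, so followers will not send anything. Also, a node can only be elected leader after its log has been rebuilt, so messages handled while the log is being replayed on restart are never sent to clients. Your service should simply process replayed messages exactly as it did originally, so that its state is rebuilt deterministically.
The whole log is replayed in your case because no snapshot has been taken yet (note `snapshotImage=null` in the `onStart` log line). On restart, a node loads its latest snapshot, if any, and then replays the log from the position of that snapshot. Taking snapshots (for example with `ClusterTool <cluster-dir> snapshot`) bounds how much of the log has to be replayed.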