My Spring BFF has a function designed to clean up sessions in Redis, periodically, every 2 minutes using the 'Scheduled' annotation.
One issue I've read is that if I put this in a Docker container and spin up multiple instances of the container, then this function will run in every container, which is not really necessary. Is there some kind of mechanism I can use to prevent this from happening. E.g. what is a distribution lock? Would that be useful?
Spring Redis Cache Session Clean Up function
/**
* For cleanup operations (i.e. removing expired session from a ZSet (Sorted Sets) in Redis)
* Spring's scheduling mechanism will automatically call the cleanup method according to the schedule
* defined by the @Scheduled annotation.
*/
@Component
@EnableScheduling
internal class SessionEvicter(
private val redisOperations: ReactiveRedisOperations<String, String>,
springSessionProperties: SpringSessionProperties,
) {
private val logger = LoggerFactory.getLogger(SessionEvicter::class.java)
private val redisKeyLocation = springSessionProperties.redis?.expiredSessionsNamespace
?: "spring:session:sessions:expirations"
data class CleanupContext(
val now: Instant,
val pastFiveDays: Instant,
val range: Range<Double>,
val limit: Limit
)
// run every 120 seconds
@Scheduled(fixedRate = 120, timeUnit = TimeUnit.SECONDS)
fun cleanup(): Mono<Void> {
return Mono.fromCallable {
val now = Instant.now()
val pastFiveDays = now.minus(Duration.ofDays(5))
val range = Range.closed(
pastFiveDays.toEpochMilli().toDouble(),
now.toEpochMilli().toDouble()
)
val limit = Limit.limit().count(500)
CleanupContext(now, pastFiveDays, range, limit)
}
.doOnNext { context ->
logger.info("Scheduled cleanup execution started at ${Instant.now()}.")
logger.info("Current time (now): ${context.now}")
logger.info("Time range start: ${Date(context.pastFiveDays.toEpochMilli())}")
logger.info("Time range end: ${Date(context.now.toEpochMilli())}")
logger.info("Limit count: ${context.limit.count}")
logger.info("Redis key location: $redisKeyLocation")
}
.flatMap { context ->
val zSetOps = redisOperations.opsForZSet()
zSetOps.reverseRangeByScore(redisKeyLocation, context.range, context.limit)
.collectList()
.flatMap { sessionIdsList ->
if (sessionIdsList.isNotEmpty()) {
logger.info("Found ${sessionIdsList.size} sessions to remove.")
zSetOps.remove(
redisKeyLocation,
*sessionIdsList.toTypedArray()
).doOnSubscribe { logger.info("Started removal of sessions") }
.doOnSuccess { logger.info("Successfully removed sessions") }
.doOnError { e -> logger.error("Error during removal: ${e.message}") }
} else {
logger.info("No sessions found to remove.")
Mono.empty()
}
}
}
.doOnSuccess {
logger.info("Scheduled session cleanup check completed at ${Instant.now()}.")
}
.doOnError { e ->
logger.error("Error during session cleanup check: ${e.message}")
}
.then()
.doOnTerminate {
logger.info("Cleanup process terminated at ${Instant.now()}.")
}
.subscribeOn(Schedulers.boundedElastic()) // to ensure proper threading
}
}
There are various options depending on your project & environment. One way to achieve this is to use Quartz scheduler https://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/introduction.html
OR
You can use Redis Distrbutable lock for the same purpose https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
OR
Use a leader election mechanism with tools like Kubernetes' leader election API, Zookeeper, or Consul, where one instance becomes the leader and runs the scheduled tasks.
Sample code - you can place your session evicter logic inside try block
@Component
@EnableScheduling
public class SessionEvicter {
private final ReactiveRedisOperations<String, String> redisOperations;
private static final String LOCK_KEY = "session-cleanup-lock";
private static final Duration LOCK_EXPIRY = Duration.ofSeconds(120); // lock expiry time
public SessionEvicter(ReactiveRedisOperations<String, String> redisOperations) {
this.redisOperations = redisOperations;
}
@Scheduled(fixedRate = 120000)
public void cleanup() {
String lockValue = UUID.randomUUID().toString();
redisOperations.opsForValue()
.setIfAbsent(LOCK_KEY, lockValue, LOCK_EXPIRY)
.flatMap(acquired -> {
if (Boolean.TRUE.equals(acquired)) {
// Lock acquired, perform the cleanup task
return performCleanup()
.doFinally(signalType -> releaseLock(lockValue));
} else {
// Lock not acquired, skip cleanup
return Mono.empty();
}
})
.subscribe();
}
private Mono<Void> performCleanup() {
// Your existing cleanup code
// ...
return Mono.empty();
}
private Mono<Boolean> releaseLock(String lockValue) {
// Release the lock only if it was acquired by this instance
return redisOperations.opsForValue()
.get(LOCK_KEY)
.flatMap(currentValue -> {
if (lockValue.equals(currentValue)) {
return redisOperations.delete(LOCK_KEY);
}
return Mono.just(false);
});
}
}