In my Aerospike recovery process I used the following truncate implementation, which gave me good visibility into the number of records affected during the operation:
def truncate(startTime: Long, durableDelete: Boolean): Iterable[Int] = {
  // Set the LUT cut-off to startTime
  val calendar = Calendar.getInstance()
  calendar.setTimeInMillis(startTime)
  logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
  // Define scan and write policies
  val writePolicy = new WritePolicy()
  val scanPolicy = new ScanPolicy()
  writePolicy.durableDelete = durableDelete
  scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))
  // Scan all records such that LUT <= startTime
  config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
    for ((set, bins) <- mapOfSetsToBins) yield {
      val recordCount = new AtomicInteger(0)
      client.scanAll(scanPolicy, namespace, set, new ScanCallback() {
        override def scanCallback(key: Key, record: Record): Unit = {
          // Only nullify bins that actually exist on this record, instead of issuing bulk requests that may not be needed and would load AS
          val requiresNullify = bins.filter(record.bins.containsKey(_)).toSeq
          if (requiresNullify.nonEmpty) {
            recordCount.incrementAndGet()
            client.put(writePolicy, key, requiresNullify.map(Bin.asNull): _*)
            logger.debug {
              val (nullified, remains) = record.bins.asScala.partition { case (binName, _) => requiresNullify.contains(binName) }
              s"(#$recordCount): Record $nullified bins of record with userKey: ${key.userKey}, digest: ${Buffer.bytesToHexString(key.digest)} nullified, remains: $remains"
            }
          }
        }
      })
      recordCount.get()
    }
  }
}
The problem is that the operation took a long time because of the callbacks, which was not acceptable in the production environment. I changed the implementation to the following, which reduced the runtime from about 2 hours to about 10 minutes.
def truncate(startTime: Long, durableDelete: Boolean): Unit = {
  // Set the LUT cut-off to startTime
  val calendar = Calendar.getInstance()
  calendar.setTimeInMillis(startTime)
  logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
  // Define write policy
  val writePolicy = new WritePolicy()
  writePolicy.durableDelete = durableDelete
  config.toRecoverMap.foreach { case (namespace, mapOfSetsToBins) =>
    for ((set, bins) <- mapOfSetsToBins) {
      // Filter all elements s.t lastUpdate <= startTime on $set
      writePolicy.filterExp = Exp.build(
        Exp.and(
          Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
          Exp.eq(Exp.setName(), Exp.`val`(set))))
      val statement = new Statement
      statement.setNamespace(namespace)
      val toNullify = bins.map(Bin.asNull).map(Operation.put).toList
      client.execute(writePolicy, statement, toNullify: _*)
        .waitTillComplete(10.seconds.toMillis.toInt, 1.hour.toMillis.toInt)
    }
  }
}
But the problem is that I no longer have any visibility into the affected records, which the first method gave me (see the logger.debug call).
Is there a way to get both good performance and logs of the affected records?
Thanks!
Looks like you are using Enterprise Edition and that your truncate only takes the LUT into account. The preferred method is to use the truncate
API. This has a significant advantage over the scan and durable-delete method because it does not need to keep a tombstone entry per deleted key; instead, it keeps a single entry marking all records in the set as deleted. It also doesn't need to invoke the "tomb raider," a periodic disk scan that searches for tombstones that no longer mark dead records on the device (a.k.a. "cenotaphs"). The number of records deleted per node via truncate is reported as truncated_records.
You can invoke this truncate method with the truncate info command.
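If you are on the Java/Scala client, the same server-side truncate is also exposed as AerospikeClient.truncate. Here is a minimal sketch under that assumption; the client, namespace, set and startTime values stand in for the ones from your config, and the exact info key under which truncated_records is reported may vary by server version:

import java.util.Calendar

import com.aerospike.client.{AerospikeClient, Info}

// Sketch only: truncate all records in `set` whose last-update-time is before `startTime`,
// then read back the per-node truncation counter for visibility.
def truncateSet(client: AerospikeClient, namespace: String, set: String, startTime: Long): Unit = {
  val beforeLastUpdate = Calendar.getInstance()
  beforeLastUpdate.setTimeInMillis(startTime)

  // Enterprise truncate: a single metadata entry marks the whole set as truncated,
  // so no per-record tombstones (and no tomb-raider work) are generated.
  client.truncate(null, namespace, set, beforeLastUpdate)

  // Visibility: each node reports the number of records it removed by truncation.
  // Assumption: the `truncated_records` metric appears in the namespace info output;
  // check the statistics reference for your server version.
  client.getNodes.foreach { node =>
    val stats = Info.request(node, s"namespace/$namespace")
    val truncated = stats.split(';').collectFirst {
      case s if s.startsWith("truncated_records=") => s
    }
    println(s"${node.getName}: ${truncated.getOrElse("truncated_records not reported")}")
  }
}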
BTW, you may be able to significantly speed up the first method by setting the includeBinData
option to false
in the scan policy. This results in Aerospike only needing to read and send the in-memory metadata during the scan. I believe we still have to read the device if you've stored the record's key with the record.
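For reference, a minimal sketch of that tweak using the Java client's ScanPolicy (the LUT cut-off calendar plays the same role as in your first method):

import java.util.Calendar

import com.aerospike.client.exp.Exp
import com.aerospike.client.policy.ScanPolicy

val cutoff = Calendar.getInstance()    // e.g. cutoff.setTimeInMillis(startTime), as in the question
val scanPolicy = new ScanPolicy()
scanPolicy.includeBinData = false      // return record metadata only; bin values are neither read nor sent
scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(cutoff)))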