aerospike

Performance on scanAll operation vs execute


In my Aerospike recovery process I used the following truncate implementation, which gave me great visibility into the number of records affected during the operation:

 def truncate(startTime: Long, durableDelete: Boolean): Iterable[Int] = {

    // Setting LUT
    val calendar = Calendar.getInstance()

    logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")

    // Define Scan and Write Policies
    val writePolicy = new WritePolicy()
    val scanPolicy = new ScanPolicy()
    writePolicy.durableDelete = durableDelete
    scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))

    // Scan all records such that LUT <= startTime
    config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
      for ((set, bins) <- mapOfSetsToBins) yield {
        val recordCount = new AtomicInteger(0)
        client.scanAll(scanPolicy, namespace, set, new ScanCallback() {
          override def scanCallback(key: Key, record: Record): Unit = {
            val requiresNullify = bins.filter(record.bins.containsKey(_)).toSeq // Only nullify bins that actually exist on the record, instead of issuing writes that may not be needed and would load the cluster
            if (requiresNullify.nonEmpty) {
              recordCount.incrementAndGet()
              client.put(writePolicy, key, requiresNullify.map(Bin.asNull): _*)
              logger.debug {
                val (nullified, remains) = record.bins.asScala.partition { case (binName, _) => requiresNullify.contains(binName) }
                s"(#$recordCount): Record $nullified bins of record with userKey: ${key.userKey}, digest: ${Buffer.bytesToHexString(key.digest)} nullified, remains: $remains"
              }
            }
          }
        })
        recordCount.get()
      }
    }
  }

The problem is that the operation took a lot of time because of the callbacks, which was not acceptable in the production environment. I changed the implementation to the following, which reduced the run time from about 2 hours to about 10 minutes.

def truncate(startTime: Long, durableDelete: Boolean): Unit = {

    // Setting LUT
    val calendar = Calendar.getInstance()

    logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")

    // Define Write Policy
    val writePolicy = new WritePolicy()
    writePolicy.durableDelete = durableDelete

    config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
      for ((set, bins) <- mapOfSetsToBins) yield {

        // Filter all elements s.t lastUpdate <= startTime on $set
        writePolicy.filterExp = Exp.build(
          Exp.and(
            Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
            Exp.eq(Exp.setName(), Exp.`val`(set)))
        )

        val statement = new Statement
        statement.setNamespace(namespace)
        val toNullify = bins.map(Bin.asNull).map(Operation.put).toList
        client.execute(writePolicy, statement, toNullify: _*).waitTillComplete(10.seconds.toMillis.toInt, 1.hour.toMillis.toInt)
      }
    }
  }

But the problem is that I don't have any visibility into the affected records like the first method gave me (see the logger.debug call).

Is there a way to get both good performance and logs of the affected records?

Thanks!


Solution

  • Looks like you are using the Enterprise Edition and that your truncate only takes the LUT into account. The preferred method is to use the truncate API. This has a significant advantage over the scan and durable delete method because it does not need to keep a tombstone entry per deleted key; instead, it keeps a single entry marking all records in the set as deleted. It also doesn't need to invoke the "tomb raider," a periodic disk scan that searches for tombstones that no longer mark dead records on the device (a.k.a. a "cenotaph"). The number of records deleted per node via truncate is reported as truncated_records.

    You can invoke this truncate method with the truncate info command; a hedged sketch of calling the equivalent client truncate API (and reading truncated_records) follows at the end of this answer.


    BTW, you may be able to significantly speed up the first method by setting the includeBinData option to false in the scan policy. This results in Aerospike only needing to read and send the in-memory metadata during the scan. I believe we still have to read from the device if you've stored the record's key with the record. A sketch of this variant is also included below.
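
    A minimal sketch of the truncate approach from Scala, assuming the standard Aerospike Java client: client and the namespace/set layout are taken from the question, truncateSets is a hypothetical helper, and reading truncated_records from the per-namespace info statistics is an assumption about where that metric is exposed.

    import java.util.Calendar

    import com.aerospike.client.{AerospikeClient, Info}
    import com.aerospike.client.policy.InfoPolicy

    // Hypothetical helper (not from the question): truncate every set covered by the
    // recovery config, removing records whose last update is before `beforeLastUpdate`.
    def truncateSets(client: AerospikeClient,
                     toRecover: Map[String, Map[String, Seq[String]]],
                     beforeLastUpdate: Calendar): Unit = {
      toRecover.foreach { case (namespace, mapOfSetsToBins) =>
        mapOfSetsToBins.keys.foreach { set =>
          // Server-side truncate: one metadata entry per set, no per-record tombstones.
          client.truncate(new InfoPolicy(), namespace, set, beforeLastUpdate)
        }

        // Assumption: truncated_records is part of the per-namespace statistics,
        // so polling each node's info endpoint gives per-node visibility.
        client.getNodes.foreach { node =>
          val stats = Info.request(new InfoPolicy(), node, s"namespace/$namespace")
          val truncated = stats.split(';').collectFirst {
            case kv if kv.startsWith("truncated_records=") => kv.stripPrefix("truncated_records=")
          }
          println(s"node=${node.getName} namespace=$namespace truncated_records=${truncated.getOrElse("n/a")}")
        }
      }
    }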
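
    And a sketch of the includeBinData tip applied to the scan-based method. Note this is an assumption about how you would adapt it: with metadata-only scans record.bins is no longer available in the callback, so the per-bin existence check from the original code is dropped and all configured bins are nullified unconditionally.

    import java.util.Calendar
    import java.util.concurrent.atomic.AtomicInteger

    import com.aerospike.client.{AerospikeClient, Bin, Key, Record, ScanCallback}
    import com.aerospike.client.exp.Exp
    import com.aerospike.client.policy.{ScanPolicy, WritePolicy}

    // Sketch only: scan metadata (no bin data), count matching records, and nullify
    // the configured bins on every match.
    def metadataOnlyNullify(client: AerospikeClient,
                            namespace: String,
                            set: String,
                            bins: Seq[String],
                            calendar: Calendar,
                            durableDelete: Boolean): Int = {
      val scanPolicy = new ScanPolicy()
      scanPolicy.includeBinData = false // only record metadata is read and returned
      scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))

      val writePolicy = new WritePolicy()
      writePolicy.durableDelete = durableDelete

      val recordCount = new AtomicInteger(0)
      client.scanAll(scanPolicy, namespace, set, new ScanCallback {
        override def scanCallback(key: Key, record: Record): Unit = {
          recordCount.incrementAndGet()
          client.put(writePolicy, key, bins.map(Bin.asNull): _*)
        }
      })
      recordCount.get()
    }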