Tags: multithreading, scala, performance, thread-synchronization, singlestore

How do I limit write operations to 1k records/sec?


Currently, I am able to write to the database in batches of 500. But due to memory shortage and delayed synchronization between the child aggregator and the leaf nodes of the database, I sometimes run into a Leaf Node Memory Error. The only way I have found to get rid of the error is to limit my write operations to 1k records per second.

dataStream
  .map(line => readJsonFromString(line))
  .grouped(memsqlBatchSize)
  .foreach { recordSet =>
    val dbRecords = recordSet.map(m => (m, Events.transform(m)))

    // Add each transformed record to the JDBC batch
    dbRecords.foreach { record =>
      try {
        Events.setValues(eventInsert, record._2)
        eventInsert.addBatch()
      } catch {
        case e: Exception =>
          logger.error(s"error adding batch: ${e.getMessage}")
          val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
          logger.error(s"event: $error_event")
      }
    }

    // Bulk commit records
    try {
      eventInsert.executeBatch()
    } catch {
      case e: java.sql.BatchUpdateException =>
        val updates = e.getUpdateCounts
        logger.error(s"failed commit: ${updates.mkString(", ")}")
        updates.zipWithIndex.filter { case (v, _) => v == Statement.EXECUTE_FAILED }.foreach { case (_, i) =>
          val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
          logger.error(s"insert error: $error")
          logger.error(e.getMessage)
        }
    } finally {
      connection.commit()
      eventInsert.clearBatch()
      logger.debug(s"committed: ${dbRecords.length}")
    }
  }

The reason for 1k records is that some of the data I am trying to write can contain tons of JSON records per line, so with a batch size of 500 lines I may end up writing 30k records per second. Is there any way to make sure that only 1000 records are written to the database in a batch, irrespective of how many records each line expands into?
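For clarity, this is the kind of fixed-size re-batching I have in mind (just a rough sketch; extractRecords and writeBatchToDb are hypothetical placeholders for my own expansion and JDBC insert logic):

dataStream
  .map(line => readJsonFromString(line))
  .flatMap(json => extractRecords(json))    // hypothetical: one input line may expand into many records
  .grouped(1000)                            // always 1000 records per batch, regardless of line boundaries
  .foreach(batch => writeBatchToDb(batch))  // hypothetical wrapper around the JDBC batch insert above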


Solution

  • I don't think Thread.sleep is a good way to handle this situation. It is generally discouraged in Scala, and we don't want to block the thread in any case.

    One suggestion would be to use a streaming library such as Akka Streams or Monix Observable. There are pros and cons between those libraries that I don't want to spend too many paragraphs on, but they both support backpressure to control the producing rate when the consumer is slower than the producer. In your case, the consumer is the database write and the producer is probably reading JSON lines and doing some aggregation.

    The following code illustrates the idea; you will need to adapt it to your needs:

    val sourceJson = Source(dataStream.map(line => readJsonFromString(line)))
    val sinkDB = Sink.foreach[String](record => Events.jm.writeValueAsString(record)) // you will need to figure out how to build the actual database Sink
    val flowThrottle = Flow[String]
      .throttle(1000, 1.second, 1000, ThrottleMode.shaping) // at most 1000 elements per second

    val runnable = sourceJson.via(flowThrottle).toMat(sinkDB)(Keep.right)
    val result = runnable.run()
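
    Putting the pieces together, here is a minimal self-contained sketch of the same idea, assuming Akka 2.6+ (where the implicit ActorSystem provides the materializer); readJsonFromString and writeBatchToDb are stand-ins for your own parsing and JDBC batch-insert logic. It groups the parsed records into batches of 1000 and lets at most one batch through per second, which caps the write rate at roughly 1000 records per second:

    import akka.actor.ActorSystem
    import akka.stream.ThrottleMode
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import scala.concurrent.duration._

    object ThrottledWriter extends App {
      implicit val system: ActorSystem = ActorSystem("throttled-writer")

      type Record = Map[String, Any]

      // Stand-ins for the real parsing and JDBC batch-insert logic from the question.
      def readJsonFromString(line: String): Record = Map("raw" -> line)
      def writeBatchToDb(batch: Seq[Record]): Unit =
        println(s"writing batch of ${batch.size} records")

      val dataStream: Iterator[String] = Iterator.fill(5000)("""{"k":"v"}""")

      val done = Source
        .fromIterator(() => dataStream)
        .map(readJsonFromString)
        .grouped(1000)                                   // fixed batches of 1000 records
        .throttle(1, 1.second, 1, ThrottleMode.shaping)  // at most one batch (1000 records) per second
        .toMat(Sink.foreach[Seq[Record]](writeBatchToDb))(Keep.right)
        .run()

      import system.dispatcher
      done.onComplete(_ => system.terminate())
    }

    Because the throttle stage backpressures the stages upstream of it, the source simply stops being pulled when the limit is reached, so nothing blocks a thread and nothing piles up in memory.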