
How to perform throttling based on user defined argument?


I am writing to an in-memory distributed database in batches of a user-defined size, in a multithreaded environment, but I want to limit the write rate to, for example, 1000 rows/sec. The reason for this requirement is that my producer writes too fast and the consumer runs into a leaf-memory error. Is there a standard practice for throttling while batch-processing records?

dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
      val dbRecords = recordSet.map(m => (m, Events.transform(m)))
      dbRecords.map { record =>
        try {
          Events.setValues(eventInsert, record._2)
          eventInsert.addBatch
        } catch {
          case e: Exception =>
            logger.error(s"error adding batch: ${e.getMessage}")
            val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"event: $error_event")
        }
      }

      // Bulk Commit Records
      try {
        eventInsert.executeBatch
      } catch {
        case e: java.sql.BatchUpdateException =>
          val updates = e.getUpdateCounts
          // mkString prints the counts themselves; toString on an Array prints only its identity
          logger.error(s"failed commit: ${updates.mkString(", ")}")
          updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
            val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"insert error: $error")
            logger.error(e.getMessage)
          }
      }
      finally {
        connection.commit
        eventInsert.clearBatch
        logger.debug(s"committed: ${dbRecords.length.toString}")
      }
    }

I was hoping to pass a user-defined argument such as throttleMax and, once the total number of records written by a thread reaches throttleMax, call Thread.sleep() for 1 second. But this is going to make the entire process very slow. Is there a more effective way to throttle the data load to 1000 rows/sec?


Solution

  • As others have suggested (see the comments on the question), you have better options than throttling here. However, you can throttle an operation in Java with simple code like the following:

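    // Requires: import java.util.Iterator;
    
    /**
     * Given an Iterator `inner`, returns a new Iterator which will emit items upon
     * request, but throttled to at most one item every `minDelayMs` milliseconds.
     */
    public static <T> Iterator<T> throttledIterator(Iterator<T> inner, int minDelayMs) {
        return new Iterator<T>() {
            // Start "one delay ago" so that the first item is emitted immediately.
            private long lastEmittedMillis = System.currentTimeMillis() - minDelayMs;
    
            @Override
            public boolean hasNext() {
                return inner.hasNext();
            }
    
            @Override
            public T next() {
                // Time still to wait = minimum delay minus the time already elapsed.
                long elapsedMs = System.currentTimeMillis() - lastEmittedMillis;
                long requiredDelayMs = minDelayMs - elapsedMs;
                if (requiredDelayMs > 0) {
                    try {
                        Thread.sleep(requiredDelayMs);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and emit without further delay.
                        Thread.currentThread().interrupt();
                    }
                }
                // Record the emission time after any sleep, not before it.
                lastEmittedMillis = System.currentTimeMillis();
    
                return inner.next();
            }
        };
    }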
    

    The above code uses Thread.sleep, so it is not suitable for use in a reactive system. In that case, use the throttling facility that the system itself provides, e.g. the throttle operator in Akka Streams.
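
The iterator above delays individual items, whereas the question writes whole batches from several threads. A minimal sketch of a shared rows-per-second throttle for that case might look like the following. The class name `BatchThrottle` and its methods `reserve` and `acquire` are hypothetical names chosen for illustration, not part of any library. It assumes every writer thread shares one instance and that sleeping the writer thread is acceptable.

```java
import java.util.concurrent.TimeUnit;

/**
 * Illustrative rows-per-second throttle shared by all writer threads.
 * Hypothetical helper: the names here are assumptions, not a library API.
 */
final class BatchThrottle {
    private final double nanosPerRow;
    // Earliest instant (on the System.nanoTime scale) at which the next batch may start.
    private long nextFreeNanos;

    BatchThrottle(int maxRowsPerSecond) {
        this(maxRowsPerSecond, System.nanoTime());
    }

    // The explicit start time makes the bookkeeping testable without a real clock.
    BatchThrottle(int maxRowsPerSecond, long startNanos) {
        if (maxRowsPerSecond <= 0) {
            throw new IllegalArgumentException("maxRowsPerSecond must be positive");
        }
        this.nanosPerRow = 1_000_000_000.0 / maxRowsPerSecond;
        this.nextFreeNanos = startNanos;
    }

    /**
     * Reserves the time budget for `rows` rows and returns how many nanoseconds
     * the caller should wait before writing them. Does not sleep itself.
     */
    synchronized long reserve(int rows, long nowNanos) {
        // Compare via subtraction, the overflow-safe idiom for nanoTime values.
        long start = (nextFreeNanos - nowNanos > 0) ? nextFreeNanos : nowNanos;
        long waitNanos = start - nowNanos;
        // Later batches may not start until this batch's share of time has passed.
        nextFreeNanos = start + (long) (rows * nanosPerRow);
        return waitNanos;
    }

    /** Blocks the calling thread until `rows` rows may be written. */
    void acquire(int rows) throws InterruptedException {
        long waitNanos = reserve(rows, System.nanoTime());
        if (waitNanos > 0) {
            // Sleep outside the lock so other threads can still reserve their slots.
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

In the question's loop, a call such as `throttle.acquire(recordSet.size)` just before `eventInsert.executeBatch` would be the natural place for it. Because each wait is proportional to the batch just reserved, threads pause only as long as the rate requires rather than for a fixed second. Idle time is not banked as credit, so the largest burst is a single batch.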