Tags: timeout, apache-storm, throttling

Throttle Apache Storm Spout Dynamically


I have a topology where a spout reads data from Kafka and sends it to a bolt, which in turn calls a REST API (A), which calls another REST API (B). Until now API B did not throttle requests. Now it enforces throttling (a maximum of x calls per clock minute).

We need to implement the throttling handler.

Option A

Initially we were thinking of handling it at the REST API (A) level by inserting a

Thread.sleep(x in millis) once the call is throttled by REST API (B)

but that would hold all the REST API (A) calls waiting for that long (anywhere between 1 and 59 seconds), which could increase the load as new calls keep coming in.

Option B

REST API (A) sends a response back to the bolt indicating the call was throttled. The bolt reports the processing failure to the spout so that it will:

  • Not advance the offset for those messages
  • Stop reading from Kafka and stop emitting messages to the bolt
  • Wait for some time (say a minute) and resume from where it left off

Option A is straightforward to implement, but in my opinion it is not a good solution.

I am trying to figure out whether Option B is feasible with topology.max.spout.pending, but I cannot see how to dynamically tell Storm to throttle the spout. Can anyone share their thoughts on this option?

Option C

REST API (B) throttles the call from REST API (A), which does not handle it itself but passes the 429 response code on to the bolt. The bolt re-queues the message to an error topic consumed by another Storm topology. The message can carry a retry count, and if the same message gets throttled again we can re-queue it with an incremented retry count.
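The re-queue bookkeeping for Option C could be as simple as the following sketch. The max-retry limit of 5 and the helper names are illustrative assumptions, not part of the question:

```java
// Sketch of Option C's retry bookkeeping: track a retry count per message
// and decide whether to re-queue to the error topic or give up.
// MAX_RETRIES = 5 is an assumed value.
public class ThrottledRetry {
    static final int MAX_RETRIES = 5;

    // Returns the retry count to re-queue with, or -1 to drop the message.
    static int nextRetryCount(int currentRetryCount) {
        int next = currentRetryCount + 1;
        return next > MAX_RETRIES ? -1 : next;
    }

    public static void main(String[] args) {
        int count = 0;
        // Simulate a message that keeps getting throttled (429)
        while ((count = nextRetryCount(count)) != -1) {
            System.out.println("re-queue to error topic with retryCount=" + count);
        }
        System.out.println("gave up after " + MAX_RETRIES + " retries");
    }
}
```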

Updating the post, as I found a solution that makes Option B feasible.

Option D

https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java

/**
 * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
 * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
 * where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
 * <p/>
 * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
 * previous polled records in favor of processing more records.
 *
 * @param initialDelay      initial delay of the first retry
 * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
 * @param maxRetries        maximum number of times a tuple is retried before being acked and scheduled for commit
 * @param maxDelay          maximum amount of time waiting before retrying
 *
 */
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
    this.initialDelay = initialDelay;
    this.delayPeriod = delayPeriod;
    this.maxRetries = maxRetries;
    this.maxDelay = maxDelay;
    LOG.debug("Instantiated {}", this.toStringImpl());
}
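To see what schedule the formula in the Javadoc produces, here is a plain-Java sketch. The delay values (initialDelay = 1s, delayPeriod = 2s, maxDelay = 60s) are illustrative, not Storm defaults:

```java
// Sketch of the exponential backoff schedule described in the Javadoc
// above. Delays are in seconds and purely illustrative.
public class BackoffSketch {
    static long nextRetryDelaySecs(int failCount, long initialDelay,
                                   long delayPeriod, long maxDelay) {
        // nextRetry = failCount == 1 ? currentTime + initialDelay
        //                            : currentTime + delayPeriod^(failCount-1)
        long delay = (failCount == 1)
                ? initialDelay
                : (long) Math.pow(delayPeriod, failCount - 1);
        // nextRetry = Min(nextRetry, currentTime + maxDelay)
        return Math.min(delay, maxDelay);
    }

    public static void main(String[] args) {
        for (int failCount = 1; failCount <= 8; failCount++) {
            System.out.println("failCount=" + failCount + " delay="
                    + nextRetryDelaySecs(failCount, 1, 2, 60) + "s");
        }
    }
}
```

With these values the delays grow 1, 2, 4, 8, 16, 32 seconds and are then capped at 60 seconds by maxDelay.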

The steps are as follows:

  1. Create a kafkaSpoutRetryService using the above constructor
  2. Set the retry service on KafkaSpoutConfig using KafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService)
  3. Fail the tuple in the bolt when REST API (B) throttles, using collector.fail(tuple), which signals the spout to process the tuple again based on the retry configuration set up in steps 1 and 2
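Putting steps 1 and 2 together, the spout wiring might look like the following config fragment. The bootstrap servers, topic name, and delay values are placeholders; the classes are from storm-kafka-client:

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

// Step 1: retry service -- initial delay 1s, period 2s, up to 5 retries,
// capped at 60s between attempts (all values illustrative)
KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
        TimeInterval.seconds(1), TimeInterval.seconds(2),
        5, TimeInterval.seconds(60));

// Step 2: attach it to the spout config (servers/topic are placeholders)
KafkaSpoutConfig<String, String> spoutConfig =
        KafkaSpoutConfig.builder("kafka:9092", "topic-1")
                .setRetry(retryService)
                .build();

// Step 3 happens in the bolt: on a 429 from REST API (B),
// call collector.fail(tuple) instead of collector.ack(tuple).
```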

Solution

  • Your option D sounds fine, but in the interest of avoiding duplicates in calls to API A, I think you should consider separating your topology into two.

    Have a topology that reads from your original Kafka topic (call it topic 1), calls REST API A, and writes whatever the output of the bolt is back to a Kafka topic (call it topic 2).

    You then make a second topology whose only job is to read from topic 2, and call REST API B.

    This will allow you to use option D while avoiding extra calls to API A when you are saturating API B. Your topologies will look like

    Kafka 1 -> Bolt A -> REST API A -> Kafka 2
    Kafka 2 -> Bolt B -> REST API B

    If you want to make the solution a little more responsive to the throttling, you can use the topology.max.spout.pending configuration in Storm to limit how many tuples can be in-flight at the same time. You could then make your bolt B buffer in-flight tuples until the throttling expires, at which point you can make it try sending the tuples again. You can use OutputCollector.resetTupleTimeout to avoid the tuples timing out while Bolt B is waiting for the throttling to expire. You can use tick tuples to make Bolt B periodically wake up and check whether the throttling has expired.
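A minimal sketch of that buffering logic, with the Storm-specific pieces (tick tuples, OutputCollector.resetTupleTimeout) reduced to comments. The fixed-window counter and the per-minute budget are assumptions about how the throttle is enforced:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of Bolt B's buffering: count calls per clock minute and park
// tuples once the per-minute budget is exhausted. In a real bolt,
// drainage would be driven by tick tuples, and each buffered tuple's
// timeout would be reset via OutputCollector.resetTupleTimeout.
public class ThrottleWindow {
    private final int maxCallsPerMinute;
    private long windowStartMillis;
    private int callsInWindow;
    private final Deque<String> buffered = new ArrayDeque<>();

    ThrottleWindow(int maxCallsPerMinute, long nowMillis) {
        this.maxCallsPerMinute = maxCallsPerMinute;
        this.windowStartMillis = nowMillis;
    }

    // Returns true if the call may go to REST API B now; otherwise the
    // tuple is buffered until the next clock minute.
    boolean tryCall(String tupleId, long nowMillis) {
        if (nowMillis - windowStartMillis >= 60_000) {
            windowStartMillis = nowMillis;   // new clock minute, reset budget
            callsInWindow = 0;
        }
        if (callsInWindow < maxCallsPerMinute) {
            callsInWindow++;
            return true;
        }
        buffered.addLast(tupleId);           // park until throttling expires
        return false;
    }

    int bufferedCount() {
        return buffered.size();
    }
}
```

topology.max.spout.pending then bounds how many tuples can pile up in the buffer at once, since the spout stops emitting while that many tuples are unacked.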