I have a topology where a spout reads data from Kafka and sends it to a bolt, which in turn calls a REST API (A), and that calls another REST API (B). So far API B did not have throttling. Now they have implemented throttling (a maximum of x calls per clock minute).
We need to implement a throttling handler.
Option A
Initially we were thinking of handling it at the REST API (A) level by putting a
Thread.sleep(x in millis)
once the call is throttled by REST API (B), but that would hold all the REST API (A) calls waiting for that long (anywhere between 1 and 59 seconds), and that may increase the load as new calls keep coming in.
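For illustration, a minimal sketch of that naive approach (the HTTP client and the sleep-until-the-next-minute calculation are my assumptions, not existing code):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.LocalTime;

public class ApiBClient {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Naive option A: if API B answers 429, block this thread until the next clock minute and retry once.
    public HttpResponse<String> callApiB(String url) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 429) {
            long millisToNextMinute = Duration.ofSeconds(60 - LocalTime.now().getSecond()).toMillis();
            Thread.sleep(millisToNextMinute);   // the stall of up to a minute described above
            response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
        }
        return response;
    }
}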
Option B
REST API (A) sends a response back to the bolt about being throttled. The bolt notifies the spout with a processing failure so that the tuple can be replayed later.
Option A is straightforward to implement but not a good solution in my opinion.
I am trying to figure out whether Option B is feasible with topology.max.spout.pending, but I am not sure how to dynamically tell Storm to throttle the spout. Can anyone please share some thoughts on this option?
Option C
REST API (B) throttles the call from REST API (A), which does not handle it itself but passes the 429 response code on to the bolt. The bolt re-queues the message to an error topic that is consumed by another Storm topology. The message carries a retry count, and if the same message gets throttled again we re-queue it with an incremented retry count.
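A rough sketch of how the bolt could do the re-queueing (the error topic name, header name and producer setup are assumptions):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ThrottleRequeueHelper {
    private static final String ERROR_TOPIC = "api-b-throttled";   // hypothetical error topic name
    private final KafkaProducer<String, String> producer;

    public ThrottleRequeueHelper(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    // Called by the bolt when API A reports a 429: re-publish the payload with an incremented retry count.
    public void requeue(String key, String payload, int retryCount) {
        ProducerRecord<String, String> record = new ProducerRecord<>(ERROR_TOPIC, key, payload);
        record.headers().add("retryCount", Integer.toString(retryCount + 1).getBytes(StandardCharsets.UTF_8));
        producer.send(record);
    }
}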
Updating the post, as I have found a solution that makes Option B feasible.
Option D
Use the KafkaSpoutRetryExponentialBackoff retry service that ships with storm-kafka-client; its constructor documents the backoff behaviour:
/**
 * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
 * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
 * where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
 * <p/>
 * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
 * previous polled records in favor of processing more records.
 *
 * @param initialDelay initial delay of the first retry
 * @param delayPeriod  the time interval that is the ratio of the exponential backoff formula (geometric progression)
 * @param maxRetries   maximum number of times a tuple is retried before being acked and scheduled for commit
 * @param maxDelay     maximum amount of time waiting before retrying
 */
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
    this.initialDelay = initialDelay;
    this.delayPeriod = delayPeriod;
    this.maxRetries = maxRetries;
    this.maxDelay = maxDelay;
    LOG.debug("Instantiated {}", this.toStringImpl());
}
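As a minimal sketch (the broker address, topic name and delay values are my assumptions and should be adjusted to API B's throttling window), the retry service might be wired into the spout like this:

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

public class ThrottledSpoutFactory {
    public static KafkaSpout<String, String> buildSpout() {
        // Retry quickly at first, but never wait longer than one throttling window (60 s).
        KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.seconds(1),    // initialDelay
                TimeInterval.seconds(2),    // delayPeriod (ratio of the geometric progression)
                Integer.MAX_VALUE,          // maxRetries: keep retrying, never drop the tuple
                TimeInterval.seconds(60));  // maxDelay

        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("kafka:9092", "topic-1")   // placeholder broker and topic
                        .setRetry(retryService)
                        .build();
        return new KafkaSpout<>(spoutConfig);
    }
}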
The steps will be as follows:
1. Build the retry service with the constructor above and set it on the spout configuration:
KafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService)
2. When the bolt receives the throttled (429) response, fail the tuple:
collector.fail(tuple)
This will signal the spout to process the tuple again, based on the retry configuration set up in steps 1 and 2.

Your option D sounds fine, but in the interest of avoiding duplicates in calls to API A, I think you should consider separating your topology into two.
Have a topology that reads from your original Kafka topic (call it topic 1), calls REST API A, and writes whatever the output of the bolt is back to a Kafka topic (call it topic 2).
You then make a second topology whose only job is to read from topic 2, and call REST API B.
This will allow you to use option D while avoiding extra calls to API A when you are saturating API B. Your topologies will look like:
Kafka 1 -> Bolt A -> REST API A -> Kafka 2
Kafka 2 -> Bolt B -> REST API B
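As a rough sketch (component names, topic names and producer properties are placeholders, and BoltA/BoltB stand for your existing bolts that call the two APIs), the two topologies could be wired like this:

import java.util.Properties;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class TwoTopologyWiring {
    // Topology 1: Kafka topic 1 -> Bolt A (calls REST API A) -> Kafka topic 2.
    public static TopologyBuilder topologyOne(Properties producerProps) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-1", new KafkaSpout<>(KafkaSpoutConfig.builder("kafka:9092", "topic-1").build()));
        builder.setBolt("bolt-a", new BoltA()).shuffleGrouping("kafka-1");   // BoltA = your existing bolt calling API A
        builder.setBolt("kafka-2-writer", new KafkaBolt<String, String>()
                        .withProducerProperties(producerProps)
                        .withTopicSelector(new DefaultTopicSelector("topic-2"))
                        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>()))   // expects "key"/"message" tuple fields by default
                .shuffleGrouping("bolt-a");
        return builder;
    }

    // Topology 2: Kafka topic 2 -> Bolt B (calls REST API B), retried via the Option D spout config.
    public static TopologyBuilder topologyTwo() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-2", new KafkaSpout<>(KafkaSpoutConfig.builder("kafka:9092", "topic-2").build()));
        builder.setBolt("bolt-b", new BoltB()).shuffleGrouping("kafka-2");   // BoltB = your bolt calling API B
        return builder;
    }
}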
If you want to make the solution a little more responsive to the throttling, you can use the topology.max.spout.pending configuration in Storm to limit how many tuples can be in flight at the same time. You could then make your Bolt B buffer in-flight tuples until the throttling expires, at which point you can make it try sending the tuples again. You can use OutputCollector.resetTimeout to avoid the tuples timing out while Bolt B is waiting for the throttling to expire, and you can use tick tuples to make Bolt B periodically wake up and check whether the throttling has expired.
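Here is a hedged sketch of what such a Bolt B could look like (the API call itself is a placeholder, and the one-minute window, buffering strategy and tick frequency are assumptions):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

public class ThrottleAwareBoltB extends BaseRichBolt {
    private OutputCollector collector;
    private final Deque<Tuple> buffered = new ArrayDeque<>();   // tuples held while API B throttles us
    private long throttledUntilMillis = 0;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to send this bolt a tick tuple every second.
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
        return conf;
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            // Keep the buffered tuples from timing out, then retry them once the window has passed.
            buffered.forEach(collector::resetTimeout);
            if (System.currentTimeMillis() >= throttledUntilMillis) {
                while (!buffered.isEmpty() && tryCallApiB(buffered.peek())) {
                    collector.ack(buffered.poll());
                }
            }
            return;
        }
        if (System.currentTimeMillis() < throttledUntilMillis || !tryCallApiB(tuple)) {
            buffered.add(tuple);   // still throttled: hold the tuple instead of failing it
        } else {
            collector.ack(tuple);
        }
    }

    // Placeholder for the real HTTP call to REST API B; on a 429 it should record the window end,
    // e.g. throttledUntilMillis = System.currentTimeMillis() + 60_000, and return false.
    private boolean tryCallApiB(Tuple tuple) {
        return true;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }
}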