Tags: kotlin, apache-flink, flink-streaming

Limit events flowing in a Flink job without using Thread.sleep


I'm new to Flink, and I'm attempting to implement a pipeline that consumes from a Kafka topic, performs minor filtering and transformations on this data, and asynchronously writes to an endpoint.

The challenge I'm facing is that this endpoint has a limit of receiving 10,000 calls per minute, and I'm struggling to throttle my rates without using Thread.sleep (which is not considered good practice in Flink).

Currently, at a high level, my code looks like this:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val kafkaSource = KafkaSource.builder<JsonNode>()
        .setBootstrapServers(...)
        .setTopics(...)
        .setStartingOffsets(OffsetsInitializer.latest())
        .setDeserializer(..)
        .build()

val stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Events Topic", JsonNodeTypeInfo())

val filteredStream = stream.filter(FilterFunction())

val transformedStream = filteredStream.map(TransformFunction())

val rateLimitedStream = transformedStream.process(RateFunction()).setParallelism(1)

val httpSink = HttpAsyncSink(ObjectMapper())
rateLimitedStream.sinkTo(httpSink)

env.execute("Flink Job")

So, RateFunction() is what I use to count the events, cap them at 10k per minute, and sleep the thread if that limit is reached before the minute is up.

class RateFunction(private val numEventsLimit: Int, private val timeLimitIntervalInSeconds: Int, private val threadSleepTimeInSeconds: Int) : ProcessFunction<JsonNode, JsonNode>() {

    // Counter to track the total number of processed events
    private lateinit var eventsCounter: Counter
    private var lastTime: Long = System.currentTimeMillis()

    override fun open(parameters: Configuration) {
        // Initializing Counters
        eventsCounter = runtimeContext.metricGroup.counter("totalEvents")
    }

    override fun processElement(value: JsonNode, ctx: ProcessFunction<JsonNode, JsonNode>.Context, out: Collector<JsonNode>) {

        // Function to reset counter
        fun resetCounter() {
            eventsCounter = runtimeContext.metricGroup.counter("totalEvents")
        }

        // Function to return the current millis
        fun currentMillis(): Long {
            return System.currentTimeMillis()
        }

        // If statement to manage the rate of the events being published
        if ((eventsCounter.count < numEventsLimit) && ((currentMillis() - lastTime) <= (timeLimitIntervalInSeconds * 1000))) {
            out.collect(value) // forward event
            eventsCounter.inc()
        } else if ((currentMillis() - lastTime) > (timeLimitIntervalInSeconds * 1000)) {
            lastTime = currentMillis()
            resetCounter()
            println("Resetting both eventsCounter and lastTime variables")
            out.collect(value) // forward event
            eventsCounter.inc()
        } else if (eventsCounter.count >= numEventsLimit) {
            println("Sleeping for $threadSleepTimeInSeconds seconds due reached Events Limitation")
            Thread.sleep((threadSleepTimeInSeconds * 1000).toLong())
            lastTime = currentMillis()
            resetCounter()
            out.collect(value) // forward event
            eventsCounter.inc()
        } else {
            throw RuntimeException("Something went wrong while processing element $value")
        }
    }
}

And this is my HttpAsyncSink():

class HttpAsyncSink(private val objectMapper: ObjectMapper): Sink<JsonNode> {
    override fun createWriter(context: InitContext): SinkWriter<JsonNode> {
        return HttpAsyncWriter(context, objectMapper)
    }
}

class HttpAsyncWriter(context: InitContext, private val objectMapper: ObjectMapper): AsyncSinkWriter<JsonNode, JsonNode>(
    HttpRequestJsonPassthrough(),
    context,
    AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder()
        .setMaxBatchSize(1)
        .setMaxBatchSizeInBytes(500_000_000)
        .setMaxInFlightRequests(100_000)
        .setMaxBufferedRequests(100_000)
        .setMaxTimeInBufferMS(1_000_000)
        .setMaxRecordSizeInBytes(1_000_000)
        .build(),
    mutableListOf()
) {
    private val client = OkHttpClient()

    override fun submitRequestEntries(
        requestEntries: MutableList<JsonNode>,
        requestToRetry: Consumer<MutableList<JsonNode>>
    ) {
        requestEntries.forEach {
            val endpoint = it.payload.get("endpointUrl").asText()
            val jsonString = objectMapper.writeValueAsString(it.payload)
            val jsonMediaType = "application/json; charset=utf-8".toMediaTypeOrNull()
            val body: RequestBody = jsonString.toRequestBody(jsonMediaType)
            val request: Request = Request.Builder()
                .post(body)
                .url(endpoint)
                .build()
            val call: Call = client.newCall(request)
            call.enqueue(object : Callback {
                    // In case of failure, the record is handed back to the consumer/buffer to be retried later
                    override fun onFailure(call: Call, e: IOException) {
                        println("Request failed for record: $it\n returning it to the buffer.. Reason: ${e.message}")
                        requestToRetry.accept(mutableListOf(it))
                    }
                    // In case of success, passing Collections.emptyList() tells the consumer that the request was accepted
                    override fun onResponse(call: Call, response: Response) {
                        println("Response: ${Collections.singleton(response.body.string())}")
                        response.body.close()
                        requestToRetry.accept(Collections.emptyList())
                    }
                }
            )
        }
    }

    // Returns the serialized size of the record in bytes
    override fun getSizeInBytes(requestEntry: JsonNode): Long {
        return requestEntry.toString().toByteArray().size.toLong()
    }

}

I've attempted to use windowAll with a 1-minute tumbling window and a custom trigger based on the number of events, but I haven't had much success. I also tried a CountWindow, but had difficulty implementing something that would simply "pause" and wait for the next minute before sending new events once the 10,000-event limit is reached.
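
Roughly, the windowAll attempt looked like this sketch (not my exact code; the trigger and window function are just placeholders for what I tried):

// Collect events into 1-minute processing-time tumbling windows and fire early via a
// count trigger, hoping to cap the number of events forwarded per window.
val windowedStream = transformedStream
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .trigger(CountTrigger.of<TimeWindow>(10_000L))
    .process(object : ProcessAllWindowFunction<JsonNode, JsonNode, TimeWindow>() {
        override fun process(context: Context, elements: Iterable<JsonNode>, out: Collector<JsonNode>) {
            elements.forEach(out::collect)
        }
    })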

Any insights or suggestions on how to implement rate limiting in Flink without relying on Thread.sleep would be greatly appreciated.

Thank you!


UPDATE:

Based on David Anderson's answer, I've implemented this process function using GuavaFlinkConnectorRateLimiter.

val limitedStream = transformedStream
    .process(object : ProcessFunction<JsonNode, JsonNode>() {
        val limiter = GuavaFlinkConnectorRateLimiter()

        override fun open(parameters: Configuration?) {
            limiter.rate = 1500 // permits per second; since acquire(1) is called per record, this is effectively records per second
            limiter.open(runtimeContext)
        }

        override fun processElement(value: JsonNode, ctx: Context, out: Collector<JsonNode>) {
            limiter.acquire(1)
            out.collect(value)
        }
    }
)

Maybe it isn't the best solution, but it seems to be working fine!

I've validated it using a 10-second tumbling window to print the input count (Screenshot).
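
Roughly, the validation looked like this sketch (not my exact code):

// Count the rate-limited records in 10-second processing-time tumbling windows
// and print one count per window.
limitedStream
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .process(object : ProcessAllWindowFunction<JsonNode, Long, TimeWindow>() {
        override fun process(context: Context, elements: Iterable<JsonNode>, out: Collector<Long>) {
            out.collect(elements.count().toLong())
        }
    })
    .print()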


Solution

  • It's true that in Flink you should never sleep in the main processing thread.

    You can, however, safely apply rate limiting in a deserializer, as Flink runs deserialization schemas in another thread. I suggest using a Guava RateLimiter, rather than sleep.
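
    For illustration, here is a minimal sketch of that idea (not code from the answer), assuming Guava's RateLimiter and the Kafka connector's KafkaRecordDeserializationSchema; RateLimitedJsonDeserializer is a made-up name, and TypeInformation.of stands in for the JsonNodeTypeInfo used above:

    import com.fasterxml.jackson.databind.JsonNode
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.google.common.util.concurrent.RateLimiter
    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
    import org.apache.flink.util.Collector
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Deserializer that blocks on a Guava RateLimiter before emitting each record.
    // Each parallel source subtask gets its own limiter, so for a global cap of
    // 10,000 calls/minute (~167 per second), divide the rate by the source parallelism.
    class RateLimitedJsonDeserializer(private val permitsPerSecond: Double) :
        KafkaRecordDeserializationSchema<JsonNode> {

        private var limiter: RateLimiter? = null
        private var mapper: ObjectMapper? = null

        override fun open(context: DeserializationSchema.InitializationContext) {
            limiter = RateLimiter.create(permitsPerSecond)
            mapper = ObjectMapper()
        }

        override fun deserialize(record: ConsumerRecord<ByteArray, ByteArray>, out: Collector<JsonNode>) {
            val bytes = record.value() ?: return // skip tombstone records
            limiter!!.acquire()                  // blocks here, not via Thread.sleep in processElement
            out.collect(mapper!!.readTree(bytes))
        }

        override fun getProducedType(): TypeInformation<JsonNode> =
            TypeInformation.of(JsonNode::class.java)
    }

    // Wired into the source in place of the existing deserializer, e.g.:
    // .setDeserializer(RateLimitedJsonDeserializer(10_000.0 / 60))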