In many Big Data situations it is preferable to work with a small buffer of records at a time, rather than processing each record individually.
The natural example is calling some external API that supports batching for efficiency.
How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.
So far I have:
builder.stream[String, String]("my-input-topic")
  .mapValues(externalApiCall)
  .to("my-output-topic")
What I want is:
builder.stream[String, String]("my-input-topic")
  .batched(chunkSize = 2000)
  .map(externalBatchedApiCall)
  .to("my-output-topic")
In Scala and Akka Streams the function is called grouped or batch. In Spark Structured Streaming we can do mapPartitions(_.grouped(2000).map(externalBatchedApiCall)).
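For illustration, here is what grouped looks like in Akka Streams. This is just a sketch: the integer source and the externalBatchedApiCall stub are placeholders, and running it assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer).

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("grouped-example")

// Placeholder for the real batched API call.
def externalBatchedApiCall(batch: Seq[Int]): Seq[Int] = batch

// grouped buffers up to 2000 elements into a Seq (the final group may be
// smaller), so the external API is invoked once per chunk, not per element.
Source(1 to 10000)
  .grouped(2000)
  .map(externalBatchedApiCall)
  .runForeach(results => println(results.size))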
It doesn't seem to exist yet. Watch this space: https://issues.apache.org/jira/browse/KAFKA-7432
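In the meantime you can drop down to the Processor API and buffer records yourself in a Transformer. Below is a minimal sketch, not a tested implementation: externalBatchedApiCall, the chunk size, and the 30-second flush interval are assumptions, and the imports assume a recent kafka-streams-scala.

import java.time.Duration
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

// Placeholder: sends one chunk to the external service, one result per input pair.
def externalBatchedApiCall(batch: Seq[(String, String)]): Seq[(String, String)] = ???

class BatchingTransformer(chunkSize: Int)
    extends Transformer[String, String, KeyValue[String, String]] {

  private val buffer = ArrayBuffer.empty[(String, String)]
  private var context: ProcessorContext = _

  override def init(ctx: ProcessorContext): Unit = {
    context = ctx
    // Flush partial chunks periodically so a quiet topic cannot hold records forever.
    ctx.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, (_: Long) => flush())
  }

  // Buffer each record and emit nothing until a full chunk is ready.
  override def transform(key: String, value: String): KeyValue[String, String] = {
    buffer += ((key, value))
    if (buffer.size >= chunkSize) flush()
    null // results are emitted via context.forward in flush()
  }

  private def flush(): Unit =
    if (buffer.nonEmpty) {
      externalBatchedApiCall(buffer.toSeq).foreach { case (k, v) => context.forward(k, v) }
      buffer.clear()
    }

  // forward() is not allowed from close(), so the last partial chunk is
  // handled by the wall-clock punctuator rather than flushed here.
  override def close(): Unit = ()
}

val builder = new StreamsBuilder
builder.stream[String, String]("my-input-topic")
  .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
    override def get() = new BatchingTransformer(chunkSize = 2000)
  })
  .to("my-output-topic")

Each stream task gets its own transformer instance, so a chunk never mixes records from different partitions. Also note that the buffer lives only in memory: records still sitting in it when the application crashes may already have had their offsets committed, so back the buffer with a state store if you need stronger delivery guarantees.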