I want to build a simple Kafka stream that tries to transform events based on some conditions. If the event can be transformed, the transformed event goes into a different topic. If the event cannot be transformed, it is stored again in the same topic for a future try.
Let's say I have this:
case class Foo(a: String, b: String, c: Boolean)
def translate(value: String): Option[Foo] = {
  // ...
  // Returns an Option of Foo
}
So I would need to have something like this:
val builder: StreamsBuilder = new StreamsBuilder()
builder
  .stream(topic)
  .map[String, String]((key, value) => translate(value))
  // If translate(value) is Some(value) send the value to a topic
  // Otherwise, send the original value (without being transformed) to the same topic
I'm totally stuck with this issue. The nearest thing I've come up with is to create a structure with a boolean that tells me whether the event can be transformed, and then create different streams with a .branch. For instance, something like this:
def translate(value: String): (Boolean, Option[Foo]) = {
  val eventTransformed = transform(value)
  eventTransformed match {
    case Some(transformed) => (true, Option(transformed))
    case None => (false, None)
  }
}
And then try to do something like this:
builder
  .stream(topic)
  .map[String, (Boolean, Option[Foo])]((key, value) => translate(value))
  .branch(
    (_, element) => element._1,
  )
  .foreach {
    // Send the "true" to one topic and in the "false", send the original message to the original topic
  }
But of course I would still need the original event in order to send it back to the original topic.
I've thought about more complicated structures, but in the end I always come back to the problem of branching the stream based on a Some/None condition.
Maybe use the Processor API. You have one Processor that does the translation: if the translation is successful you call context.forward(key, translated, To.child("translated")), otherwise you call context.forward(key, value, To.child("retry")) with the original value.
You plug your Topology together manually:
Topology topology = new Topology();
topology.addSource("source", topic);
topology.addProcessor("translator", () -> new TranslateProcessor(), "source");
topology.addSink("translated", resultTopic, "translator");
topology.addSink("retry", topic, "translator");
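The routing decision inside TranslateProcessor can be sketched without any Kafka dependency. The following is only a minimal illustration under stated assumptions: Forwarder is a hypothetical stand-in for the context.forward(key, value, To.child(name)) call, TranslateRouter and RoutingDemo are made-up names, and the "ok:" translation rule is invented purely so the example has something to translate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for context.forward(key, value, To.child(childName)).
interface Forwarder {
    void forward(String key, String value, String childName);
}

// Routing logic that a TranslateProcessor's process(key, value) could delegate to.
final class TranslateRouter {
    private final Function<String, Optional<String>> translate;
    private final Forwarder forwarder;

    TranslateRouter(Function<String, Optional<String>> translate, Forwarder forwarder) {
        this.translate = translate;
        this.forwarder = forwarder;
    }

    void process(String key, String value) {
        Optional<String> translated = translate.apply(value);
        if (translated.isPresent()) {
            // Translation succeeded: send the new value to the "translated" sink.
            forwarder.forward(key, translated.get(), "translated");
        } else {
            // Translation failed: send the untouched original to the "retry" sink.
            forwarder.forward(key, value, "retry");
        }
    }
}

final class RoutingDemo {
    // Runs two events through the router and records where each one was sent.
    static List<String> run() {
        List<String> sent = new ArrayList<>();
        // Assumed rule for illustration: only values starting with "ok:" translate.
        Function<String, Optional<String>> translate = v ->
            v.startsWith("ok:")
                ? Optional.of(v.substring(3).toUpperCase())
                : Optional.empty();
        TranslateRouter router = new TranslateRouter(
            translate, (k, v, child) -> sent.add(child + ":" + k + "=" + v));
        router.process("k1", "ok:foo");
        router.process("k2", "bad");
        return sent;
    }

    public static void main(String[] args) {
        // Prints "translated:k1=FOO" and then "retry:k2=bad".
        run().forEach(System.out::println);
    }
}
```

In a real processor the lambda passed as Forwarder would be replaced by the context.forward call, with the child names matching the sink names used in addSink. Because the original value is still in scope inside process, there is no need to carry it through the stream in a tuple. Note that the "retry" sink writes to the same topic the source reads from, so an event that never becomes translatable will keep cycling through the topology.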