Tags: scala, apache-kafka, apache-kafka-streams

How to transform a Kafka Streams event and send it to another topic only if it can be transformed


I want to build a simple Kafka stream that tries to transform events based on some conditions. If an event can be transformed, the transformed event goes to a different topic. If it cannot be transformed, the original event is written back to the same topic so it can be retried later.

Let's say I have this:

case class Foo(a: String, b: String, c: Boolean)

def translate(value: String): Option[Foo] = {
  // ...
  // Returns an Option of Foo
  ???
}

So I would need to have something like this:

val builder: StreamsBuilder = new StreamsBuilder()

builder
  .stream[String, String](topic)
  // mapValues keeps the key and yields an Option[Foo]; map would have to return a KeyValue
  .mapValues(value => translate(value))
  // If translate(value) is Some(value) send the value to a topic
  // Otherwise, send the original value (without being transformed) to the same topic
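One way to keep the original event around is to turn the `Option` into an `Either` with `toRight`: a `Right` holds the translated `Foo` and a `Left` holds the untouched input. The sketch below is plain Scala and assumes a hypothetical `translate` that only accepts comma-separated `a,b,true` / `a,b,false` strings. In a topology, the same `attempt` function would presumably be applied with `mapValues`, so the stream carries `Either[String, Foo]` and both outcomes stay available for routing.

```scala
// Same case class as in the question.
case class Foo(a: String, b: String, c: Boolean)

object TranslateSketch {
  // Hypothetical translate: "a,b,true" or "a,b,false" succeed; anything else fails.
  def translate(value: String): Option[Foo] =
    value.split(",", -1) match {
      case Array(a, b, c) => c.toBooleanOption.map(flag => Foo(a, b, flag))
      case _              => None
    }

  // Right = translated event, Left = the original, untransformed value.
  def attempt(value: String): Either[String, Foo] =
    translate(value).toRight(value)
}
```

Because the failure case carries the raw input instead of `None`, there is no need for a separate Boolean flag.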

I'm totally stuck with this issue. The closest I've come is to create a structure with a Boolean that says whether the event could be transformed, and then split it into different streams with `.branch`. For instance, something like this:

def translate(value: String): (Boolean, Option[Foo]) = {
  val eventTransformed = transform(value)
  eventTransformed match {
    case Some(foo) => (true, Some(foo))
    case None      => (false, None)
  }
}

And then try to do something like this:

builder
  .stream[String, String](topic)
  .mapValues(value => translate(value))
  .branch(
    (_, element) => element._1,
  )
  .foreach {
    // Send the "true" to one topic and in the "false", send the original message to the original topic 
  }

But of course, after the `map` I no longer have the original event, and I need it to send it back to the original topic.
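To check that nothing is lost when the stream is split, here is the same idea on an in-memory batch: `partitionMap` sends each `Left` (original value) to one side and each `Right` (translated `Foo`) to the other, which is what the two branches of the stream would receive. In Kafka Streams the equivalent would likely be two branches over an `Either` stream, for example with `split()`/`branch` (Kafka Streams 2.8 or later) or with `flatMapValues` on each side, each followed by `to(...)` on its topic; treat that wiring as an untested assumption about your Kafka version. The `translate` here is hypothetical.

```scala
// Same case class as in the question.
case class Foo(a: String, b: String, c: Boolean)

object BranchSketch {
  // Hypothetical translate: only "a,b,true" / "a,b,false" style strings succeed.
  def translate(value: String): Option[Foo] =
    value.split(",", -1) match {
      case Array(a, b, c) => c.toBooleanOption.map(flag => Foo(a, b, flag))
      case _              => None
    }

  // Left keeps the untouched original (retry side), Right holds the Foo (result side).
  def splitBatch(events: List[String]): (List[String], List[Foo]) =
    events.partitionMap(value => translate(value).toRight(value))
}
```

Usage: `BranchSketch.splitBatch(List("x,y,true", "bad"))` puts `"bad"` on the retry side and `Foo("x", "y", true)` on the result side.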

I've thought about more complicated structures, but I always come back to the problem of branching the stream on a Some/None condition.


Solution

  • Maybe use the Processor API. You have one Processor that does the translation: if the translation succeeds, it calls context.forward(key, translated, To.child("translated")); otherwise it calls context.forward(key, value, To.child("retry")) with the original value.

    You then wire the Topology together manually:

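    import org.apache.kafka.streams.Topology;

    Topology topology = new Topology();
    topology.addSource("source", topic);
    // TranslateProcessor is the processor described above
    topology.addProcessor("translator", () -> new TranslateProcessor(), "source");
    // The sink names must match the names passed to To.child(...)
    topology.addSink("translated", resultTopic, "translator");
    topology.addSink("retry", topic, "translator");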
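The answer does not show `TranslateProcessor` itself. Its core decision can be sketched as a pure Scala function: for each value it picks the child node name (`"translated"` or `"retry"`, matching the sink names in the topology) and the payload to forward. Inside a real processor's `process(key, value)` you would pass that name to `To.child(...)` in `context.forward`. The `translate` logic and the `serialize` wire format are hypothetical.

```scala
// Same case class as in the question.
case class Foo(a: String, b: String, c: Boolean)

object ProcessorDecision {
  // Child names must match the sink names registered with topology.addSink(...).
  val TranslatedChild = "translated"
  val RetryChild      = "retry"

  // Hypothetical translate: only "a,b,true" / "a,b,false" style strings succeed.
  def translate(value: String): Option[Foo] =
    value.split(",", -1) match {
      case Array(a, b, c) => c.toBooleanOption.map(flag => Foo(a, b, flag))
      case _              => None
    }

  // Hypothetical wire format for the translated event.
  def serialize(foo: Foo): String = s"${foo.a}|${foo.b}|${foo.c}"

  // Returns (child node name, value to forward); on failure the original value is forwarded unchanged.
  def decide(value: String): (String, String) =
    translate(value) match {
      case Some(foo) => (TranslatedChild, serialize(foo))
      case None      => (RetryChild, value)
    }
}
```

Keeping the decision separate from the processor class makes it easy to unit-test without a running topology.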