apache-kafka apache-kafka-streams

Mixing Kafka Streams DSL with Processor API to get offset


I am trying to find a way to log the offset when an exception occurs.

Here is what I am trying to achieve:

void createTopology(StreamsBuilder builder) {
 builder.stream(topic, Consumed.with(Serdes.String(), new JsonSerde()))
        .filter(...)
        .mapValues(value -> {
          Map<String, Object> output;
          try {
            output = decode(value.get("data"));
          } catch (DecodingException e) {
            LOGGER.error(e.getMessage());
            // TODO: LOG OFFSET FOR FAILED DECODE HERE
            return new ArrayList<>();
          }
          ...
          return output;
        })
        .filter((k, v) -> !(v instanceof List && ((List<?>) v).isEmpty()))
        .to(sink_topic);
}

I found this: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-transformations-stateful and my understanding is that I need to use the Processor API, but I still haven't found a solution to my issue.


Solution

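  • A ValueTransformer can also access the offset via the ProcessorContext passed to init(), and I believe it's much easier than switching to the full Processor API: replace mapValues with transformValues and read context.offset() in the catch block.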
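
A minimal sketch of that idea, not a drop-in implementation. It assumes a Kafka Streams version that still ships ValueTransformer and transformValues (newer releases deprecate them in favor of processValues), and that the deserialized value is a Map<String, Object>. The class name OffsetLoggingDecoder and the decoder function passed to its constructor are hypothetical stand-ins for the question's decode(...). RuntimeException stands in for DecodingException, and java.util.logging stands in for whatever LOGGER the question uses.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical class: decodes each value and logs the record position on failure.
class OffsetLoggingDecoder
        implements ValueTransformer<Map<String, Object>, Map<String, Object>> {

    private static final Logger LOGGER =
            Logger.getLogger(OffsetLoggingDecoder.class.getName());

    // Stand-in for the question's decode(...); supplied by the caller.
    private final Function<Object, Map<String, Object>> decoder;
    private ProcessorContext context;

    OffsetLoggingDecoder(Function<Object, Map<String, Object>> decoder) {
        this.decoder = decoder;
    }

    @Override
    public void init(ProcessorContext context) {
        // Keep the context; its metadata refers to the record currently being processed.
        this.context = context;
    }

    @Override
    public Map<String, Object> transform(Map<String, Object> value) {
        try {
            return decoder.apply(value.get("data"));
        } catch (RuntimeException e) { // assumption: stands in for DecodingException
            LOGGER.log(Level.SEVERE, String.format(
                    "Decode failed at topic=%s partition=%d offset=%d: %s",
                    context.topic(), context.partition(), context.offset(),
                    e.getMessage()));
            // Empty map marks the record as failed so a later filter can drop it.
            return Collections.emptyMap();
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
```

In the topology from the question, the mapValues step would then become something like .transformValues(() -> new OffsetLoggingDecoder(...)), and the following filter would drop empty maps, e.g. .filter((k, v) -> !v.isEmpty()), instead of checking for an empty List.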