I am trying to find a way to log the offset when an exception occurs.
Here is what I am trying to achieve:
void createTopology(StreamsBuilder builder) {
    builder.stream(topic, Consumed.with(Serdes.String(), new JsonSerde()))
        .filter(...)
        .mapValues(value -> {
            Map<String, Object> output;
            try {
                output = decode(value.get("data"));
            } catch (DecodingException e) {
                LOGGER.error(e.getMessage());
                // TODO: LOG OFFSET FOR FAILED DECODE HERE
                return new ArrayList<>();
            }
            ...
            return output;
        })
        .filter((k, v) -> !(v instanceof List && ((List<?>) v).isEmpty()))
        .to(sink_topic);
}
I found this: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-transformations-stateful. My understanding is that I need to use the Processor API, but I still haven't found a solution to my issue.
A ValueTransformer can also access the offset via the ProcessorContext passed to init, and I believe it's much easier.
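Here is a minimal sketch of that idea, not a drop-in implementation. It assumes a Kafka Streams version where ValueTransformer and transformValues are still available (later releases deprecate them in favor of processValues). The class name OffsetLoggingDecoder, the decode stand-in, and the use of IllegalArgumentException in place of your DecodingException are all hypothetical, and java.util.logging stands in for whatever LOGGER you use. On failure it returns null instead of an empty list, so the downstream filter would need to check for null.

```java
import java.util.Collections;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OffsetLoggingDecoder
        implements ValueTransformer<Map<String, Object>, Map<String, Object>> {

    private static final Logger LOGGER =
            Logger.getLogger(OffsetLoggingDecoder.class.getName());

    // Set once in init(); gives access to the metadata of the record being processed.
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public Map<String, Object> transform(Map<String, Object> value) {
        try {
            return decode(value.get("data"));
        } catch (IllegalArgumentException e) { // stand-in for DecodingException
            // topic(), partition() and offset() describe the current input record.
            LOGGER.severe(String.format(
                    "Decode failed at topic=%s partition=%d offset=%d: %s",
                    context.topic(), context.partition(), context.offset(),
                    e.getMessage()));
            return null; // marker for "failed"; filter these out downstream
        }
    }

    @Override
    public void close() {
        // No resources to release in this sketch.
    }

    // Hypothetical stand-in for the real decode(): accepts only String payloads.
    static Map<String, Object> decode(Object data) {
        if (!(data instanceof String)) {
            throw new IllegalArgumentException("unsupported payload: " + data);
        }
        return Collections.singletonMap("decoded", data);
    }
}
```

In the topology, this would replace the mapValues step with something like .transformValues(OffsetLoggingDecoder::new) followed by .filter((k, v) -> v != null), assuming the stream's value type is Map<String, Object>. A new transformer instance is created per task through the supplier, so keeping the context in a field is safe.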