I'm using Fluentd (v0.12, the latest stable version) to send messages to Kafka. However, Fluentd uses an old KafkaProducer, so the record timestamp is always set to -1. As a result, I have to use the WallclockTimestampExtractor to set the timestamp of the record to the point in time when the message arrives in Kafka.
Is there a Kafka Streams-specific solution?
The timestamp I'm really interested in is sent by Fluentd within the message:
"timestamp":"1507885936","host":"V.X.Y.Z."
Record representation in Kafka:
offset = 0, timestamp = -1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
I would like to have a record like this in Kafka:
offset = 0, timestamp = 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
My workaround would look like this:
Write a consumer to extract the timestamp (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
Write a producer to produce a new record with the timestamp set (ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value))
I would prefer a Kafka Streams solution, if there is one.
You can write a very simple Kafka Streams application like this:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
and configure the application with a custom TimestampExtractor that extracts the timestamp from the record and returns it.
Kafka Streams will use the returned timestamps when writing the records back to Kafka.
Note: if you have out-of-order data (i.e., timestamps are not strictly ordered), the result will contain out-of-order timestamps, too. Kafka Streams uses the returned timestamps when writing back to Kafka (i.e., whatever the extractor returns is used as the record metadata timestamp). Also note that on write, the timestamp of the currently processed input record is used for all generated output records. This holds for version 1.0 but might change in future releases.
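To make the custom extractor more concrete, here is a minimal sketch of the parsing logic it would need. It is plain Java with no Kafka dependency, so the class name EmbeddedTimestamp and the method parseMillis are hypothetical, and the regex is only a stand-in for a proper JSON parser. It also assumes the embedded value is in epoch seconds, whereas Kafka record timestamps are in milliseconds, hence the conversion.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka): pulls the "timestamp" field
// out of a message value such as {"timestamp":"1507885936","host":"..."}.
final class EmbeddedTimestamp {

    // Accepts both "timestamp":"1507885936" and "timestamp":1507885936.
    private static final Pattern TS =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"?(\\d+)\"?");

    private EmbeddedTimestamp() {}

    // Returns the embedded timestamp in milliseconds, or `fallback`
    // if the value is null, has no timestamp field, or overflows a long.
    static long parseMillis(String value, long fallback) {
        if (value == null) {
            return fallback;
        }
        Matcher m = TS.matcher(value);
        if (!m.find()) {
            return fallback;
        }
        try {
            // Assumption: the field holds epoch seconds; Kafka expects milliseconds.
            return Math.multiplyExact(Long.parseLong(m.group(1)), 1000L);
        } catch (NumberFormatException | ArithmeticException e) {
            return fallback;
        }
    }

    // In a Kafka Streams application, a class implementing
    // org.apache.kafka.streams.processor.TimestampExtractor could delegate here:
    //
    //   public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    //       return EmbeddedTimestamp.parseMillis(
    //               String.valueOf(record.value()), previousTimestamp);
    //   }
}
```

The extractor class is then registered through the Streams configuration (the default.timestamp.extractor setting in recent versions). Falling back to previousTimestamp is just one possible choice for malformed records; depending on the Kafka version, returning a negative timestamp may cause the record to be dropped or an error to be raised instead.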
Update:
In general, you can modify timestamps via the Processor API. When calling context.forward(), you can set the output record timestamp by passing To.all().withTimestamp(...) as a parameter to forward().