apache-kafka, apache-kafka-streams

Set timestamp in output with Kafka Streams


I'm receiving CSVs in a Kafka topic, "raw-data". The goal is to transform them by sending each line to another topic, "data", with the right timestamp (different for each line).

Currently, I have two Kafka Streams applications:

  • one that splits the lines in "raw-data" and sends them to an "internal" topic (no timestamp)
  • one with a TimestampExtractor that consumes "internal" and sends the lines to "data".

I'd like to get rid of this "internal" topic by setting the timestamp directly, but I couldn't find a way to do it (timestamp extractors are only applied at consumption time).

I've stumbled upon this line in the documentation:

Note, that the described default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling #forward().

but I couldn't find any forward() signature that takes a timestamp. What do they mean?

How would you do it?

Edit: To be clear, I have a Kafka topic where a single message contains several lines, each with an event time and a value, such as:

    2018-01-01,hello
    2018-01-02,world

(this is ONE message, not two)

I'd like to get two messages in another topic, each with its Kafka record timestamp set to its event time (2018-01-01 and 2018-01-02), without needing an intermediate topic.


Solution

  • Setting the timestamp of output records requires Kafka Streams 2.0 or later and is only supported in the Processor API. If you use the DSL, you can call process() or transform() to get access to the Processor API.

    As you pointed out, you would use context.forward(). The call would be:

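    // new API (Processor, ProcessorContext, ProcessorSupplier, Record from org.apache.kafka.streams.processor.api)
    stream.process(new ProcessorSupplier<String, String, String, String>() {
      public Processor<String, String, String, String> get() {
        return new Processor<String, String, String, String>() {
          private ProcessorContext<String, String> context;
    
          @Override
          public void init(ProcessorContext<String, String> context) {
            this.context = context; // keep the context so process() can forward records
          }
    
          @Override
          public void process(Record<String, String> r) {
            // some business logic that computes newTimestamp
    
            // you can call #forward() as often as you want
            context.forward(r.withTimestamp(newTimestamp));
          }
        };
      }
    });
    
    // old API (ProcessorContext and To from org.apache.kafka.streams.processor)
    stream.transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
      public Transformer<String, String, KeyValue<String, String>> get() {
        return new Transformer<String, String, KeyValue<String, String>>() {
          private ProcessorContext context;
    
          @Override
          public void init(ProcessorContext context) {
            this.context = context; // keep the context so transform() can forward records
          }
    
          @Override
          public KeyValue<String, String> transform(String key, String value) {
            // some business logic that computes newKey, newValue and newTimestamp
    
            // you can call #forward() as often as you want
            context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));
    
            return null; // only return data via context#forward()
          }
    
          @Override
          public void close() {}
        };
      }
    });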
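To fill in the "some business logic" part for the CSV message from the question, the line splitting and timestamp parsing can be kept free of Kafka so it is easy to test on its own. This is a minimal sketch, assuming each line looks like `yyyy-MM-dd,<value>`, that lines are separated by line breaks, and that a date should become midnight UTC. The class `CsvFanOut` and its `split` method are hypothetical names, and the `BiConsumer` callback only stands in for `context.forward()`.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.function.BiConsumer;

public class CsvFanOut {

    /**
     * Hypothetical helper: splits one multi-line CSV message and calls
     * forward(timestampMs, line) once per non-blank line.
     * Assumes every line starts with an ISO date ("yyyy-MM-dd") followed by a comma,
     * and interprets that date as midnight UTC.
     */
    public static void split(String message, BiConsumer<Long, String> forward) {
        for (String line : message.split("\\R")) {       // \R matches \n, \r\n and other line breaks
            if (line.trim().isEmpty()) {
                continue;                                 // ignore blank lines
            }
            int comma = line.indexOf(',');
            if (comma < 0) {
                throw new IllegalArgumentException("no comma in line: " + line);
            }
            LocalDate eventDate = LocalDate.parse(line.substring(0, comma)); // ISO-8601 date
            long timestampMs = eventDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
            forward.accept(timestampMs, line);            // one output record per input line
        }
    }
}
```

Inside the new-API `process(Record<String, String> r)` shown above, the callback could be `(ts, line) -> context.forward(r.withValue(line).withTimestamp(ts))`; with the old API it could be `(ts, line) -> context.forward(key, line, To.all().withTimestamp(ts))`. Either way, one input message yields one output record per line, each carrying its own timestamp, without an intermediate topic.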