I want to extract the timestamps embedded with each message and send them as a JSON payload to my database. I want to get the following three timestamps:
Event-time: The point in time when an event or data record occurred, i.e. was originally created “by the source”.
Processing-time: The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed.
Ingestion-time: The point in time when an event or data record is stored in a topic partition by a Kafka broker.
This is my streams application code:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // passed in from the environment, e.g. localhost:9092
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
source_o365_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        System.out.println("========> o365_user_activity_by_date Log: " + value);
        ArrayList<String> keywords = new ArrayList<String>();
        try {
            JSONObject send = new JSONObject();
            JSONObject received = new JSONObject(value);
            send.put("current_date", getCurrentDate().toString()); // UTC time
            send.put("activity_time", received.get("CreationTime")); // TODO: make topic names and Cassandra keys static final constants
            send.put("user_id", received.get("UserId"));
            send.put("operation", received.get("Operation"));
            send.put("workload", received.get("Workload"));
            keywords.add(send.toString());
        } catch (Exception e) {
            System.err.println("Unable to convert to JSON");
            e.printStackTrace();
        }
        return keywords;
    }
}).to("o365_user_activity_by_date");
In the code I am simply getting each record, doing some stream processing on it, and sending it to a different topic. Now with each record I want to send the event-time, processing-time, and ingestion-time embedded in the payload.
I have looked at FailOnInvalidTimestamp and WallclockTimestampExtractor, but I am confused about how to use them. Kindly guide me on how I can achieve this.
The TimestampExtractor can only give you one timestamp, and this timestamp is used for time-based operations like windowed aggregations or joins. It seems that you don't do any time-based computation though, thus, from a computation point of view, it does not matter.
Note that a record has only one metadata timestamp field. This timestamp field can be used to store an event-timestamp, which can be set by the producer. As an alternative, you can let the broker overwrite the producer-provided timestamp with the broker's ingestion time. This is a topic-level configuration (message.timestamp.type=LogAppendTime).
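If you want the broker to stamp ingestion-time, here is a sketch of setting that topic config programmatically via the AdminClient (topic name taken from your question; requires Kafka clients 2.3+ for incrementalAlterConfigs — on older clusters you would use the kafka-configs command-line tool instead):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetLogAppendTime {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "o365_user_activity");
            // LogAppendTime: the broker overwrites the producer timestamp
            // with its own clock when the record is appended to the log.
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("message.timestamp.type", "LogAppendTime"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singletonList(op)))
                .all().get();
        }
    }
}
```

With this set, the record's metadata timestamp is the broker ingestion time rather than anything the producer provided.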
To access the record's metadata timestamp (independent of whether it holds event-time or ingestion-time), the default timestamp extractor will give you this timestamp. If you want to access it in your application, you need to use the Processor API, i.e., in your case a .transform() instead of a .flatMapValues() operator. Your Transformer will be initialized with a ProcessorContext object that allows you to access the extracted timestamp.
Because a record can only store one metadata timestamp and because you want to use this for broker ingestion time, the upstream producer must put the event-timestamp into the payload directly.
For processing-time, just do a system clock call, e.g., System.currentTimeMillis(), as your code snippet already indicates.
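Putting this together, here is a minimal sketch of a .transform() that embeds all three timestamps, assuming the source topic is configured with LogAppendTime (so context.timestamp() holds ingestion-time) and that the upstream producer writes event-time into the payload as CreationTime; the output field names are my own choice:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.json.JSONObject;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("o365_user_activity");

source.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context; // gives access to the record's extracted timestamp
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        JSONObject received = new JSONObject(value);
        JSONObject send = new JSONObject();
        // Event-time: set by the upstream producer inside the payload.
        send.put("event_time", received.get("CreationTime"));
        // Ingestion-time: the record's metadata timestamp
        // (broker-assigned when the topic uses LogAppendTime).
        send.put("ingestion_time", context.timestamp());
        // Processing-time: wall-clock time of this application, right now.
        send.put("processing_time", System.currentTimeMillis());
        send.put("user_id", received.get("UserId"));
        send.put("operation", received.get("Operation"));
        send.put("workload", received.get("Workload"));
        return KeyValue.pair(key, send.toString());
    }

    @Override
    public void close() {}
}).to("o365_user_activity_by_date");
```

Unlike flatMapValues, transform hands you the ProcessorContext in init, which is the only way in the DSL to read the extracted record timestamp.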