I have 2 streams of data and I want to be able to join them for a window of, let's say, 1 month. With live data everything is fun and super easy with KStream and join. I did something like this:
KStream<String, GenericRecord> stream1 =
    builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());

KStream<String, GenericRecord> stream2 =
    builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());

long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

KStream<String, GenericRecord> joinStream = stream1.join(stream2,
    new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
        @Override
        public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
            final GenericRecord joinedRecord = new GenericData.Record(joinedRecordSchema);
            // ... populate joinedRecord from both input records ...
            return joinedRecord;
        }
    }, JoinWindows.of(joinWindowSizeMs));
The problem appears when I want to do a data replay. Let's say I want to redo these joins for the data I have from the past 6 months. Since I am running the pipeline over all the data at once, Kafka Streams joins all the joinable data and does not take the time difference into consideration (it should only join data within the past one month). I am assuming the JoinWindow time is the time we insert data into the Kafka topic, am I right?
And how can I change and manipulate this time so I can run my data replay correctly? I mean, when re-inserting these past 6 months of data, it should take a window of one month for each respective record and join based on that.
This question is not a duplicate of How to manage Kafka KStream to Kstream windowed join?; there I asked about how I can join based on a window of time. Here I am talking about data replay. From my understanding, during a join Kafka takes the time the data is inserted into the topic as the time for the JoinWindow, so if you do a data replay and re-insert data from 6 months ago, Kafka treats it as new data inserted today and will join it with other data that is actually from today, which it shouldn't.
Kafka's Streams API uses timestamps returned by the TimestampExtractor to compute joins. By default, this is the record's embedded metadata timestamp (cf. http://docs.confluent.io/current/streams/concepts.html#time).
By default, KafkaProducer sets this timestamp to the current system time on write. (As an alternative, you can configure brokers on a per-topic basis to overwrite the producer-provided timestamps with the broker's system time at the moment the broker stores the record; this provides "ingestion time" semantics.)
Thus, it is not a Kafka Streams issue per se.
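Because the join is driven by the TimestampExtractor, one way to make replayed data join on its original time is a custom extractor that reads the event time from the record payload instead of the embedded metadata timestamp. A minimal sketch, assuming a long field named eventTime in the Avro value and the two-argument extract signature of newer clients (older clients use a one-argument extract and the property name timestamp.extractor):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof GenericRecord) {
            // hypothetical field holding the original event time in epoch milliseconds
            final Object eventTime = ((GenericRecord) value).get("eventTime");
            if (eventTime instanceof Long) {
                return (Long) eventTime;
            }
        }
        // fall back to the record's embedded metadata timestamp
        return record.timestamp();
    }
}

// register it in the Streams configuration:
// props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class);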
There are multiple options to tackle the problem:
If your data is already in a topic, you can simply reset your Streams application to reprocess the old data. For this, you can use the application reset tool (bin/kafka-streams-application-reset.sh). You also need to set the auto.offset.reset policy to earliest in your Streams app. Check out the docs; it is also recommended to read the accompanying blog post.
This is the best approach, as you do not need to write the data to the topic again.
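A minimal sketch of the relevant Streams configuration plus the reset step; the application id, bootstrap address, and topic names are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-join-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// read the input topics from the beginning when no committed offsets exist
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// after stopping all running instances, reset committed offsets and internal state:
// bin/kafka-streams-application-reset.sh --application-id stream-join-app --input-topics topic1,topic2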
Alternatively, if you write the data into the input topics again, you can set the record timestamp explicitly via the ProducerRecord constructor:
KafkaProducer producer = new KafkaProducer(...);
// constructor: ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
producer.send(new ProducerRecord(topic, partition, timestamp, key, value));
Thus, if you ingest old data you can set the timestamp explicitly and Kafka Streams will pick it up and compute the join accordingly.
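A sketch of such a replay producer, assuming each archived record carries its original event time in a long field named eventTime (field name, topic, and key handling are assumptions):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReplayProducer {

    // assumption: the archived record stores its original event time in epoch milliseconds
    private static long originalEventTime(final GenericRecord value) {
        return (Long) value.get("eventTime");
    }

    public static void replay(final KafkaProducer<String, GenericRecord> producer,
                              final String topic, final String key, final GenericRecord value) {
        // partition is null so the default partitioner is used;
        // the timestamp is the record's original event time, not today's clock time
        producer.send(new ProducerRecord<>(topic, null, originalEventTime(value), key, value));
    }
}

With the timestamps set this way, replaying the past 6 months of data joins each record only against records within its own one-month window.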