I'm looking through some basic samples of using Kafka Streams and noticed that keys are not being set. I'm attempting to set a unique key per record. A line of text is split into records, one record per word in the line. The code I have currently attaches the same key to every record. I'd like every record to have a unique key, but I don't see a good place to generate it. I'd much appreciate some help. Here is what I have now:
KStream<String, String> source = builder.stream(INPUT_TOPIC);

KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String line) {
        return Arrays.asList(line.split("\\W+"));
    }
});

Long key = (long) (Math.random() * 9000) + 1000;
KStream<String, String> wordsKeyed = words.map((k, v) -> new KeyValue<>("xyx" + key, v));
wordsKeyed.to(OUTPUT_TOPIC);
It attaches the same key to every record because key is computed only once, when the topology is wired up; the lambda passed to map() is what runs once per record, so the key has to be generated inside it.
If a random key per record is good enough, move the random call inside the lambda, e.g. new KeyValue<>("xyx" + ((long) (Math.random() * 9000) + 1000), v).
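For example, a minimal sketch of that version, reusing the words stream and OUTPUT_TOPIC from your snippet:

// Math.random() is now called inside the lambda, so a fresh value is drawn per record.
KStream<String, String> wordsKeyed = words.map((k, v) ->
        new KeyValue<>("xyx" + ((long) (Math.random() * 9000) + 1000), v));
wordsKeyed.to(OUTPUT_TOPIC);

Random keys can still repeat, though, which is why the counter approach below is safer when uniqueness actually matters.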
If the keys must be truly unique (random values can collide), keep a counter such as an AtomicInteger and call incrementAndGet() inside the mapper instead.
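A minimal sketch of the counter version, assuming your INPUT_TOPIC/OUTPUT_TOPIC constants, default String serdes, and the StreamsBuilder API (the surrounding class and Properties setup are omitted):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
AtomicInteger counter = new AtomicInteger();

KStream<String, String> source = builder.stream(INPUT_TOPIC);

// Same word-splitting step as in your code, written as a lambda.
KStream<String, String> words = source.flatMapValues(line -> Arrays.asList(line.split("\\W+")));

// incrementAndGet() runs once per record, so every word gets a distinct key.
KStream<String, String> wordsKeyed = words.map((k, v) -> new KeyValue<>("xyx" + counter.incrementAndGet(), v));
wordsKeyed.to(OUTPUT_TOPIC);

Keys produced this way are only unique within one run of one application instance; if they need to be unique across restarts or multiple instances, you'd have to fold in something like an instance id or timestamp as well. Since only the key changes here, selectKey((k, v) -> "xyx" + counter.incrementAndGet()) would do the same job as map.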