
Kafka Streams - assigning keys


I'm looking through some basic Kafka Streams samples and noticed that they don't set record keys. I'm trying to give each record a unique key. A line of text is split into records, one record per word. My current code attaches the same key to every record. I'd like every record to have its own unique key, but I don't see a good place to assign it. I'd appreciate some help. Here is what I have now:

    KStream<String, String> source = builder.stream(INPUT_TOPIC);
    KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String line) {
            return Arrays.asList(line.split("\\W+"));
        }
    });
    Long key = (long)(Math.random()*9000)+1000;
    KStream<String, String> wordsKeyed = words.map((k, v) -> new KeyValue<>("xyx"+key, v));
    wordsKeyed.to(OUTPUT_TOPIC);

Solution

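  • It attaches the same key because you compute only one random value: key is evaluated once, when the topology is built, and the lambda passed to map captures that single value and reuses it for every record.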

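    If you want each key to be random, generate the number inside the lambda so it is evaluated once per record: words.map((k, v) -> new KeyValue<>("xyx" + ((long) (Math.random() * 9000) + 1000), v)). Keep the (long) cast; without it, "xyx" + (Math.random()*9000+1000) produces keys with a fractional part, such as xyx5234.817...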

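    If you want each key to be unique (random values can collide, and with only 9000 possible values they eventually will), use a shared AtomicInteger and call incrementAndGet() inside the lambda. The counter lives in memory, so keys are only unique within one running application instance and start over when the application restarts.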
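The key point is where the key is computed: a value computed outside the lambda is captured once, while an expression inside the lambda runs for every record. The sketch below compares the three variants. So that it runs without Kafka on the classpath, it assumes a hypothetical mapRecords helper as a stand-in for KStream.map and uses SimpleEntry in place of KeyValue; in a real topology the lambda bodies would go directly into words.map((k, v) -> new KeyValue<>(..., v)).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

public class KeyAssignmentSketch {

    // Hypothetical stand-in for words.map(...): calls the mapper once per record,
    // the way Kafka Streams invokes the lambda for each record it processes.
    public static List<Map.Entry<String, String>> mapRecords(
            List<String> words,
            BiFunction<String, String, Map.Entry<String, String>> mapper) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String word : words) {
            out.add(mapper.apply(null, word)); // incoming records have no key
        }
        return out;
    }

    // The question's version: the random number is drawn once, outside the lambda,
    // so every record gets the same captured key.
    public static List<Map.Entry<String, String>> sameKeyForAll(List<String> words) {
        long key = (long) (Math.random() * 9000) + 1000;
        return mapRecords(words, (k, v) -> new SimpleEntry<>("xyx" + key, v));
    }

    // Random key per record: the draw happens inside the lambda, once per call.
    // Only 9000 possible values (1000..9999), so collisions are still possible.
    public static List<Map.Entry<String, String>> randomKeyPerRecord(List<String> words) {
        return mapRecords(words,
                (k, v) -> new SimpleEntry<>("xyx" + ((long) (Math.random() * 9000) + 1000), v));
    }

    // Unique key per record within this JVM: a shared, thread-safe counter
    // incremented on every call. It resets when the process restarts.
    private static final AtomicInteger COUNTER = new AtomicInteger();

    public static List<Map.Entry<String, String>> uniqueKeyPerRecord(List<String> words) {
        return mapRecords(words,
                (k, v) -> new SimpleEntry<>("xyx" + COUNTER.incrementAndGet(), v));
    }

    public static void main(String[] args) {
        List<String> words = List.of("the", "quick", "brown", "fox");
        System.out.println(sameKeyForAll(words));      // one key repeated four times
        System.out.println(randomKeyPerRecord(words)); // a fresh random key per word
        System.out.println(uniqueKeyPerRecord(words)); // increasing counter keys
    }
}
```

The counter is the simplest way to guarantee distinct keys inside one process; if several application instances write to the same topic, their counters are independent and can produce the same key.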