Search code examples
javaapache-kafkaapache-kafka-streams

Kafka: assigning a name to an internal topic that is created after map and repartition does not seem to be possible


With this simple topology, stripped down for reproducing the issue:

public KStream<PropertyValueKey, PropertyValue> configureTopology(StreamsBuilder builder, Properties serdesProps) {
    KStream<PropertyValueKey, PropertyValue> propertyValues =
        builder.stream(kafkaProperties.getPropertyValuesTopicName());

    KStream<PropertyTypeKey, PropertyValueWithKey> propertyValuesByType =
        propertyValues.map((valueKey, value) -> KeyValue.pair(
                new PropertyTypeKey(valueKey.getProjectId(), valueKey.getPropertyTypeId()),
                new PropertyValueWithKey(valueKey, value)),
             Named.as("map1"))
            .repartition();

    return propertyValues;
}

I am seeing an internally created topic appname-KSTREAM-REPARTITION-0000000002-repartition

I can't seem to find a way to override this internal name. I have successfully overridden topic names in the past, when it came to stores, using Materialized.as as described in https://docs.confluent.io/platform/current/streams/developer-guide/dsl-topology-naming.html, but this doesn't work the same way as with this map function. Is there any way to do this? Named.as does not have the intended effect, or any effect that I can spot.

Thiss is with using the Kafka docker image confluentinc/cp-kafka:6.2.0 and the Java client 2.8.1, but upgrading to 3.1.0 did not matter.


Solution

  • It was as easy as .repartition(Repartitioned.as("property-values-by-subject-repartitioned"));, unfortunately that wasn't mentioned in the docs but it was in the javadoc after some more searching.