Search code examples
spring-bootapache-kafkaapache-kafka-streamsspring-cloud-streamspring-cloud-stream-binder-kafka

Spring Cloud Stream Kafka Streams inbound KTable predictable internal state-store topic names


We're using Kafka Streams with Spring Cloud Stream Functions. We have the typical example application which joins user clicks kstream with user regions ktable.

We know we can force custom names for internal changelog or repartition topics by using appropiate methods that accept a name for materialized store when defining our topology:

  @Bean
  public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> bifunctionktable() {
    return (userClicksStream, userRegionsTable) -> userClicksStream
        .leftJoin(userRegionsTable,
            (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
            Joined.with(Serdes.String(), Serdes.Long(), null, "bifunctionktable-leftjoin"))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()).withName("bifunctionktable-groupbykey"))
        .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks, Materialized.as("bifunctionktable-reduce"))
        .toStream();
  }

But for the input KTable we cannot change its state-store internal topic name and we always get this topic name: myapp-id-user-regions-STATE-STORE-0000000001-changelog

If we were fully creating our topology by code we do have builder.table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) method, but using functions... Is there any way to customize the internal topic name for the input KTable in this case?


Solution

  • You can add a custom name for the incoming KTable by using the following property:

    spring.cloud.stream.kafka.streams.bindings.bifunctionktable-in-1.consumer.materializedAs: <Your-custom-store-name>
    

    This is documented in this section of the reference docs.