I'm working on a Kafka Streams topology and sometimes, after changing the applicationId and/or clientId properties, I receive an error on a specific Kafka stream: "Missing source topic stream.webshop.products.prices.5 durign assignment. Returning error INCOMPLETE_SOURCE_TOPIC_METADATA". I have set the create.topic=true property in the server.properties of every Kafka node, but it seems that the topic for this stream is not created.
Here is my Kafka Streams topology:
package ro.orange.eshop.productindexer.domain

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Printed
import ro.orange.digital.avro.Aggregate
import ro.orange.digital.avro.Key
import ro.orange.digital.avro.Price
import ro.orange.digital.avro.StockQuantity
import ro.orange.eshop.productindexer.infrastructure.configuration.kafka.makeStoreProvider
import java.util.concurrent.CompletableFuture

class SaleProductTopology(
    private val streamNameRepository: IStreamNameRepository,
    private val saleProductMapper: ISaleProductMapper,
    private val productRatingMapper: IProductRatingMapper,
    private val productStockMapper: IProductStockMapper,
    private val lazyKafkaStreams: CompletableFuture<KafkaStreams>
) {
    fun streamsBuilder(): StreamsBuilder {
        val streamsBuilder = StreamsBuilder()

        val productsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputWebshopProductsTopic)
        val productPricesStream = streamsBuilder.stream<Key, Price>(streamNameRepository.productsPricesStreamTopic)
        val productsRatingsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductRatingsTopic)
        val inputProductsStockStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductsStockTopic)

        val productsStockStream = inputProductsStockStream
            .mapValues(productStockMapper::aStockQuantity)
        productsStockStream.to(streamNameRepository.productsStockStreamTopic)

        streamsBuilder.globalTable<Key, StockQuantity>(
            streamNameRepository.productsStockStreamTopic,
            Materialized.`as`(streamNameRepository.productsStockGlobalStoreTopic)
        )
        val quantityProvider = lazyKafkaStreams.makeStoreProvider<StockQuantity>(streamNameRepository.productsStockGlobalStoreTopic)

        val saleProductsTable = productsStream
            .groupByKey()
            .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.saleProductsStoreTopic))
            .mapValues { aggregate -> saleProductMapper.aSaleProduct(aggregate, quantityProvider) }
        saleProductsTable.toStream().print(Printed.toSysOut())

        val productPricesTable = productPricesStream
            .groupByKey()
            .reduce({ _, price -> price }, Materialized.`as`(streamNameRepository.productsPricesStoreTopic))
        productPricesTable.toStream().print(Printed.toSysOut())

        val productsRatingsTable = productsRatingsStream
            .groupByKey()
            .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.productsRatingsStoreTopic))
            .mapValues { aggregate -> productRatingMapper.aProductRating(aggregate) }
        productsRatingsTable.toStream().print(Printed.toSysOut())

        val productsStockTable = productsStockStream
            .groupByKey()
            .reduce { _, aggregate -> aggregate }

        saleProductsTable
            .leftJoin(productPricesTable) { saleProduct, price -> saleProductMapper.aPricedSaleProduct(saleProduct, price) }
            .leftJoin(productsRatingsTable) { saleProduct, rating -> saleProductMapper.aRatedSaleProduct(saleProduct, rating) }
            .leftJoin(productsStockTable) { saleProduct, stockQuantity -> saleProductMapper.aQuantifiedSaleProduct(saleProduct, stockQuantity) }
            .mapValues { saleProduct -> AggregateMapper.aSaleProductAggregate(saleProduct) }
            .toStream()
            .to(streamNameRepository.saleProductsTopic)

        return streamsBuilder
    }
}
As @jacek-laskowski wrote:
KafkaStreams won't create it since it's a source
This is by design. If one of the source topics were created automatically (with the default number of partitions) and another were created in advance by the user, their partition counts might differ. When a KStream/KTable is joined with another, both sides must have the same number of partitions; this is a crucial assumption.
The user has to create the source topics deliberately, with the proper number of partitions. The partition count also limits how many stream processing threads can work in parallel, which makes it one of the ways to control the performance of Kafka Streams applications.
Read up on Managing Streams Application Topics in the Kafka Streams documentation.
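As an illustration of creating the source topics up front, here is a minimal sketch that uses the Kafka AdminClient before the streams application starts. The helper names missingTopics and ensureSourceTopics are hypothetical, and the bootstrap address, partition count, and replication factor are assumptions you would replace with values that fit your cluster. It needs the kafka-clients library on the classpath and a reachable broker.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

// Hypothetical pure helper: which of the required topics are not in the cluster yet?
fun missingTopics(required: Collection<String>, existing: Set<String>): List<String> =
    required.filter { it !in existing }

// Hypothetical helper: creates every missing source topic with the same
// partition count, so topics that are joined later start out co-partitioned.
fun ensureSourceTopics(
    bootstrapServers: String,   // assumption: for example "localhost:9092"
    sourceTopics: Collection<String>,
    partitions: Int,            // assumption: choose what fits your workload
    replicationFactor: Short    // assumption: must not exceed the number of brokers
) {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    }
    AdminClient.create(props).use { admin ->
        // Names of the topics the cluster currently knows about.
        val existing = admin.listTopics().names().get()
        val newTopics = missingTopics(sourceTopics, existing)
            .map { NewTopic(it, partitions, replicationFactor) }
        if (newTopics.isNotEmpty()) {
            // Blocks until the brokers confirm creation, or throws on failure.
            admin.createTopics(newTopics).all().get()
        }
    }
}
```

In the topology above this could be called before KafkaStreams.start(), for example with listOf(streamNameRepository.productsPricesStreamTopic) as sourceTopics. Note that the sketch leaves existing topics untouched, so a topic that already exists with a different partition count still has to be checked and fixed separately.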