Search code examples
kotlinapache-kafkaapache-kafka-streams

Kstream transformValues ambigious


I have problem with ambiguous methods in KStream.

Actually I don't understand what is wrong.

.toStream()
        .transformValues(
            ValueTransformerWithKeySupplier<String, CandleDto, CandleDto> {
                SuppressTransformer<String, CandleDto, CandleDto>(
                    Duration.ofMinutes(5),
                    "supress-store",
                    Duration.ofSeconds(5))
            },
            "suppress-store"
        )

This code result into this: enter image description here

class SuppressTransformer<K, V, VR>(
    private val windowSize: Duration,
    private val storeName: String,
    private val scheduleTime: Duration
) : ValueTransformerWithKey<K, V, VR> {
    // Implementation
}

Solution

  • Update: I just realize what is wrong (because i'm blind). Thanks @OneCricketeer, your answer forced me to look logs once more :-)

    Error was caused because I use windowing before transformation. And I had to map values before start transformation.

    Now it is compilable:

            .map { key, value -> KeyValue.pair(key.key(), Mapper.toCandle(value)) }
            .transformValues(
                ValueTransformerWithKeySupplier<String, Candle, Candle> {
                    SuppressTransformer<String, Candle,Candle>(
                        Duration.ofMinutes(5),
                        "suppress-store",
                        Duration.ofSeconds(5))
                },
                "suppress-store"
            )