Tags: apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Correct configuration of WindowBytesStoreSupplier


What are the correct WindowBytesStoreSupplier configuration values for JoinWindows.of(Duration.ofMinutes(5))?

This supplier

fun mangelJoinStoreSupplier(): WindowBytesStoreSupplier = Stores.inMemoryWindowStore(MangelJoinStore.NAME, Duration.ofMillis(86700000), Duration.ofMinutes(5), true)

results in:

org.apache.kafka.streams.errors.StreamsException: Window settings mismatch. WindowBytesStoreSupplier settings InMemoryWindowBytesStoreSupplier{name='mangel-join-store', retentionPeriod=86700000, windowSize=300000, retainDuplicates=true} must match JoinWindows settings JoinWindows{beforeMs=300000, afterMs=300000, graceMs=85800000} for the window size and retention period

Solution

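  • For

    JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO)

    the correct supplier is

    Stores.inMemoryWindowStore(MangelJoinStore.NAME, Duration.ofMinutes(10), Duration.ofMinutes(10), true)

    The window size of the store has to equal beforeMs + afterMs of the join window (5 + 5 = 10 minutes), and the retention period has to equal that window size plus the grace period (zero here, so also 10 minutes).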
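
The arithmetic behind these values can be sketched in plain Kotlin. This is only an illustration, assuming the join check compares the store's window size with beforeMs + afterMs and the store's retention period with that size plus graceMs, which is consistent with both the error message and the working supplier above. `JoinStoreSettings` and `joinStoreSettings` are hypothetical helper names, not part of Kafka Streams.

```kotlin
import java.time.Duration

// Hypothetical holder for the two values passed to Stores.inMemoryWindowStore(...).
data class JoinStoreSettings(val retention: Duration, val windowSize: Duration)

// Hypothetical helper: derives store settings from the join window parameters,
// assuming windowSize = before + after and retention = windowSize + grace.
fun joinStoreSettings(before: Duration, after: Duration, grace: Duration): JoinStoreSettings {
    val windowSize = before.plus(after)
    val retention = windowSize.plus(grace)
    return JoinStoreSettings(retention, windowSize)
}

fun main() {
    val five = Duration.ofMinutes(5)

    // JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO)
    println(joinStoreSettings(five, five, Duration.ZERO))

    // JoinWindows.of(Duration.ofMinutes(5)) with the grace reported in the error (graceMs=85800000)
    println(joinStoreSettings(five, five, Duration.ofMillis(85_800_000)))
}
```

Under that assumption, the grace-free join needs 10 minutes for both values, while the join from the question (graceMs=85800000) would need a retention period of 86,400,000 ms (24 hours) and a window size of 10 minutes, rather than the 86,700,000 ms and 5 minutes that were configured.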