ssl, apache-kafka, apache-kafka-streams, spring-cloud-stream

Spring Cloud Stream Kafka-Streams unable to configure SSL for my consumer and producer


I'm struggling to configure Spring Cloud Stream for Kafka Streams to use SSL with a truststore and keystore.

In my application I have multiple streams running, and the SSL configuration should be the same for all of them.

The application looks like this:

Stream1: Topic1 > Topic2

Stream2: Topic2 + Topic3 > Topic4

Stream3: Topic4 > Topic5

I use the latest Spring Cloud Stream framework with the Kafka Streams binder and Avro models. I am able to configure the schema registry.

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

My application.yaml file looks like this:

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
      bindings:
        stream1-in-0:
          destination: topic1
        stream1-out-0:
          destination: topic2
        stream2-in-0:
          destination: topic2
        stream2-in-1:
          destination: topic3
        stream2-out-0:
          destination: topic4
        stream3-in-0:
          destination: topic4
        stream3-out-0:
          destination: topic5
      kafka:
        binder:
          brokers: kafkabrokerurl.com:9092
          configuration: # not recognized at all
            security.protocol: SSL
            ssl.truststore.location: /mnt/truststore.jks
            ssl.truststore.type: JKS
            ssl.keystore.location: /mnt/keystore.jks
            ssl.keystore.type: JKS
            ssl.enabled.protocols: TLSv1.2
        bindings:
          default:
            consumer:
              resetOffsets: false
              startOffset: latest
          stream1-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream1-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream2-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream2-in-1:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
              materializedAs: sbinfo-outage-mapping-store
          stream2-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream3-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream3-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        streams:
          binder:
            configuration:
              schema.registry.url: https://schemaregistryurl.com # this works

When I start the application with debug logging enabled, it shows that none of the configuration I set is loaded, except for the schema registry.

 o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [kafkabrokerurl.com:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

So the brokers are picked up correctly, but ssl.truststore.location, for example, just remains null.

I tried many different approaches I found here and in some other places.

I found an old issue here and tried this approach, but the outcome is the same: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/129

          configuration: # not recognized at all
            "[security.protocol]": SSL
            "[ssl.truststore.location]": /mnt/truststore.jks
            "[ssl.truststore.type]": JKS
            "[ssl.keystore.location]": /mnt/keystore.jks
            "[ssl.keystore.type]": JKS
            "[ssl.enabled.protocols]": TLSv1.2

I read that this configuration does not work when using multiple binders, so I also tried the approach of defining a named binder, but the application complains that it doesn't recognize it.

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
      bindings:
        stream1-in-0:
          destination: topic1
          binder: ssl
        stream1-out-0:
          destination: topic2
          binder: ssl
        stream2-in-0:
          destination: topic2
          binder: ssl
        stream2-in-1:
          destination: topic3
          binder: ssl
        stream2-out-0:
          destination: topic4
          binder: ssl
        stream3-in-0:
          destination: topic4
          binder: ssl
        stream3-out-0:
          destination: topic5
          binder: ssl
      binders:
        ssl:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers:
                      configuration:
                        security.protocol: SSL
                        ssl.truststore.location: /mnt/secrets/truststore.jks
                        ssl.truststore.type: JKS
                        ssl.keystore.location: /mnt/secrets/keystore.jks
                        ssl.keystore.type: JKS
                        ssl.enabled.protocols: TLSv1.2

Error:

2021-07-15 17:11:14.634 ERROR 5216 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: kstream

I have a @Configuration-annotated class where my three streams are declared as a Function, a BiFunction, and another Function.
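
For illustration, a stripped-down sketch of such a class might look like this (the bean names match the function definitions above; the generic types, the KTable second input, and the processing logic are placeholders, not the actual implementation):

    import java.util.function.BiFunction;
    import java.util.function.Function;

    import org.apache.avro.specific.SpecificRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class StreamTopologyConfig {

        // stream1: topic1 -> topic2
        @Bean
        public Function<KStream<Long, SpecificRecord>, KStream<Long, SpecificRecord>> stream1() {
            return input -> input; // placeholder for the real processing
        }

        // stream2: topic2 + topic3 -> topic4; second input read as a KTable
        // (matching the materializedAs store configured on stream2-in-1)
        @Bean
        public BiFunction<KStream<Long, SpecificRecord>, KTable<Long, SpecificRecord>,
                KStream<Long, SpecificRecord>> stream2() {
            return (events, lookup) -> events.join(lookup, (event, info) -> event); // placeholder join
        }

        // stream3: topic4 -> topic5
        @Bean
        public Function<KStream<Long, SpecificRecord>, KStream<Long, SpecificRecord>> stream3() {
            return input -> input; // placeholder for the real processing
        }
    }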

I hope someone can help me. Thank you.


Solution

  • You are missing the streams element in the property name - you are configuring the Kafka MessageChannel Binder instead.

    spring:
      cloud:
        stream:
          kafka:
            streams:
              binder:
                configuration:
                  security:
                    protocol: SSL
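
    Applied to the rest of the SSL properties from the question, the streams binder section would look roughly like this (values copied from the question; the truststore and keystore passwords, which the broker will most likely also require, are omitted here as in the original):

        spring:
          cloud:
            stream:
              kafka:
                streams:
                  binder:
                    configuration:
                      security.protocol: SSL
                      ssl.truststore.location: /mnt/truststore.jks
                      ssl.truststore.type: JKS
                      ssl.keystore.location: /mnt/keystore.jks
                      ssl.keystore.type: JKS
                      ssl.enabled.protocols: TLSv1.2
                      schema.registry.url: https://schemaregistryurl.com

    The dotted keys bind to the same configuration map as the nested form shown above, since the configuration property is a map of plain Kafka client properties.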