Tags: java, spring-cloud, apache-kafka-streams, spring-kafka, spring-cloud-stream

Can I use a spring cloud stream function definition for multiple tenants (kafka binder)?


I have a streaming processor that processes messages from a Kafka input topic to an output topic. Furthermore, I have multiple tenants for whom this processing should take place. Let's call them tenant A and tenant B, though there can be more than a dozen tenants for the application to process. The input and output topics follow the naming convention A-input, B-input, ... and A-output, B-output, ...

The function definition is like:

@Configuration
public class StreamProcessorConfig {

    @Bean
    public Function<KStream<String, InputType>, KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }

}

My application.yaml now configures the streaming application for tenant A:

tenant: A

spring.cloud.function.definition: myfunctiondefinition
spring.cloud.stream.kafka.streams.binder.functions.myfunctiondefinition:
    applicationId: ${spring.application.name}-myfunctiondefinition

spring.cloud.stream.bindings.myfunctiondefinition-in-0:
  destination: ${tenant}-input
spring.cloud.stream.bindings.myfunctiondefinition-out-0:
  destination: ${tenant}-output

How can I modify the configuration to add an instance for tenant B? Of course I could duplicate myfunctiondefinition() as well as all configuration keys, but I'm looking for a way to add tenants quickly and cleanly, solely through configuration. Is this possible?

Note: Running another instance of the application for tenant B and further tenants is sadly not an option.
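For comparison, the naive duplication mentioned above would look roughly like the following; note that it also requires a second function bean on the Java side (the name myfunctiondefinitionB is purely hypothetical):

spring.cloud.function.definition: myfunctiondefinition;myfunctiondefinitionB

spring.cloud.stream.kafka.streams.binder.functions.myfunctiondefinitionB:
    applicationId: ${spring.application.name}-myfunctiondefinitionB

spring.cloud.stream.bindings.myfunctiondefinitionB-in-0:
  destination: B-input
spring.cloud.stream.bindings.myfunctiondefinitionB-out-0:
  destination: B-output

Every new tenant would add another bean and another block of keys, which is exactly what should be avoided.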


Solution

  • We found a solution to this problem by manually registering the function beans. Sadly, this was not quite as easy as we thought it would be: FunctionDetectorCondition (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/FunctionDetectorCondition.java) requires an AnnotatedBeanDefinition that is used as a template for the actual stream processing bean. This could be taken as a proposal to Spring Cloud Stream: support registering a function definition template that can be used multiple times.

    To achieve this, we initialise a factory bean instead of the stream processor function itself:

    @Configuration
    public class StreamProcessorConfig {
    
        @Bean
        public MyFunctionDefinitionFactory myFunctionDefinitionFactory() {
            return new MyFunctionDefinitionFactory();
        }
    }
    

    The factory creates the stream processor function:

    public class MyFunctionDefinitionFactory {
    
        public Function<KStream<String, InputType>, 
                   KStream<String, OutputType>> myfunctiondefinition() {
            return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
                return KeyValue.pair(k, OutputType.createFrom(v));
            });
        }
    }
    

    Now we need a dummy bean that is required for Spring Cloud Stream to apply its logic for creating the stream processor:

    // Behaves as dummy bean for spring cloud stream
    // Has to be the same name as the original streaming function in the factory.
    // In this case we named the method "myfunctiondefinition",
    // so the dummy-bean has to get the name "Myfunctiondefinition".
    public class Myfunctiondefinition implements Function<KStream<String, InputType>, 
                    KStream<String, OutputType>> {
    
        // NOTE: this may need changes if Spring Cloud Stream changes its internal logic.
        // A method named myfunctiondefinition() is needed because Spring Cloud Stream
        // searches for a method with the same name as the class in
        // FunctionDetectorCondition#pruneFunctionBeansForKafkaStreams
        public Function<KStream<String, InputType>, 
                   KStream<String, OutputType>> myfunctiondefinition() {
            return null;
        }
    
        // Needed for the interface implementation. Spring cloud streams needs
        // the class Function to identify a stream processor candidate.
        @Override
        public KStream<String, OutputType> apply(KStream<String, InputType> input) {
            return null;
        }
    }
    

    Now that we have everything in place, we can register a bean per tenant. We do this within an ApplicationContextInitializer that creates a bean definition with a factory method and iterates over the functions defined in the configuration file application.yaml.

    public class StreamProcessorInitializer 
        implements ApplicationContextInitializer<GenericWebApplicationContext> {
    
        @Override
        public void initialize(GenericWebApplicationContext context) {
            String functionDefinitions = context.getEnvironment()
                .getProperty("spring.cloud.function.definition");
            String splitter = context.getEnvironment()
                .getProperty("spring.cloud.function.definition.splitter");
        // Guava's CaseFormat converts "MyFunctionDefinitionFactory" to "myFunctionDefinitionFactory"
        String factoryName = CaseFormat.UPPER_CAMEL
            .to(CaseFormat.LOWER_CAMEL, MyFunctionDefinitionFactory.class.getSimpleName());
        // Note: getMethods() does not guarantee ordering; referencing the
        // method name explicitly would be more robust.
        String factoryMethodName =
             MyFunctionDefinitionFactory.class.getMethods()[0].getName();
    
            AnnotatedGenericBeanDefinition def = 
                new AnnotatedGenericBeanDefinition(Myfunctiondefinition.class);
            def.setFactoryBeanName(factoryName);
            def.setFactoryMethodName(factoryMethodName);
    
            Arrays.stream(functionDefinitions.split(splitter))
               .forEach(function -> context.registerBeanDefinition(function, def));
        }
    }
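
    One step the snippet above leaves implicit is how the initializer gets picked up by Spring Boot. A common way (an assumption here, since the original doesn't show it; the package com.example is a placeholder) is an entry in META-INF/spring.factories:

    # src/main/resources/META-INF/spring.factories
    org.springframework.context.ApplicationContextInitializer=\
      com.example.StreamProcessorInitializer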
    

    Finally, we can dynamically define functions within the application.yaml. This can be done with Helm or Kustomize to configure the specific tenant environment:

    #--------------------------------------------------------------------------------------------------------------------------------------
    # streaming processor functions (going to be filled by helm)
    #--------------------------------------------------------------------------------------------------------------------------------------
    spring.cloud.function.definition: <name1>,<name2>,...
    #--Note-- required as spring cloud streams has changed the splitter in the past
    spring.cloud.function.definition.splitter: ;
    
    # Properties per function (<name>)
    spring.cloud.stream.kafka.streams.binder.functions.<name>.applicationId: ${tenant}-${spring.application.name}-<name>
    # configuring dlq (if you have one)
    spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.deserializationExceptionHandler: sendToDlq
    spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.dlqName: ${tenant}-<name>-dlq
    # configuring in- and output topics
    spring.cloud.stream.bindings.<name>-in-0.destination: ${tenant}-<inputname>
    spring.cloud.stream.bindings.<name>-out-0.destination: ${tenant}-<outputname>
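
    To make the template concrete, a rendered configuration for two tenants could look like the following sketch (function and topic names are purely illustrative; in this scheme the function name itself carries the tenant, and every registered bean delegates to the same factory method):

    spring.cloud.function.definition: tenantA;tenantB
    spring.cloud.function.definition.splitter: ;

    spring.cloud.stream.kafka.streams.binder.functions.tenantA.applicationId: A-${spring.application.name}-tenantA
    spring.cloud.stream.bindings.tenantA-in-0.destination: A-input
    spring.cloud.stream.bindings.tenantA-out-0.destination: A-output

    spring.cloud.stream.kafka.streams.binder.functions.tenantB.applicationId: B-${spring.application.name}-tenantB
    spring.cloud.stream.bindings.tenantB-in-0.destination: B-input
    spring.cloud.stream.bindings.tenantB-out-0.destination: B-output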