I have a stream processor that processes messages from a Kafka InputTopic to an OutputTopic. Furthermore, I have multiple tenants for whom this processing must take place. Let's call them tenant A and tenant B, but there can be more than a dozen tenants that the application should process. The input and output topics follow the naming convention A-input, B-input, ... and A-output, B-output, ...
The function definition looks like this:
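import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamProcessorConfig {

    @Bean
    public Function<KStream<String, InputType>, KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }
}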
My application.yaml now configures the streaming application for tenant A:
tenant: A
spring.cloud.function.definition: myfunctiondefinition
spring.cloud.stream.kafka.streams.binder.functions.myfunctiondefinition:
  applicationId: ${spring.application.name}-myfunctiondefinition
spring.cloud.stream.bindings.myfunctiondefinition-in-0:
  destination: ${tenant}-input
spring.cloud.stream.bindings.myfunctiondefinition-out-0:
  destination: ${tenant}-output
How can I modify the configuration to add an instance for tenant B? Of course I could duplicate myfunctiondefinition() as well as all configuration keys, but I'm looking for a way to dynamically add tenants fast and clean solely through configuration. Is this possible?
Note: Running another instance of the application for tenant B and further tenants is sadly not an option.
We found a solution to this problem by manually registering the function beans. Sadly, this was not quite as easy as we thought it would be. FunctionDetectorCondition (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/FunctionDetectorCondition.java) requires an AnnotatedBeanDefinition that is used as a template for the actual stream processing bean. This could be taken as a proposal to Spring Cloud Stream for registering a function definition template that can be used multiple times.
To reach this goal we initialise a factory bean instead of the stream processor function itself:
@Configuration
public class StreamProcessorConfig {

    @Bean
    public MyFunctionDefinitionFactory myFunctionDefinitionFactory() {
        return new MyFunctionDefinitionFactory();
    }
}
The factory creates the stream processor function:
public class MyFunctionDefinitionFactory {

    public Function<KStream<String, InputType>,
            KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }
}
Now we need a dummy bean class, which Spring Cloud Stream requires in order to apply its logic for creating the stream processor:
// Behaves as dummy bean for spring cloud stream
// Has to be the same name as the original streaming function in the factory.
// In this case we named the method "myfunctiondefinition",
// so the dummy-bean has to get the name "Myfunctiondefinition".
public class Myfunctiondefinition implements Function<KStream<String, InputType>,
        KStream<String, OutputType>> {

    // !!! It could be that changes are needed if spring cloud streams changes the logic
    // Method myfunctiondefinition() is needed, because spring cloud streams searches for
    // a method with the same name as the class in
    // FunctionDetectorCondition:pruneFunctionBeansForKafkaStreams
    public Function<KStream<String, InputType>,
            KStream<String, OutputType>> myfunctiondefinition() {
        return null;
    }

    // Needed for the interface implementation. Spring cloud streams needs
    // the class Function to identify a stream processor candidate.
    @Override
    public KStream<String, OutputType> apply(KStream<String, InputType> input) {
        return null;
    }
}
Now that we have everything in place, we can register a bean per tenant. We do this within an ApplicationContextInitializer that creates a bean definition with a factory method and iterates over the functions that we define in the configuration file application.yaml.
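import java.util.Arrays;

import com.google.common.base.CaseFormat;
import org.springframework.beans.factory.annotation.AnnotatedGenericBeanDefinition;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.web.context.support.GenericWebApplicationContext;

public class StreamProcessorInitializer
        implements ApplicationContextInitializer<GenericWebApplicationContext> {

    @Override
    public void initialize(GenericWebApplicationContext context) {
        String functionDefinitions = context.getEnvironment()
                .getProperty("spring.cloud.function.definition");
        String splitter = context.getEnvironment()
                .getProperty("spring.cloud.function.definition.splitter");
        // Bean name of the factory, derived from its class name
        String factoryName = CaseFormat.UPPER_CAMEL
                .to(CaseFormat.LOWER_CAMEL, MyFunctionDefinitionFactory.class.getSimpleName());
        // getMethods() also returns the methods inherited from Object, in no
        // guaranteed order, so pick the one the factory declares itself
        String factoryMethodName = Arrays.stream(MyFunctionDefinitionFactory.class.getMethods())
                .filter(m -> m.getDeclaringClass() == MyFunctionDefinitionFactory.class)
                .findFirst()
                .orElseThrow(IllegalStateException::new)
                .getName();
        // The dummy class supplies the metadata, the factory method the real function
        AnnotatedGenericBeanDefinition def =
                new AnnotatedGenericBeanDefinition(Myfunctiondefinition.class);
        def.setFactoryBeanName(factoryName);
        def.setFactoryMethodName(factoryMethodName);
        // Register the same definition once per configured function name
        Arrays.stream(functionDefinitions.split(splitter))
                .forEach(function -> context.registerBeanDefinition(function, def));
    }
}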
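Two details of an initializer like this are easy to get wrong. String.split treats its argument as a regular expression and fails on null, so a missing splitter property or a delimiter such as | causes trouble. Also, the JDK does not guarantee the order of the methods returned by reflection. The following is a minimal sketch of more defensive helpers in plain Java. InitializerHelpers and DemoFactory are hypothetical names, and DemoFactory uses String instead of KStream so that the sketch runs without Kafka or Spring on the classpath.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.regex.Pattern;

public class InitializerHelpers {

    // Hypothetical stand-in for MyFunctionDefinitionFactory (String replaces KStream).
    public static class DemoFactory {
        public Function<String, String> myfunctiondefinition() {
            return String::toUpperCase;
        }
    }

    // Splits the function definition on a literal delimiter.
    // Falls back to ";" when no splitter is configured (an assumption of this sketch),
    // trims whitespace and drops empty names.
    public static List<String> splitFunctionNames(String definition, String splitter) {
        String delimiter = (splitter == null || splitter.isEmpty()) ? ";" : splitter;
        return Arrays.stream(definition.split(Pattern.quote(delimiter)))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    // Returns the name of the single public method that the factory class itself
    // declares and that returns a Function. Methods inherited from Object are ignored.
    public static String factoryMethodName(Class<?> factoryClass) {
        List<Method> candidates = Arrays.stream(factoryClass.getMethods())
                .filter(m -> m.getDeclaringClass() == factoryClass)
                .filter(m -> Function.class.isAssignableFrom(m.getReturnType()))
                .collect(Collectors.toList());
        if (candidates.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory method, found " + candidates.size());
        }
        return candidates.get(0).getName();
    }

    public static void main(String[] args) {
        System.out.println(splitFunctionNames("processA; processB;", null));
        System.out.println(splitFunctionNames("processA|processB", "|"));
        System.out.println(factoryMethodName(DemoFactory.class));
    }
}
```

In the initializer, the results of these two helpers would take the place of the raw split(...) call and the index-based method lookup.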
Finally, we can dynamically define functions within the application.yaml. This can be done by Helm or Kustomize to configure the specific tenant environment:
#--------------------------------------------------------------------------------------------------------------------------------------
# streaming processor functions (going to be filled by helm)
#--------------------------------------------------------------------------------------------------------------------------------------
spring.cloud.function.definition: <name1>;<name2>;...
#--Note-- required as spring cloud streams has changed the splitter in the past
spring.cloud.function.definition.splitter: ";"
# Properties per function (<name>)
spring.cloud.stream.kafka.streams.binder.functions.<name>.applicationId: ${tenant}-${spring.application.name}-<name>
# configuring dlq (if you have one)
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.dlqName: ${tenant}-<name>-dlq
# configuring in- and output topics
spring.cloud.stream.bindings.<name>-in-0.destination: ${tenant}-<inputname>
spring.cloud.stream.bindings.<name>-out-0.destination: ${tenant}-<outputname>
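To make the template more concrete, here is a minimal sketch in plain Java of how such properties could be expanded for several tenants, which is roughly the job a Helm or Kustomize template would do. It assumes one function bean per tenant and the topic convention from the question (A-input, A-output). The class TenantBindingProperties, the "process" name prefix and the application name "my-app" are hypothetical and only serve as illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class TenantBindingProperties {

    // Hypothetical naming scheme: one function bean per tenant, e.g. "processA".
    public static String functionName(String tenant) {
        return "process" + tenant;
    }

    // Builds the flat property keys from the template above for every tenant.
    public static Map<String, String> render(List<String> tenants, String appName) {
        Map<String, String> props = new LinkedHashMap<>();
        StringJoiner definition = new StringJoiner(";");
        for (String tenant : tenants) {
            String name = functionName(tenant);
            definition.add(name);
            // Each Kafka Streams function gets its own application id.
            props.put("spring.cloud.stream.kafka.streams.binder.functions." + name
                    + ".applicationId", tenant + "-" + appName + "-" + name);
            // Input and output topics follow the <tenant>-input / <tenant>-output convention.
            props.put("spring.cloud.stream.bindings." + name + "-in-0.destination",
                    tenant + "-input");
            props.put("spring.cloud.stream.bindings." + name + "-out-0.destination",
                    tenant + "-output");
        }
        props.put("spring.cloud.function.definition", definition.toString());
        props.put("spring.cloud.function.definition.splitter", ";");
        return props;
    }

    public static void main(String[] args) {
        render(Arrays.asList("A", "B"), "my-app")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Adding a tenant then means adding one entry to the tenant list. The DLQ keys from the template can be generated in the same loop if a dead letter queue is used.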