spring-boot spring-cloud-stream spring-context

Spring Cloud dynamic consumer creation, message type is lost


I'm creating cloud streams and their consumers dynamically, but I can't get the consumers defined properly because the type information is lost.

In the example below, the consumer defined with the @Bean annotation works fine: it receives the message with all headers and the payload. The dynamically defined consumer receives byte[] as input, because the information about its generic type is lost when it is created: registerSingleton(String beanName, Object singletonObject) takes a plain Object as its parameter.

I've tried to deserialize the message manually, but Spring sends the serialized payload, not the message with the payload, to the dynamically created consumer, so I can't access the headers. This means I'm receiving a serialized SitePayload in the dynamic consumer instead of a serialized Message.

Any advice on how to create a consumer dynamically so that it can access the message headers?

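import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

public class ConsumersCreator implements BeanFactoryPostProcessor {

    private static final Logger log = LoggerFactory.getLogger(ConsumersCreator.class);

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // Registered as plain Objects: the generic type of each consumer is not visible here
        for (int i = 0; i < 5; i++) {
            beanFactory.registerSingleton("consumer" + i, createConsumer(i));
        }
    }

    private Consumer<Message<SitePayload>> createConsumer(int id) {
        return message -> log.info("Consumer id: {} message received: {}", id, message);
    }

    // Declared via @Bean: this consumer receives the full Message with headers
    @Bean
    public Consumer<Message<SitePayload>> consumer() {
        return message -> log.error("Message received: {}", message);
    }
}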


Solution

  • I am not sure why you create consumers dynamically. I can't envision the use case, since in a typical stream app one would need some consumer/producer properties for the bindings, etc.

    In any event, the dynamic mechanism you are asking about is more related to spring-core than to spring-cloud-function. That said, we do provide a mechanism to register functions dynamically, and that is via the FunctionRegistration class.

    Here is a small sample (you can find more in our test cases, especially for cases with generics where ResolvableType is used):

    FunctionRegistry registry = applicationContext.getBean(FunctionRegistry.class);
    FunctionRegistration<MyFunction> registration = new FunctionRegistration<>(new MyFunction(), "a")
                        .type(FunctionTypeUtils.functionType(Integer.class, String.class));
    registry.register(registration);
    

    Then you register each FunctionRegistration as a bean and we'll discover it with proper type information.
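
As background on why the type information goes missing in the first place, here is a minimal plain-Java sketch with no Spring involved. The class name ErasureDemo and its helper methods are hypothetical and exist only for this illustration. It shows that a lambda instance does not expose its generic type arguments through reflection, whereas the declared return type of a method does. This is presumably why a consumer declared through a @Bean method can be resolved with its full type, while an instance handed to registerSingleton as a plain Object needs the type supplied explicitly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical demo class; none of these names come from Spring.
class ErasureDemo {

    // A consumer created as a lambda, like createConsumer(...) in the question.
    static Consumer<List<String>> lambdaConsumer() {
        return list -> { };
    }

    // The same consumer written as an anonymous class, for comparison.
    static Consumer<List<String>> anonymousConsumer() {
        return new Consumer<List<String>>() {
            @Override
            public void accept(List<String> list) { }
        };
    }

    // True if the object's class records a parameterized interface such as Consumer<List<String>>.
    static boolean exposesTypeArgument(Object candidate) {
        for (Type implemented : candidate.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Reads the generic return type from the method declaration rather than from an instance.
    static String declaredReturnType(String methodName) {
        try {
            return ErasureDemo.class.getDeclaredMethod(methodName)
                    .getGenericReturnType()
                    .getTypeName();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(exposesTypeArgument(lambdaConsumer()));    // prints "false"
        System.out.println(exposesTypeArgument(anonymousConsumer())); // prints "true"
        System.out.println(declaredReturnType("lambdaConsumer"));
    }
}
```

The last line prints the full parameterized type taken from the method signature, even though the lambda instance itself carries no such information. Supplying the type explicitly, as FunctionRegistration.type(...) does in the sample above, fills that gap for dynamically created instances.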