spring-integration spring-integration-dsl

Spring Integration service activator: use bean name based on input


I am using the Spring Integration DSL to create a pipeline that enriches events from Kafka and persists them to a DB. There can be multiple event types, and we have created an event interface and corresponding implementation classes.

The event interface declares the methods parse(), process(), and persist(), and each implementation class provides its own implementation.

@Bean
public IntegrationFlow baseEventFlow() {
    return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainer))
            .wireTap(WIRE_TAP_CHNL) // log the raw message
            .handle(eventDispatch, "preProcess")
            .handle(eventDispatch, "dispatch") // identify the implementation class from the event name
            .handle(???, "parse") // get the class name from the above handler and then invoke its methods
            .handle(???, "validate")
            .get();
}

The .handle(eventDispatch, "dispatch") step returns the implementation class name at runtime.

How can I call a method on this class, given that I only learn which class it is at runtime, based on the input event?


Solution

  • You can store the object returned by dispatch in a header. The next handle() can then read that header, cast it to a common interface, and call the appropriate method with the payload. This is essentially the strategy pattern. You really can't do that with a static handler object definition.

    UPDATE

    "Can you please provide an example to refer to?"

    There is no such example because this use case goes beyond the standard behavior of the components. Your task is one of a million (or more?) possible business requirements in the field. We definitely can't cover all of them in our samples.

    Anyway, this is what I think about it:

    .enrichHeaders(h -> h.headerFunction("handler", eventDispatch::dispatch))
    .handle((p, h) -> ((EventProcessor) h.get("handler")).parse(p))
    .handle((p, h) -> ((EventProcessor) h.get("handler")).validate(p))
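
To see the header-based strategy idea in isolation, here is a minimal plain-Java sketch that runs without Spring. It rests on several assumptions that are not in the original post: a HashMap stands in for the message headers, the EventProcessor method signatures (String in, String out) are guessed, and HeaderStrategySketch, OrderProcessor, PaymentProcessor and the prefix-based dispatch logic are hypothetical names made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical driver class; it only imitates the three flow steps shown above.
final class HeaderStrategySketch {

    // Stand-in for eventDispatch.dispatch(...): picks a processor from the raw event.
    // The prefix convention ("order:", "payment:") is an assumption for this sketch.
    static EventProcessor dispatch(String rawEvent) {
        if (rawEvent.startsWith("order:")) {
            return new OrderProcessor();
        }
        if (rawEvent.startsWith("payment:")) {
            return new PaymentProcessor();
        }
        throw new IllegalArgumentException("Unknown event type: " + rawEvent);
    }

    static String run(String payload) {
        // A plain map plays the role of the message headers.
        Map<String, Object> headers = new HashMap<>();

        // Corresponds to .enrichHeaders(...): remember the chosen strategy.
        headers.put("handler", dispatch(payload));

        // Corresponds to the first .handle(...): look up the strategy and parse.
        String parsed = ((EventProcessor) headers.get("handler")).parse(payload);

        // Corresponds to the second .handle(...): same strategy, next method.
        return ((EventProcessor) headers.get("handler")).validate(parsed);
    }

    public static void main(String[] args) {
        System.out.println(run("order:42"));   // prints "order#42"
        System.out.println(run("payment:7"));  // prints "payment#7"
    }
}

// Common interface that every event implementation shares (signatures assumed).
interface EventProcessor {
    String parse(String payload);
    String validate(String parsed);
}

final class OrderProcessor implements EventProcessor {
    @Override
    public String parse(String payload) {
        return payload.substring("order:".length());
    }

    @Override
    public String validate(String parsed) {
        return "order#" + parsed;
    }
}

final class PaymentProcessor implements EventProcessor {
    @Override
    public String parse(String payload) {
        return payload.substring("payment:".length());
    }

    @Override
    public String validate(String parsed) {
        return "payment#" + parsed;
    }
}
```

The point of the sketch is that the later steps never name a concrete class: they only read the "handler" entry and call through the shared interface, which is what the lambdas in the flow do with the real message headers.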