I am using the Spring Integration DSL to create a pipeline that enriches events from Kafka and persists them to a DB. There can be multiple event types, so we have created an Event interface and corresponding implementation classes.
The Event interface declares the methods parse(), process() and persist(), and each implementation class provides the implementations.
@Bean
public IntegrationFlow baseEventFlow() {
    return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainer))
            .wireTap(WIRE_TAP_CHNL) // log the raw message
            .handle(eventDispatch, "preProcess")
            .handle(eventDispatch, "dispatch") // identify the implementation class from the event name
            .handle(???, "parse") // get the class name from the above handler and then invoke its methods
            .handle(???, "validate")
            .get();
}
.handle(eventDispatch, "dispatch") returns the implementation class name at runtime.
How can I call a method on that class, given that I only know the class at runtime, based on the incoming event?
You can store that object from dispatch into a header. The next handle() can get that header, cast to a common interface and call an appropriate method against payload. This is indeed a strategy pattern. You really can’t do that with a static handler object definition.
UPDATE
Can you please provide an example to refer to?
There is no such example, because the use case goes beyond the behavior of the standard components. Your task is one of a million (or more?) possible business requirements in the field. We definitely can't cover all of them in our samples.
Anyway this is what I think about it:
.enrichHeaders(h -> h.headerFunction("handler", eventDispatch::dispatch))
.handle((p, h) -> ((EventProcessor) h.get("handler")).parse(p))
.handle((p, h) -> ((EventProcessor) h.get("handler")).validate(p))
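The snippet above relies on two pieces it does not show: a common EventProcessor interface, and a dispatch method that returns an instance rather than a class name. Below is a minimal sketch of those pieces in plain Java, with a Map standing in for the message headers so it runs without Spring. OrderEventProcessor, UserEventProcessor, StrategyDemo, the event names and the String-based dispatch signature are all hypothetical; in the real flow, dispatch would receive the message and derive the event name from it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical common interface; the flow casts the header value to this type.
interface EventProcessor {
    Object parse(Object payload);
    Object validate(Object payload);
}

// Hypothetical implementation for "order" events.
class OrderEventProcessor implements EventProcessor {
    public Object parse(Object payload) {
        return "order:" + payload.toString().trim();
    }
    public Object validate(Object payload) {
        if (!payload.toString().startsWith("order:")) {
            throw new IllegalArgumentException("not an order event: " + payload);
        }
        return payload;
    }
}

// Hypothetical implementation for "user" events.
class UserEventProcessor implements EventProcessor {
    public Object parse(Object payload) {
        return "user:" + payload.toString().trim();
    }
    public Object validate(Object payload) {
        if (!payload.toString().startsWith("user:")) {
            throw new IllegalArgumentException("not a user event: " + payload);
        }
        return payload;
    }
}

// Assumption: dispatch returns the processor instance, not its class name,
// so the result can be stored in a header and invoked directly.
class EventDispatch {
    private final Map<String, EventProcessor> processors = new HashMap<>();

    EventDispatch() {
        processors.put("order", new OrderEventProcessor());
        processors.put("user", new UserEventProcessor());
    }

    EventProcessor dispatch(String eventName) {
        EventProcessor processor = processors.get(eventName);
        if (processor == null) {
            throw new IllegalArgumentException("unknown event: " + eventName);
        }
        return processor;
    }
}

class StrategyDemo {
    // Mirrors the three flow steps: enrich the header, then parse, then validate.
    static Object run(String eventName, Object payload) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("handler", new EventDispatch().dispatch(eventName));
        Object parsed = ((EventProcessor) headers.get("handler")).parse(payload);
        return ((EventProcessor) headers.get("handler")).validate(parsed);
    }

    public static void main(String[] args) {
        System.out.println(run("order", " 42 ")); // prints "order:42"
    }
}
```

Returning the instance from dispatch keeps the later steps free of reflection: each one only needs the cast to the shared interface.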