Search code examples
javajava-8spring-integrationspring-integration-dsl

Spring Integration Dynamic Flows threads issue


I am creating dynamic integration flows as based on the data from the database.

The directories which we need to poll with the file name pattern are in the databse

e.g.

Instance, directory , filename 
ABC     , c:/input1  , test.txt
DEF     , d:/input2 ,  fresh.xlsx

I have some 200-300 entries so what I am doing is creating Integration flow for each record as it will have different processor etc

for each record

 IntegrationFlowBuilder flowBuilder =
                        IntegrationFlows
                                .from(new CustomFileReadingSource(input), consumer);

     flowBuilder.transform(new ObjectToJsonTransformer());

      flowBuilder.handle(o -> {

    //                System.out.println(o.getPayload());
                });
context.registration(flowBuilder.get()).register();

Once all of them are registered but when I look at the VisualVM or the log I only see 8-10 threads instead of 100 or 200.

from the log

2018-11-13 16:00:41.399 [task-scheduler-3] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:41.587 [task-scheduler-10] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.807 [task-scheduler-4] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.071 [task-scheduler-5] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.323 [task-scheduler-7] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.569 [task-scheduler-6] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.878 [task-scheduler-8] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:43.197 [task-scheduler-9] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:43.588 [task-scheduler-1] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:43.951 [task-scheduler-2] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:44.305 [task-scheduler-3] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:44.598 [task-scheduler-10] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.031 [task-scheduler-4] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:45.414 [task-scheduler-5] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:45.974 [task-scheduler-7] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 

As you can see its only few threads polling the thread

Could someone help why it does not create threads or any better way to achieve parallel pollers ?


Solution

  • That's correct. Because all Polling Ednpoints are based on the predefined global ThreadPoolTaskScheduler with the 10 as pool size by default: https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/configuration.html#namespace-taskscheduler

    On the other hand it is really pointless to try to have 100 thread meanwhile your CPU is maximum 16 cores. Making more threads may even cause slowing down of your application.