I am creating integration flows dynamically, based on data from the database.
The directories we need to poll, together with the file name patterns, are stored in the database,
e.g.
Instance, directory , filename
ABC , c:/input1 , test.txt
DEF , d:/input2 , fresh.xlsx
I have some 200-300 entries, so I create an integration flow for each record, since each one will have a different processor, etc.
For each record:
IntegrationFlowBuilder flowBuilder =
        IntegrationFlows.from(new CustomFileReadingSource(input), consumer);
flowBuilder.transform(new ObjectToJsonTransformer());
flowBuilder.handle(o -> {
    // System.out.println(o.getPayload());
});
context.registration(flowBuilder.get()).register();
Once all of them are registered, VisualVM and the log show only 8-10 threads instead of 100 or 200.
From the log:
2018-11-13 16:00:41.399 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.587 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.807 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.071 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.323 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.569 [task-scheduler-6] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.878 [task-scheduler-8] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.197 [task-scheduler-9] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.588 [task-scheduler-1] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.951 [task-scheduler-2] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.305 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.598 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.031 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.414 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.974 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
As you can see, only a few threads are polling the directories.
Could someone explain why it does not create more threads, or suggest a better way to achieve parallel pollers?
That's correct, because all polling endpoints are based on the predefined global ThreadPoolTaskScheduler, which has a pool size of 10 by default: https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/configuration.html#namespace-taskscheduler
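As a sketch of how that default could be changed: the linked section of the reference describes a global property for the scheduler's pool size, which is read from a META-INF/spring.integration.properties file on the classpath. The value 20 below is only an illustrative assumption, not a recommendation.

```properties
# META-INF/spring.integration.properties
# Pool size of the global taskScheduler bean (default is 10); 20 is an arbitrary example value
spring.integration.taskScheduler.poolSize=20
```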
On the other hand, it is really pointless to try to have 100 threads when your CPU has, say, at most 16 cores. Adding more threads may even slow your application down.
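The behaviour in the log can be reproduced outside Spring. ThreadPoolTaskScheduler delegates to a scheduled executor from java.util.concurrent, so the following is a minimal JDK-only sketch of the same situation: many periodic tasks sharing one fixed-size scheduler. The class name, the method name and the 10 ms delay are hypothetical, and the tasks only stand in for the file pollers.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the periodic tasks only stand in for file pollers.
class SharedSchedulerDemo {

    // Runs `pollers` periodic tasks on one scheduler with `poolSize` threads and
    // returns the names of the threads that executed them.
    static Set<String> threadsUsed(int poolSize, int pollers) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        // Wait for `pollers` task executions in total before inspecting the result.
        CountDownLatch executions = new CountDownLatch(pollers);
        try {
            for (int i = 0; i < pollers; i++) {
                scheduler.scheduleWithFixedDelay(() -> {
                    names.add(Thread.currentThread().getName());
                    executions.countDown();
                }, 0, 10, TimeUnit.MILLISECONDS);
            }
            executions.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } finally {
            scheduler.shutdownNow();
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = threadsUsed(10, 200);
        // The set can never be larger than the pool size, however many tasks are scheduled.
        System.out.println(names.size() + " thread(s) served 200 periodic tasks: " + names);
    }
}
```

Each task occupies a thread only while it is actually running, so a small pool can serve hundreds of short polls in turn. That is why the log above cycles through task-scheduler-1 to task-scheduler-10.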