How does JdbcPollingChannelAdapter maxRows differ from Poller maxMessagesPerPoll?


I have multiple polling flows that share the same flow logic and differ only in the channel (a database column, id_channel, that separates rows by dependency).

On the JdbcPollingChannelAdapter, setMaxRows is fixed to 1. To my understanding, each round trip to the database fetches one row.

If I have 5 polling flows and 10 threads, how do the polling flows "compete" with each other? Does setting Pollers.maxMessagesPerPoll make any difference to concurrency, given that JdbcPollingChannelAdapter.setMaxRows is always 1?

My application.properties (custom datasource) has:

spring.task.scheduling.pool.size=10
spring.pgsql.hikari.maximum-pool-size=10

Flow logic:

private MessageSource<Object> buildJdbcMessageSource(final int channel) {
    JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource, FETCH_QUERY);
    adapter.setMaxRows(1);
    adapter.setUpdatePerRow(true);
    adapter.setSelectSqlParameterSource(new MapSqlParameterSource(Map.of("idCanal", channel)));
    adapter.setRowMapper((RowMapper<IntControle>) (rs, i)
            -> new IntControle(rs.getLong(1), rs.getInt(2), rs.getString(3)));
    adapter.setUpdateSql(UPDATE_QUERY);

    return adapter;
}

private IntegrationFlow buildIntegrationFlow(final int channel, final long rate, final int maxMessages) {
    return IntegrationFlows.from(buildJdbcMessageSource(channel),
                    c -> c.poller(Pollers.fixedDelay(rate)
                            .transactional(transactionInterceptor())
                            .maxMessagesPerPoll(maxMessages)))
            .split()
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, ERROR_CHANNEL))
            .channel(SybaseFlowConfiguration.SYBASE_SINK)
            .get();
}

public IntegrationFlow pollingFlowChannel1() {
    return buildIntegrationFlow(1, properties.getChan1RateMs(), properties.getChan1MaxMessages());
}

public IntegrationFlow pollingFlowChannel2() {
    return buildIntegrationFlow(2, properties.getChan2RateMs(), properties.getChan2MaxMessages());
}

...

Solution

  • The reference documentation explains the difference: https://docs.spring.io/spring-integration/docs/current/reference/html/jdbc.html#jdbc-max-rows-versus-max-messages-per-poll.

    Please let us know if that does not answer your question.
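
As a rough illustration of what that documentation section describes: maxRows limits how many rows a single execution of the query returns, while maxMessagesPerPoll limits how many times the query is executed back to back within one polling cycle. The sketch below is a plain-Java simulation, not Spring Integration code. The class PollSimulation, its methods, and the row counts are hypothetical, and it assumes a polling cycle stops early as soon as a query returns nothing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one polling flow; it does not use any Spring classes.
class PollSimulation {

    // Stand-in for the rows of one id_channel that are still waiting to be fetched.
    private final Deque<Integer> pendingRows = new ArrayDeque<>();
    private final int maxRows;
    private int queriesExecuted = 0;

    PollSimulation(int rowCount, int maxRows) {
        for (int id = 1; id <= rowCount; id++) {
            pendingRows.add(id);
        }
        this.maxRows = maxRows;
    }

    // Models one receive(): a single query returning at most maxRows rows
    // (0 means no limit), or null when nothing is pending.
    List<Integer> receive() {
        queriesExecuted++;
        if (pendingRows.isEmpty()) {
            return null;
        }
        int limit = maxRows > 0 ? maxRows : pendingRows.size();
        List<Integer> rows = new ArrayList<>();
        while (rows.size() < limit && !pendingRows.isEmpty()) {
            rows.add(pendingRows.poll());
        }
        return rows;
    }

    // Models one polling cycle: receive() is called up to maxMessagesPerPoll
    // times, one call after another, stopping early on the first null result.
    int pollOnce(int maxMessagesPerPoll) {
        int messages = 0;
        while (messages < maxMessagesPerPoll) {
            List<Integer> payload = receive();
            if (payload == null) {
                break;
            }
            messages++;
        }
        return messages;
    }

    int queriesExecuted() {
        return queriesExecuted;
    }

    int remaining() {
        return pendingRows.size();
    }

    public static void main(String[] args) {
        // 10 pending rows, maxRows = 1, maxMessagesPerPoll = 3
        PollSimulation sim = new PollSimulation(10, 1);
        int messages = sim.pollOnce(3);
        System.out.println("messages=" + messages
                + " queries=" + sim.queriesExecuted()
                + " remaining=" + sim.remaining());
    }
}
```

Under this model, with maxRows fixed at 1, maxMessagesPerPoll = 3 means up to three single-row queries per polling cycle for that flow, run one after another within the same cycle. It changes how many rows one flow can drain per cycle, not how many threads that flow uses.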