So, I have a spring boot project where i need to do this:
The server is Weblogic, with XA datasource for rows processed, XA factory for Jms, jndi context and spring integration poller (jdbcpollingchaneladapter) and jta transaction:
As found here in this doc, in order to do so, i have to use a JtaTransaction with userTransaction, and create a non transacted Jms session
// DATABASE Poller using JdbcPollingChannelAdapter
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<?> jpaInbound() {
// Select request by status = 'TO_SEND'
JdbcPollingChannelAdapter j = new JdbcPollingChannelAdapter(datasource,
StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
StgOutJms stg = new StgOutJms();
j.setRowMapper(stg);
return j;
}
//Poller metadata with jta Transaction
@Bean
public PollerMetadata pollerMetadata() throws NamingException {
return Pollers.fixedDelay(Long.valueOf(env.getProperty("poller.interval")))
.transactional(transactionManager).get();
}
Jta Transaction manager using userTransaction :
@Bean
public PlatformTransactionManager transactionManager() throws NamingException {
Hashtable<String, String> properties = new Hashtable<>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
UserTransaction xact = (UserTransaction) vInitialContext.lookup("javax.transaction.UserTransaction");
return new JtaTransactionManager(xact);
}
Process :
// Service Activator : Lunching the Jms Creation for each row
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return wlstoreMessage -> {
try {
jmsSenderService.
consumeMessage((List<StgOutJms>) wlstoreMessage.getPayload());
} catch (NamingException | JMSException e) {
log.error(e.getMessage(), e);
}
};
}
@Override
public void consumeMessage(List<StgOutJms> stgEntityList) throws NamingException, JMSException {
logger.info("JMS: Consume messages");
for (StgOutJms stgOutEntity : stgEntityList) {
if (nonNull(stgOutEntity) && nonNull(stgOutEntity.getIdentifiantUniqueLot())) {
sendMessage(stgOutEntity);
stgOutEntity.setStatus("SENT");
repositoryOut.save(stgOutEntity);
} else {
logger.error("The id of the object received is null");
}
}
}
Jms connection :
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
vQueueSender = vQueueSession.createSender(vQueue);
}
The problem with this code is that the Jms messages are sent in a transaction ( commit on success , rollback on failure ) but the status sent is never updated ( crudrepository).
Also, i tried using jpaTransactionManager, it works good for Database save, but the Jms messages are sent before transaction commit ( no jms rollback on failure).
I'll appreciate the help!
After days of research, the only solution i found is to pass a transaction manager to the the jdbc poller and use JmsTemplate with session transacted = true, so that the commit/rollback of jms and jdbc are done in the same transaction.
// JMS Beans
@Bean
public JndiTemplate jndiTemplate() {
final Properties jndiProps = new Properties();
jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
jndiProps.setProperty(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
JndiTemplate jndiTemplate = new JndiTemplate();
jndiTemplate.setEnvironment(jndiProps);
return jndiTemplate;
}
@Bean
public JndiObjectFactoryBean queueConnectionFactory() {
JndiObjectFactoryBean queueConnectionFactory = new JndiObjectFactoryBean();
queueConnectionFactory.setJndiTemplate(jndiTemplate());
queueConnectionFactory.setJndiName(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
return queueConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate((ConnectionFactory) queueConnectionFactory().getObject());
jmsTemplate.setReceiveTimeout(500);
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
@Bean
public JndiObjectFactoryBean jmsQueueOut() {
JndiObjectFactoryBean jmsQueue = new JndiObjectFactoryBean();
jmsQueue.setJndiTemplate(jndiTemplate());
jmsQueue.setJndiName(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
return jmsQueue;
}
Using a poller by passing transaction manager
@Bean
public PollerMetadata pollerMetadata() {
return Pollers
.fixedDelay(Long.valueOf(env.getProperty("poller.interval"))).transactional(transactionManager)
.get();
}
// DATABASE Poller using JdbcPollingChannelAdapter
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<?> jpaInbound() {
// Select request by status = 'TO_SEND'
JdbcPollingChannelAdapter poller = new JdbcPollingChannelAdapter(datasource,
StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
// RowMapper for mapping the list returned to the entity StgOutJms
poller.setRowMapper(new StgOutJms());
poller.setMaxRowsPerPoll(10);
return poller;
}
And a service activator
// Service Activator : Lunching the Jms Creation for each row
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return wlstoreMessage -> {
try {
jmsSenderService.consumeMessage((Destination) jmsQueueOut().getObject(),
(List<StgOutJms>) wlstoreMessage.getPayload());
} catch (NamingException | JMSException e) {
log.error(e.getMessage(), e);
}
};
}
Hope it's helpful.