Search code examples
spring-integrationweblogicspring-transactionsjtaxa

JtaTransaction with Jdbc XA and Jms


So, I have a spring boot project where i need to do this:

  • Start Transaction -A JDBC poller that reads rows with status TO_SEND , -Send a Jms for every row, -Update status 'SENT' -Commit Transaction or rollback on failure

The server is Weblogic, with XA datasource for rows processed, XA factory for Jms, jndi context and spring integration poller (jdbcpollingchaneladapter) and jta transaction:

As found here in this doc, in order to do so, i have to use a JtaTransaction with userTransaction, and create a non transacted Jms session

   // DATABASE Poller using JdbcPollingChannelAdapter
    @Bean
    @InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
    public MessageSource<?> jpaInbound() {
        // Select request by status = 'TO_SEND'
        JdbcPollingChannelAdapter j = new JdbcPollingChannelAdapter(datasource,
                StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
        StgOutJms stg = new StgOutJms();
        j.setRowMapper(stg);
        return j;
    }

     //Poller metadata with jta Transaction
     @Bean
        public PollerMetadata pollerMetadata() throws  NamingException   {
            return Pollers.fixedDelay(Long.valueOf(env.getProperty("poller.interval")))
                    .transactional(transactionManager).get();
        }

Jta Transaction manager using userTransaction :

    @Bean
    public PlatformTransactionManager transactionManager() throws NamingException {
        Hashtable<String, String> properties = new Hashtable<>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
        InitialContext vInitialContext = new InitialContext(properties);    
        UserTransaction xact = (UserTransaction) vInitialContext.lookup("javax.transaction.UserTransaction");       
        return new JtaTransactionManager(xact);
    }

Process :

// Service Activator : Lunching the Jms Creation for each row
    @Bean
    @ServiceActivator(inputChannel = "jpaInputChannel")
    public MessageHandler handler() {

        return wlstoreMessage -> {
            try {
                                 
                jmsSenderService.
                consumeMessage((List<StgOutJms>) wlstoreMessage.getPayload());
            } catch (NamingException | JMSException e) {
                log.error(e.getMessage(), e);
            }

        };
    }

    @Override
    public void consumeMessage(List<StgOutJms> stgEntityList) throws NamingException, JMSException {
            logger.info("JMS: Consume messages");
    
            for (StgOutJms stgOutEntity : stgEntityList) {
                if (nonNull(stgOutEntity) && nonNull(stgOutEntity.getIdentifiantUniqueLot())) {
                  
                    sendMessage(stgOutEntity);
                    stgOutEntity.setStatus("SENT");
                    repositoryOut.save(stgOutEntity);
                } else {
                    logger.error("The id  of the object received is null");
                }
            }
        }

Jms connection :

    @Override
    public void initQueueConnection() throws NamingException, JMSException {

        Hashtable<String, String> properties = new Hashtable<String, String>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
        InitialContext vInitialContext = new InitialContext(properties);

        QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
                .lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));

        vQueueConnection = vQueueConnectionFactory.createQueueConnection();
        vQueueConnection.start();

        vQueueSession = vQueueConnection.createQueueSession(false, 0);

        Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));

        vQueueSender = vQueueSession.createSender(vQueue);
    }

The problem with this code is that the Jms messages are sent in a transaction ( commit on success , rollback on failure ) but the status sent is never updated ( crudrepository).

Also, i tried using jpaTransactionManager, it works good for Database save, but the Jms messages are sent before transaction commit ( no jms rollback on failure).

I'll appreciate the help!


Solution

  • After days of research, the only solution i found is to pass a transaction manager to the the jdbc poller and use JmsTemplate with session transacted = true, so that the commit/rollback of jms and jdbc are done in the same transaction.

      // JMS Beans
        @Bean
        public JndiTemplate jndiTemplate() {
    
            final Properties jndiProps = new Properties();
            jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
            jndiProps.setProperty(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
    
            JndiTemplate jndiTemplate = new JndiTemplate();
            jndiTemplate.setEnvironment(jndiProps);
    
            return jndiTemplate;
        }
    
        @Bean
        public JndiObjectFactoryBean queueConnectionFactory() {
            JndiObjectFactoryBean queueConnectionFactory = new JndiObjectFactoryBean();
            queueConnectionFactory.setJndiTemplate(jndiTemplate());
            queueConnectionFactory.setJndiName(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
    
            return queueConnectionFactory;
        }
    
        @Bean
        public JmsTemplate jmsTemplate() {
            JmsTemplate jmsTemplate = new JmsTemplate((ConnectionFactory) queueConnectionFactory().getObject());
            jmsTemplate.setReceiveTimeout(500);
            jmsTemplate.setSessionTransacted(true);
            return jmsTemplate;
        }
    
     @Bean
        public JndiObjectFactoryBean jmsQueueOut() {
            JndiObjectFactoryBean jmsQueue = new JndiObjectFactoryBean();
            jmsQueue.setJndiTemplate(jndiTemplate());
            jmsQueue.setJndiName(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
    
            return jmsQueue;
        }
    

    Using a poller by passing transaction manager

    @Bean
        public PollerMetadata pollerMetadata() {
            return Pollers
                    .fixedDelay(Long.valueOf(env.getProperty("poller.interval"))).transactional(transactionManager)
                    .get();
        }
    
        // DATABASE Poller using JdbcPollingChannelAdapter
        @Bean
        @InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
        public MessageSource<?> jpaInbound() {
    
            // Select request by status = 'TO_SEND'
            JdbcPollingChannelAdapter poller = new JdbcPollingChannelAdapter(datasource,
                    StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
            // RowMapper for mapping the list returned to the entity StgOutJms
            poller.setRowMapper(new StgOutJms());
            poller.setMaxRowsPerPoll(10);
            return poller;
        }
    

    And a service activator

     // Service Activator : Lunching the Jms Creation for each row
        @Bean
        @ServiceActivator(inputChannel = "jpaInputChannel")
        public MessageHandler handler() {
    
            return wlstoreMessage -> {
                try {
    
                    jmsSenderService.consumeMessage((Destination) jmsQueueOut().getObject(),
                            (List<StgOutJms>) wlstoreMessage.getPayload());
                } catch (NamingException | JMSException e) {
                    log.error(e.getMessage(), e);
                }
    
            };
        }
    

    Hope it's helpful.