spring-boot, mariadb, activemq-classic, bitronix

Bitronix - JMS and JDBC - Message is dequeued on Exception


I'm trying to integrate the Bitronix Transaction Manager into my Spring Boot project to manage JDBC and JMS transactions together. I have two databases and one ActiveMQ broker for JMS. I have managed to enlist both databases in the same transaction, but when I tried to include JMS, it does not seem to work.

This is my Bitronix Transaction Manager configuration:

@Configuration
@EnableTransactionManagement
public class BitronixJtaConfiguration {

    private static final Logger log = LoggerFactory.getLogger(BitronixJtaConfiguration.class);

    @Value("${bitronix.tm.serverId}")
    private String serverId;

    @Value("${bitronix.tm.journal.disk.logPart1Filename:}")
    private String logPart1Filename;

    @Value("${bitronix.tm.journal.disk.logPart2Filename:}")
    private String logPart2Filename;

    @Bean
    public bitronix.tm.Configuration transactionManagerServices() {
        bitronix.tm.Configuration configuration = TransactionManagerServices.getConfiguration();
        configuration.setServerId(serverId);
        if ("".equals(logPart1Filename) && "".equals(logPart2Filename)) {
            configuration.setJournal(null);
            log.info("Disable journal for testing.");
        } else {
            configuration.setLogPart1Filename(logPart1Filename);
            configuration.setLogPart2Filename(logPart2Filename);
        }
        return configuration;
    }

    @Bean
    public TransactionManager transactionManager() {
        return TransactionManagerServices.getTransactionManager();
    }

    @Bean
    public UserTransaction userTransaction() {
        return TransactionManagerServices.getTransactionManager();
    }

    @Bean
    public PlatformTransactionManager platformTransactionManager() {
        UserTransaction userTransaction = userTransaction();
        TransactionManager transactionManager = transactionManager();
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}

This is one of my database configuration classes:

@Configuration
@EnableTransactionManagement
public class TransportationPlanDBConfig {

  private static final Logger LOGGER = LoggerFactory.getLogger("ppalfile");

  @Value("${tp.jdbc.driverClassName}")
  private String driverClassName;

  @Value("${tp.jdbc.username}")
  private String username;

  @Value("${tp.jdbc.url}")
  private String url;

  @Value("${tp.jdbc.password}")
  private String password;

  @Value("${tp.c3p0.max_size}")
  private int c3p0MaxSize;

  @Value("${tp.c3p0.min_size}")
  private int c3p0MinSize;

  @Value("${tp.c3p0.unreturned_connection_timeout}")
  private int c3p0UnreturnedConnectionTimeout;

  @Value("${tp.c3p0.acquire_increment}")
  private int c3p0AcquireIncrement;

  @Value("${tp.c3p0.max_idle_time}")
  private int c3p0MaxIdleTime;

  public TransportationPlanDBConfig() {
    // Empty constructor
  }

  @Bean(name = "tpds", destroyMethod = "close")
  public DataSource dataSource() {
    LOGGER.debug("Creating Transportation plan DS");
    PoolingDataSource poolingDataSource = new PoolingDataSource();
    poolingDataSource.setClassName(driverClassName);
    poolingDataSource.setUniqueName("tpds");
    Properties props = new Properties();
    props.put("url", url);
    props.put("user", username);
    props.put("password", password);
    poolingDataSource.setDriverProperties(props);
    poolingDataSource.setAllowLocalTransactions(true);
    poolingDataSource.setMaxPoolSize(c3p0MaxSize);
    poolingDataSource.init();
    return poolingDataSource;
  }

  @Bean(name = "tpJdbcTemplate")
  JdbcTemplate jdbcTemplate(@Qualifier("tpds") DataSource dataSource) {
    LOGGER.debug("Creating JdbcTemplate transport plan");
    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
    LOGGER.debug(" JdbcTemplate Transport Plan created ");
    return jdbcTemplate;
  }

}

My ActiveMQ configuration class:

@Configuration
@EnableTransactionManagement
public class ActivesMQsConfiguration {

    @Bean
    public ConnectionFactory jmsConnectionFactoryLocal() {
        PoolingConnectionFactory btmPoolingConnectionFactory = new PoolingConnectionFactory();
        btmPoolingConnectionFactory.setClassName("org.apache.activemq.ActiveMQXAConnectionFactory");
        btmPoolingConnectionFactory.setUniqueName("AMQLocal");
        btmPoolingConnectionFactory.setMinPoolSize(1);
        btmPoolingConnectionFactory.setMaxPoolSize(5);
        btmPoolingConnectionFactory.setAllowLocalTransactions(true);
        btmPoolingConnectionFactory.setUser("admin");
        btmPoolingConnectionFactory.setPassword("admin");
        btmPoolingConnectionFactory.getDriverProperties().setProperty("brokerURL", "tcp://localhost:61616");
        btmPoolingConnectionFactory.init();
        return btmPoolingConnectionFactory;
    }



    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryLocal(
            @Qualifier("jmsConnectionFactoryLocal") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(true);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

My JMS listener implementation:

@Component
@Transactional
public class ContactTransactionReceiver {


    private int mensajesConsumer2 = 0;

    @Autowired
    @Qualifier("versionJdbcTemplate")
    private JdbcTemplate versionJdbcTemplate;

    @Autowired
    @Qualifier("tpJdbcTemplate")
    private JdbcTemplate tpjdbcTemplate;

    @Autowired
    private VersionsConfDao versionsConfDao;

    @Autowired
    private TrainDao trainDao;


    @Transactional(rollbackFor=Exception.class)
    @JmsListener(destination = "Consumer.consumer2.VirtualTopic.TopicPrueba")
    public void receiveMessageFromContacts2(Message message) throws Exception {
        mensajesConsumer2++;
        TextMessage txtMessage = (TextMessage) message;
        System.out.println("Segundo consumer:" + txtMessage.getText() + " recibidos:" + mensajesConsumer2);


        VersionsConf versionsconf = new VersionsConf("V" + mensajesConsumer2, "V" + mensajesConsumer2, false,new Timestamp(1L), 1);
        VersionsConf versionsResult = versionsConfDao.insertUpdate(versionJdbcTemplate, versionsconf);

        if (mensajesConsumer2 == 2) {
            throw new Exception();
        }

        Train train = new Train("101"+mensajesConsumer2, 1L, 2L, false, true, "atp");
        Train trainResult = trainDao.insertUpdate(tpjdbcTemplate, train);

        if (mensajesConsumer2 == 3) {
            throw new Exception();
        }
    }

}

Based on my listener implementation, this is how I understand Bitronix should behave:

  1. On the first incoming message: it must insert one row in each database and dequeue the message. -> This works fine.

  2. On the second and third incoming messages: it must insert 0 rows because of the exception and keep the message in the queue. -> No rows are inserted, but the message is dequeued.

Moreover, I'd like to add that it logs the following during execution: [main] bitronix.tm.recovery.Recoverer: recovery committed 0 dangling transaction(s) and rolled back 0 aborted transaction(s) on 4 resource(s) [AMQLocal, vds, AMQRemote, tpds]

So I understand that both brokers and both databases are registered. But when the listener processes the second message (and throws an exception), it logs:

WARN 5740 [Session Task-1] bitronix.tm.twopc.Preparer : executing transaction with 0 enlisted resource

Any idea what the problem is?

You can find the full code on: https://github.com/PedroRamirezTOR/spring-jms-jdbc-integration.git

Thanks!


Solution

  • First, the "recovery committed 0 dangling transaction(s) and rolled back 0 aborted transaction(s) on 4 resource(s)" message will appear every now and then, and it is perfectly normal. You can ignore it as long as the committed and rolled back counters stay at zero.

    The "executing transaction with 0 enlisted resource" warning, on the other hand, looks like the real problem: it says the transaction was run without any resource enlisted in it.

    I strongly suspect a problem with your Spring setup. I'm no Spring expert by any means, but the DefaultJmsListenerContainerFactory should have a reference to your Spring PlatformTransactionManager instance so that it knows it has to work transactionally. So you should call factory.setTransactionManager(PlatformTransactionManager).

    This should at least move you to the next step.
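
    As a rough illustration of that suggestion, the listener container factory from the question could be changed along these lines. This is only a sketch, not a verified fix: it assumes the javax.jms API, that the PlatformTransactionManager injected here is the JtaTransactionManager bean from BitronixJtaConfiguration, and that the connection factory bean is still named jmsConnectionFactoryLocal. The extra method parameter and the reordering of the calls are my additions.

```java
import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class ActivesMQsConfiguration {

    // The jmsConnectionFactoryLocal() bean from the question stays as it is
    // and is omitted here for brevity.

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryLocal(
            @Qualifier("jmsConnectionFactoryLocal") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer,
            // Assumption: this resolves to the JtaTransactionManager bean
            // backed by Bitronix.
            PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // Apply Spring Boot's defaults first so the explicit settings below
        // are not overwritten by the configurer.
        configurer.configure(factory, connectionFactory);
        // Hand the container the transaction manager so that message
        // reception runs inside the same transaction as the listener's
        // JDBC work.
        factory.setTransactionManager(transactionManager);
        factory.setSessionTransacted(true);
        return factory;
    }
}
```

    If the container picks this up, the "0 enlisted resource" warning should no longer appear for listener transactions, and an exception thrown by the listener should roll back the message receipt together with the inserts. Whether that is enough depends on the rest of the setup, so treat it as a starting point.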