
How can data be leaking between these 2 concurrent transactions?


I've got a project running on Java 8, Spring Boot 1.5.9, HSQLDB 2.4.0, and jdbi3-core 3.0.0. I'm writing tests to verify database behavior. One set of tests checks concurrency behavior.

My most basic test is somehow leaking data between 2 transactions, and I can't figure out why. I don't know if my test is flawed, Jdbi has a bug, or HSQLDB is fundamentally broken.

Connection URL: jdbc:hsqldb:${...};create=true;hsqldb.tx=mvlocks;hsqldb.tx_level=serializable

Note that tx=mvlocks and tx_level=serializable. Setting tx=mvcc, so that tx_level becomes snapshot isolation, doesn't change the outcome. That is even weirder, since snapshots taken at the same time, before any changes, should definitely not be able to affect each other.

Test explanation:

  1. create a table with 1 record
  2. start 2 threads
  3. in each, start a new transaction at the same time (ensured by CyclicBarrier syncing, verified by logging output) so that neither tx should be able to see the other's changes
  4. in thread 1, insert a new row (total 2) and commit
  5. in thread 2, count the number of rows

My expectation is that thread 2's transaction should see only 1 record in the table, since thread 2's transaction was started before thread 1's transaction was committed and the isolation level is set to serializable.

    CyclicBarrier syncLock = new CyclicBarrier(2);
    Runnable sync = Unchecked.runnable(() -> syncLock.await(1, TimeUnit.SECONDS));

    jdbi.useTransaction(tx -> {
        Queries queries = tx.attach(Queries.class);
        queries.create();
        // inserts a row with id 1
        queries.insert(1, 1);
    });

    CompletableFuture<Void> first = CompletableFuture.runAsync(Unchecked.runnable(() -> {
        jdbi.useTransaction(tx -> {
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

            LOGGER.info("first tx started");
            sync.run();

            Queries queries = tx.attach(Queries.class);
            queries.insert(2, 2);
        });
        LOGGER.info("first tx committed");

        Thread.sleep(100);

        sync.run();
    }));

    CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(Unchecked.supplier(() -> {
        int out = jdbi.inTransaction(tx -> {
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

            LOGGER.info("second tx started");
            sync.run();
            sync.run();

            Queries queries = tx.attach(Queries.class);
            // counts the number of rows
            return queries.count();
        });
        LOGGER.info("second tx committed");

        return out;
    }));

    // capture exceptions from either thread
    CompletableFuture.allOf(first, subject).get();

    assertThat(subject.get()).isEqualTo(1);

Output:

01:28:16.255 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {main}: took <1ms to execute query create table test(id int primary key, foo int) with arguments { positional:{}, named:{}, finder:[]}
01:28:16.257 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {main}: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments { positional:{0:1,1:1}, named:{foo:1,id:1}, finder:[]}
01:28:16.313 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-2}: second tx started
01:28:16.313 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-1}: first tx started
01:28:16.315 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {ForkJoinPool.commonPool-worker-1}: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments { positional:{0:2,1:2}, named:{foo:2,id:2}, finder:[]}
01:28:16.315 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-1}: first tx committed
01:28:16.317 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {ForkJoinPool.commonPool-worker-2}: took <1ms to execute query select count(*) from test with arguments { positional:{}, named:{}, finder:[]}
01:28:16.318 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-2}: second tx committed

org.junit.ComparisonFailure: 
Expected :1
Actual   :2

This basic JDBC test does not show this "read committed"-like behavior. The test is green, which suggests that serializable transactions started at the same time cannot see each other's changes, even after one of them commits:

    @Test
    public void transactionsAreIsolated() throws SQLException {
        @Cleanup
        Connection connection = dataSource.getConnection();
        Statement statement = connection.createStatement();
        statement.execute(CREATE_TABLE);

        @Cleanup
        Connection c1 = dataSource.getConnection();
        @Cleanup
        Connection c2 = dataSource.getConnection();

        c1.setAutoCommit(false);
        c2.setAutoCommit(false);

        startTransaction(c1);
        startTransaction(c2);

        assertThat(count(c1)).isEqualTo(0);
        assertThat(count(c2)).isEqualTo(0);

        insert(c1, 1);
        assertThat(count(c1)).isEqualTo(1);
        assertThat(count(c2))
            .describedAs("read uncommitted")
            .isEqualTo(0);

        c1.commit();
        assertThat(count(c1)).isEqualTo(1);
        assertThat(count(c2))
            .describedAs("read committed")
            .isEqualTo(0);
    }

Solution

  • I reproduced your JDBC test in Jdbi and it works:

    @Test
    public void transactionsAreIsolated() {
      try (Handle h1 = jdbi.open();
           Handle h2 = jdbi.open()) {
        h1.begin();
        h2.begin();
    
        assertThat(count(h1)).isEqualTo(0); 
        assertThat(count(h2)).isEqualTo(0); // locks h2's txn to the current snapshot
    
        insert(h1, 1, 1);
    
        assertThat(count(h1)).isEqualTo(1);
        assertThat(count(h2)).describedAs("read uncommitted").isEqualTo(0);
    
        h1.commit();
    
        assertThat(count(h1)).isEqualTo(1);
        assertThat(count(h2)).describedAs("read committed").isEqualTo(0);
    
        h2.rollback();
      }
    }
    

    From testing, it appears that a transaction does not lock to a database snapshot when it begins, but only once you first interact with the database within that transaction.

    In the above test, we observe the row count through h2 before inserting the row through h1. This interaction sets up a transaction snapshot, which is why your JDBC test worked.

    However, if we modify the above test to commit the h1 transaction before first observing the count on h2, then h2 sees the row that h1 committed:

    @Test
    public void transactionsLockToStateWhenObserved() {
      try (Handle h1 = jdbi.open();
           Handle h2 = jdbi.open()) {
        h1.begin();
        h2.begin();
    
        insert(h1, 1, 1);
    
        assertThat(count(h1)).isEqualTo(1);
    
        h1.commit();
    
        assertThat(count(h2))
            .describedAs("_now_ we're locked to a snapshot")
            .isEqualTo(1);
    
        h2.rollback();
      }
    }
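
The difference between the two tests above can be mimicked with a toy model in plain Java. This is only a sketch of the behavior observed in the tests, under the assumption that a snapshot is taken on the first read rather than at begin; it is not how HSQLDB or Jdbi is implemented, and ToyTable and ToyTransaction are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "snapshot on first read". Not HSQLDB's real implementation. */
public class LazySnapshotDemo {

    /** Hypothetical table that only tracks the ids of committed rows. */
    static final class ToyTable {
        private final List<Integer> committed = new ArrayList<>();

        synchronized void commitRow(int id) {
            committed.add(id);
        }

        synchronized List<Integer> copyCommitted() {
            return new ArrayList<>(committed);
        }
    }

    /** Hypothetical transaction: the snapshot is taken lazily, not when the object is created. */
    static final class ToyTransaction {
        private final ToyTable table;
        private List<Integer> snapshot; // stays null until the first read

        ToyTransaction(ToyTable table) {
            this.table = table;
        }

        int count() {
            if (snapshot == null) {
                snapshot = table.copyCommitted(); // first interaction pins the snapshot
            }
            return snapshot.size();
        }
    }

    /** The reader touches the table before the other commit, so it keeps seeing 1 row. */
    static int countAfterEarlyRead() {
        ToyTable table = new ToyTable();
        table.commitRow(1);
        ToyTransaction reader = new ToyTransaction(table);
        reader.count();        // pins the snapshot while only row 1 exists
        table.commitRow(2);    // stands in for the other transaction committing
        return reader.count();
    }

    /** The reader's first read happens after the other commit, so it sees 2 rows. */
    static int countAfterLateRead() {
        ToyTable table = new ToyTable();
        table.commitRow(1);
        ToyTransaction reader = new ToyTransaction(table);
        table.commitRow(2);    // committed before the reader's first read
        return reader.count();
    }

    public static void main(String[] args) {
        System.out.println("first read before commit: " + countAfterEarlyRead()); // prints 1
        System.out.println("first read after commit:  " + countAfterLateRead());  // prints 2
    }
}
```

The second case has the same shape as the failing test in the question: the counting transaction was opened early, but its first query ran only after the insert had been committed.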
    

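    Your original test has two sync points (transactions started, transaction 1 committed), but it needs four to fully test your scenario: both transactions started, row inserted by transaction 1, uncommitted read checked by transaction 2, and transaction 1 committed. The read in the middle is what matters, because it makes transaction 2 touch the database before transaction 1 commits: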

    @Test
    public void concurrentTransactionsAreIsolated() throws Exception {
      CyclicBarrier barrier = new CyclicBarrier(2);
      Runnable sync = uncheckedRunnable(() -> barrier.await(1, TimeUnit.SECONDS));
    
      jdbi.useTransaction(handle -> insert(handle, 1, 1));
    
      CompletableFuture<Void> first = CompletableFuture.runAsync(uncheckedRunnable(() -> {
        jdbi.useTransaction(tx -> {
          assertThat(tx.isInTransaction()).isTrue();
          assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
    
          log.info("first tx started");
          sync.run(); // wait for both transactions to start
    
          insert(tx, 2, 2);
    
          log.info("first tx inserted row");
          sync.run(); // let the second txn check uncommitted reads
          sync.run(); // wait for second txn to check the uncommitted reads
        });
    
        log.info("first tx committed");
        sync.run(); // transaction closed, let second transaction check committed reads
      }));
    
      CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(uncheckedSupplier(() -> {
        int out = jdbi.inTransaction(tx -> {
          assertThat(tx.isInTransaction()).isTrue();
          assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
    
          log.info("second tx started");
          sync.run(); // wait for both transactions to start
          sync.run(); // wait for first txn to insert
    
          log.info("second tx checking uncommitted read");
          assertThat(count(tx)).isEqualTo(1);
    
          sync.run(); // let the first txn commit
          sync.run(); // wait for first txn to commit
    
          log.info("second tx checking committed read");
          return count(tx);
        });
        log.info("second tx committed");
    
        return out;
      }));
    
      // capture exceptions from either thread
      CompletableFuture.allOf(first, subject).get();
    
      assertThat(subject.get()).isEqualTo(1);
    }
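
To see why four barrier points pin down the interleaving, here is a database-free sketch of the same handshake. It only records the order of events; the event labels and the class name BarrierHandshakeDemo are made up for the illustration, and a dedicated two-thread executor is an assumption used here in place of the common pool so that both tasks are guaranteed their own thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustration only: two threads stepping through four barrier points. */
public class BarrierHandshakeDemo {

    static List<String> run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        List<String> events = new CopyOnWriteArrayList<>();

        // Wrap the checked exceptions of await() so the lambda fits Runnable.
        Runnable sync = () -> {
            try {
                barrier.await(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new IllegalStateException("barrier failed", e);
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> writer = CompletableFuture.runAsync(() -> {
                sync.run();                          // 1: both sides started
                events.add("insert");
                sync.run();                          // 2: row inserted
                sync.run();                          // 3: reader has done its first read
                events.add("commit");
                sync.run();                          // 4: commit finished
            }, pool);

            CompletableFuture<Void> reader = CompletableFuture.runAsync(() -> {
                sync.run();                          // 1: both sides started
                sync.run();                          // 2: wait for the insert
                events.add("read while uncommitted");
                sync.run();                          // 3: allow the writer to commit
                sync.run();                          // 4: wait for the commit
                events.add("read after commit");
            }, pool);

            // Surfaces an exception from either task, or a timeout if the handshake stalls.
            CompletableFuture.allOf(writer, reader).get(10, TimeUnit.SECONDS);
            return new ArrayList<>(events);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints [insert, read while uncommitted, commit, read after commit]
        System.out.println(run());
    }
}
```

Between any two consecutive barrier points only one of the two threads records an event, so the order is the same on every run. With only two barrier points, as in the original test, nothing forces the reader to touch the database before the writer commits.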