Search code examples
testingrustconcurrencyrust-rocketrust-sqlx

How to test two parallel transactions in Rust SQLx?


I'm experimenting with Rocket, Rust and SQLx and I'd like to test what happens when two parallel transactions try to insert a duplicated record on my table.

My insert fn contains nothing special and it works fine:

async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
    EX: 'ex + Executor<'ex, Database = Postgres>,
{
    sqlx::query!(
        r#"INSERT INTO credentials (username, password)
        VALUES ($1, crypt($2, gen_salt('bf')))"#,
        credentials.username,
        credentials.password,
    )
    .execute(executor)
    .await
    .map(|result| result.rows_affected())
    .map_err(|err| err.into())
}

My test, though, hangs indefinitely since it waits for a commit that never happens:

#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
    let repo = new_repo();
    let db: Pool<Postgres> = connect().await;
    let credentials = new_random_credentials();

    println!("TX1 begins");
    let mut tx1 = db.begin().await.unwrap();
    let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
    assert_eq!(rows_affected, 1);

    println!("TX2 begins");
    let mut tx2 = db.begin().await.unwrap();
    println!("It hangs on the next line");
    let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
    assert_eq!(rows_affected, 1);
    
    println!("It never reaches this line");
    tx1.commit().await.unwrap();
    tx2.commit().await.unwrap();
}

How do I create and execute those TXs in parallel, such that the assertions pass but the test fails when trying to commit the second TX?

For reference, this is my Cargo.toml

[package]
name = "auth"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"

# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }

[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]

[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]

[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]

## DEV ##

[dev-dependencies]
mockall = "0.11.0"

[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]

Solution

  • You can use a async_std::future::timeout or tokio::time::timeout. Example using async_std:

    use async_std::future;
    use std::time::Duration;
    
    let max_duration = Duration::from_millis(100);
    assert!(timeout(max_duration, tx2.commit()).await.is_err());
    

    If you want to continue to tx2 before completing tx1, you can async_std::task::spawn or tokio::spawn the tx1 first:

    async_std::task::spawn(async move {
        assert!(tx1.commit().await.is_ok());
    });