Tags: postgresql, rust, rust-tokio, rust-sqlx

What is the idiomatic approach for multithreaded postgres operations using sqlx in Rust?


I'm attempting to use threads to offload the insertion of three different data types into a Postgres database, but I'm getting stuck on the borrow checker.

I've attempted a few different approaches and ended up in a place where I no longer knew what I was doing. So I tried to simplify everything: I removed all the attributes of my dbm struct, made each method take a PgPool as an argument, and passed a clone of the initial PgPool to each method call. All the methods take a &self argument. I also tried making copies of dbm for each of the operations, which did not work (in addition to feeling extraordinarily clumsy).

I expected this to work, either as shown in the code sample here or in the variants where I used a reference to dbm or a cloned copy of it. Instead I get the following message from the borrow checker, and while I understand it, I'm not sure how best to accommodate it:

76 |           let aton_handle = task::spawn(dbm.insert_aton_data(
   |  _______________________________________^
77 | |             connection_pool.clone(),
78 | |             split_messages.aton_data,
79 | |             &log_id,
80 | |         ));
   | |         ^
   | |         |
   | |_________borrowed value does not live long enough
   |           argument requires that `dbm` is borrowed for `'static`
...

Code snippet that shows the offending code:

    let connection_pool = PgPool::connect(&connection_string)
        .await
        .expect("Failed to connect to Postgres");
    let dbm = DbMethods {};

    // Make API calls etc..


    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();

        // TODO: Create thread for each message type and do DB inserts.
        let aton_handle = task::spawn(dbm.insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            &log_id,
        ));
        // ... other handles.

        let _ = tokio::try_join!(aton_handle, static_handle, position_handle);
    }

Method:

    pub async fn insert_aton_data(
        &self,
        db_pool: PgPool,
        aton_data: Vec<AISAtonData>,
        log_id: &Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Begin a transaction and run every insert on it (not on the pool),
        // so that tx.commit() below actually covers the inserts.
        let mut tx = db_pool.begin().await?;

        for data in aton_data {
            sqlx::query!(
                "INSERT INTO ais.ais_aton_data (
                    type_field, message_type, mmsi, msgtime, dimension_a, dimension_b, dimension_c, dimension_d,
                    type_of_aids_to_navigation, latitude, longitude, name, type_of_electronic_fixing_device, log_id
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
                data.type_field, data.message_type, data.mmsi, convert_to_datetime_option(data.msgtime), data.dimension_a, data.dimension_b,
                data.dimension_c, data.dimension_d, data.type_of_aids_to_navigation, data.latitude,
                data.longitude, data.name, data.type_of_electronic_fixing_device, log_id
            ).execute(&mut *tx).await?;
        }
        tx.commit().await?;

        Ok(())
    }
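Why the borrow checker objects: tokio::task::spawn requires a 'static future, because the task may outlive the function that spawned it. The future returned by dbm.insert_aton_data(...) holds the &self borrow of dbm and the &log_id borrow, so it cannot be 'static. One common way to keep the &self method unchanged is to spawn an async move block that owns everything it uses, roughly task::spawn(async move { dbm.insert_aton_data(pool, data, &log_id).await }), where dbm, pool and log_id are clones or copies made just before the call. The sketch below shows the same ownership pattern with std::thread::spawn, which carries the same 'static bound, so it runs without tokio or sqlx. DbMethods is reduced to a hypothetical Clone unit struct, insert_rows is a hypothetical stand-in for insert_aton_data, u128 stands in for Uuid and Vec<u32> for the AIS rows.

```rust
use std::thread;

// Hypothetical stand-in for the question's DbMethods; Clone lets each task own a copy.
#[derive(Clone)]
struct DbMethods;

impl DbMethods {
    // Same shape as insert_aton_data: &self receiver plus a borrowed id
    // (u128 stands in for Uuid, Vec<u32> for Vec<AISAtonData>).
    // Instead of inserting, it reports which id and how many rows it received.
    fn insert_rows(&self, rows: Vec<u32>, log_id: &u128) -> (u128, usize) {
        (*log_id, rows.len())
    }
}

// Spawns one "insert". The closure owns dbm, rows and log_id, so it is 'static.
fn spawn_insert(dbm: &DbMethods, rows: Vec<u32>, log_id: u128) -> thread::JoinHandle<(u128, usize)> {
    let dbm = dbm.clone(); // owned copy that is moved into the task
    // Rough tokio equivalent (not compiled here):
    // task::spawn(async move { dbm.insert_aton_data(pool, rows, &log_id).await })
    thread::spawn(move || dbm.insert_rows(rows, &log_id))
}

fn main() {
    let dbm = DbMethods;
    let log_id: u128 = 42;

    let aton = spawn_insert(&dbm, vec![1, 2, 3], log_id);
    let position = spawn_insert(&dbm, vec![4, 5], log_id);

    println!("{:?} {:?}", aton.join().unwrap(), position.join().unwrap());
}
```

Each closure owns its copy of dbm and log_id, so the &self and &log_id borrows point into the closure itself and never into main's stack frame.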

Current solution

I decided to move away from the pattern of having the database operations as methods on a struct and made them free functions instead, so that I could keep moving forward. The threading now works. I hope someone can explain how I could keep them as methods.

Here is the code I ended up with for now, in case anyone else wants to do something similar. I'm not saying it's a good idea; it may well be worth looking at sqlx together with Postgres UNNEST for bulk inserts.

async fn insert_ais_items(connection_pool: PgPool, log_id: Uuid, last_hour: LastHourAISMessage) -> Result<(), Box<dyn Error>> {
    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();

        let aton_handle = task::spawn(insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            log_id.clone(),
        ));
        let static_handle = task::spawn(insert_static_data(
            connection_pool.clone(),
            split_messages.static_data,
            log_id.clone(),
        ));
        let position_handle = task::spawn(insert_position_data(
            connection_pool.clone(),
            split_messages.position_data,
            log_id.clone(),
        ));

        let res = tokio::try_join!(aton_handle, static_handle, position_handle);
        match res {
            Ok(..) => {
                debug!("Threads completed");
            }
            Err(error) => warn!("There was an error in one of the threads: {:?}", error)
        }
    }

    Ok(())
}
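A caveat about the error handling above: each task::spawn handle resolves to Result<Result<(), E>, JoinError>, and tokio::try_join! only short-circuits on the outer JoinError, i.e. a task that panicked or was cancelled. An error returned by insert_aton_data itself arrives inside Ok(..), so "Threads completed" is logged even if an insert failed; each inner result has to be checked as well. The sketch below shows that two-level check with std::thread, whose join() returns the same nested shape (the thread's own Result wrapped in a Result whose Err means a panic). insert_batch and join_flat are hypothetical helpers, with String errors for brevity.

```rust
use std::thread;

// Hypothetical stand-in for a database task: fails on an empty batch so both paths show up.
fn insert_batch(rows: Vec<u32>) -> Result<usize, String> {
    if rows.is_empty() {
        Err("empty batch".to_string())
    } else {
        Ok(rows.len())
    }
}

// Flatten the nested result of joining a task:
// outer Err = the task panicked, inner Err = the task ran but returned an error.
fn join_flat(handle: thread::JoinHandle<Result<usize, String>>) -> Result<usize, String> {
    match handle.join() {
        Ok(inner) => inner,
        Err(_) => Err("task panicked".to_string()),
    }
}

fn main() {
    let handles = vec![
        thread::spawn(|| insert_batch(vec![1, 2, 3])),
        thread::spawn(|| insert_batch(vec![])),
    ];
    for handle in handles {
        match join_flat(handle) {
            Ok(n) => println!("inserted {n} rows"),
            Err(e) => println!("task failed: {e}"),
        }
    }
}
```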
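On the UNNEST idea: rather than one INSERT per row, Postgres can receive one array per column and turn them back into rows with UNNEST, so a whole batch is written by a single statement. The sketch below covers only the Rust side, transposing rows into one Vec per column. AtonRow, its three fields and their SQL types are hypothetical (a subset of AISAtonData), and the SQL string is an untested illustration of the pattern. With sqlx, each column Vec would then be passed as one array parameter, in the same order as $1, $2, $3.

```rust
// Hypothetical subset of AISAtonData; the real struct has more columns.
struct AtonRow {
    mmsi: i32,
    name: String,
    latitude: f64,
}

// Illustrative statement: one array parameter per column, expanded back into rows by UNNEST.
const BULK_INSERT_SQL: &str = "INSERT INTO ais.ais_aton_data (mmsi, name, latitude) \
     SELECT * FROM UNNEST($1::int4[], $2::text[], $3::float8[])";

// Column-oriented copy of a batch: element i of each Vec belongs to row i.
#[derive(Debug, Default, PartialEq)]
struct AtonColumns {
    mmsis: Vec<i32>,
    names: Vec<String>,
    latitudes: Vec<f64>,
}

// Transpose rows into columns so each Vec can be bound as a single array parameter.
fn to_columns(rows: Vec<AtonRow>) -> AtonColumns {
    let mut cols = AtonColumns::default();
    for row in rows {
        cols.mmsis.push(row.mmsi);
        cols.names.push(row.name);
        cols.latitudes.push(row.latitude);
    }
    cols
}

fn main() {
    let rows = vec![
        AtonRow { mmsi: 992_591_001, name: "BUOY A".into(), latitude: 59.9 },
        AtonRow { mmsi: 992_591_002, name: "BUOY B".into(), latitude: 60.1 },
    ];
    let cols = to_columns(rows);
    // The arrays must be the same length: multi-argument UNNEST pads shorter arrays with NULLs.
    assert_eq!(cols.mmsis.len(), cols.names.len());
    println!("{}\n{:?}", BULK_INSERT_SQL, cols);
}
```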

Solution

  • If the method syntax is all you're after, you can get around the compiler error by letting the compiler do constant promotion: just add a borrow where you declare dbm:

    let dbm = &Dbm {};
    

    or derive Clone and Copy for Dbm and change the receiver from &self to self.

    But I'd really advise against using a method just for its own sake. You can instead group your functions into a module and call them through the module's name, which only changes . to :: at the call site:

    mod dbm {
        use super::*; // assumes PgPool, AISAtonData and Uuid are imported in the parent module
        pub async fn insert_aton_data(
            db_pool: PgPool,
            aton_data: Vec<AISAtonData>,
            log_id: Uuid,
        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 
            //…
        }
    }
    
    // and later call it like
    let handle = task::spawn(dbm::insert_aton_data(
        connection_pool.clone(),
        split_messages.aton_data,
        log_id.clone(),
    ));
    

    Note: I also changed &Uuid to Uuid, because a reference to the local log_id is not 'static either and would cause the same error.

    Another note: if you declare Dbm as struct Dbm; (a unit struct), you can create an instance with just Dbm instead of Dbm {}.
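The three options in this answer can be compared side by side without tokio. The sketch below uses std::thread::spawn, which has the same 'static bound as tokio::task::spawn; Dbm, insert_by_ref, insert_by_value and dbm::insert_data are hypothetical, and the u128 log id stands in for Uuid and is passed by value in every case. Option 1 works only because Dbm has no fields: &Dbm is a constant expression, so the compiler promotes it to a &'static Dbm. A struct that held a PgPool could not be promoted this way, since the pool is created at runtime. Option 2 takes self by value, and because Dbm is Copy, each spawned closure gets its own copy while the caller keeps one. Option 3 needs no struct at all.

```rust
use std::thread;

// Shared by options 1 and 2; Copy requires Clone as well.
#[derive(Clone, Copy)]
struct Dbm;

impl Dbm {
    // Option 1: &self receiver. Spawnable as long as the reference is &'static.
    fn insert_by_ref(&self, rows: Vec<u32>, log_id: u128) -> (u128, usize) {
        (log_id, rows.len())
    }

    // Option 2: self by value; because Dbm is Copy, the caller keeps its own value.
    fn insert_by_value(self, rows: Vec<u32>, log_id: u128) -> (u128, usize) {
        (log_id, rows.len())
    }
}

// Option 3: no struct at all, just a function grouped in a module.
mod dbm {
    pub fn insert_data(rows: Vec<u32>, log_id: u128) -> (u128, usize) {
        (log_id, rows.len())
    }
}

fn main() {
    let log_id: u128 = 7;

    // Option 1: borrowing the unit-struct constant is promoted to &'static Dbm.
    let by_ref: &'static Dbm = &Dbm;
    let h1 = thread::spawn(move || by_ref.insert_by_ref(vec![1, 2], log_id));

    // Option 2: dbm_copy is copied into the closure, not borrowed.
    let dbm_copy = Dbm;
    let h2 = thread::spawn(move || dbm_copy.insert_by_value(vec![3], log_id));
    // Still usable here because the closure received a copy.
    let local = dbm_copy.insert_by_value(vec![], log_id);

    // Option 3: a free function with owned arguments; the call uses :: instead of .
    let h3 = thread::spawn(move || dbm::insert_data(vec![4, 5, 6], log_id));

    println!("{:?} {:?} {:?} {:?}", h1.join().unwrap(), h2.join().unwrap(), h3.join().unwrap(), local);
}
```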