I'm attempting to use threads to offload the insertion of three different data types into a Postgres database, but am getting stuck on the borrow checker.
I've attempted a few different approaches and ended up in a place where I no longer knew what I was doing. So I simplified everything: I removed all the fields of my dbm struct, made each method take the PgPool as an argument, and created a clone of the initial PgPool for each method call. All the methods take a &self argument. I've also tried making copies of dbm for each operation, which did not work (in addition to feeling extraordinarily clumsy).
I expected this to work, either as shown in the code sample here or in the variants where I used a reference to dbm or a cloned version. Instead I get the following message from the borrow checker, and while I understand it, I am not sure how best to accommodate it.
76 |           let aton_handle = task::spawn(dbm.insert_aton_data(
   |  _______________________________________^
77 | |             connection_pool.clone(),
78 | |             split_messages.aton_data,
79 | |             &log_id,
80 | |         ));
   | |         ^
   | |         |
   | |_________borrowed value does not live long enough
   |           argument requires that `dbm` is borrowed for `'static`
...
Code snippet that shows the offending code:
    let connection_pool = PgPool::connect(&connection_string)
        .await
        .expect("Failed to connect to Postgres");
    let dbm = DbMethods {};
    // Make API calls etc..
    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();
        // TODO: Create thread for each message type and do DB inserts.
        let aton_handle = task::spawn(dbm.insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            &log_id,
        ));
        // ... other handles.
        let _ = tokio::try_join!(aton_handle, static_handle, position_handle);
    }
Method:
    pub async fn insert_aton_data(
        &self,
        db_pool: PgPool,
        aton_data: Vec<AISAtonData>,
        log_id: &Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // let pool = PgPool::connect(&self.connection_string).await?;
        let mut tx = db_pool.begin().await?;
        for data in aton_data {
            sqlx::query!(
                "INSERT INTO ais.ais_aton_data (
                    type_field, message_type, mmsi, msgtime, dimension_a, dimension_b, dimension_c, dimension_d,
                    type_of_aids_to_navigation, latitude, longitude, name, type_of_electronic_fixing_device, log_id
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
                data.type_field, data.message_type, data.mmsi, convert_to_datetime_option(data.msgtime), data.dimension_a, data.dimension_b,
                data.dimension_c, data.dimension_d, data.type_of_aids_to_navigation, data.latitude,
                data.longitude, data.name, data.type_of_electronic_fixing_device, log_id
            )
            // Execute on the transaction rather than the pool, so that
            // the commit below actually covers these inserts.
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(())
    }
To keep moving forward, I decided to move away from having the database operations as methods on a struct and made them free functions instead. The threading now works. I hope someone can explain how I could achieve the same with methods.
Here is the code I ended up with for now, in case anyone else wants to do something like this. I'm not saying it is a good idea; it may well be worth looking at sqlx with Postgres UNNEST for bulk inserts.
    async fn insert_ais_items(connection_pool: PgPool, log_id: Uuid, last_hour: LastHourAISMessage) -> Result<(), Box<dyn Error>> {
        if let Some(messages) = last_hour.ais_response.ais_latest_responses {
            // TODO: Handle errors.
            let split_messages = process_ais_items(messages).unwrap();
            let aton_handle = task::spawn(insert_aton_data(
                connection_pool.clone(),
                split_messages.aton_data,
                log_id.clone(),
            ));
            let static_handle = task::spawn(insert_static_data(
                connection_pool.clone(),
                split_messages.static_data,
                log_id.clone(),
            ));
            let position_handle = task::spawn(insert_position_data(
                connection_pool.clone(),
                split_messages.position_data,
                log_id.clone(),
            ));
            let res = tokio::try_join!(aton_handle, static_handle, position_handle);
            match res {
                Ok(..) => {
                    debug!("Threads completed");
                }
                Err(error) => warn!("There was an error in one of the threads: {:?}", error)
            }
        }
        Ok(())
    }
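If the method syntax is all you're after, you can get around the compiler error by letting the compiler do constant promotion. Just add a borrow where you declare dbm:
    let dbm = &Dbm {};
Because Dbm {} is a constant expression, the borrow is promoted to a &'static Dbm, which satisfies the 'static bound on task::spawn.
Alternatively, implement Copy for Dbm and change the receiver from &self to self.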
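Both options can be tried without a database. The following is a minimal sketch using only the standard library: the spawn helper and the busy-polling block_on are hypothetical stand-ins for tokio's task::spawn (they only reproduce its Send + 'static bounds), and insert_rows and CopyDbm are illustrative names that do not come from the question.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Waker that does nothing; enough for futures that never really wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::yield_now();
    }
}

// Hypothetical stand-in for task::spawn: same Send + 'static bounds.
fn spawn<F>(fut: F) -> thread::JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || block_on(fut))
}

struct Dbm;

impl Dbm {
    // The returned future holds the `&self` borrow until it completes.
    async fn insert_rows(&self, rows: Vec<u32>) -> usize {
        rows.len()
    }
}

#[derive(Clone, Copy)]
struct CopyDbm;

impl CopyDbm {
    // Takes `self` by value, so the future owns its own copy.
    async fn insert_rows(self, rows: Vec<u32>) -> usize {
        rows.len()
    }
}

// Option 1: `&Dbm {}` is promoted to a `&'static Dbm`.
fn run_with_promoted_ref() -> usize {
    let dbm = &Dbm {};
    let handle = spawn(dbm.insert_rows(vec![1, 2, 3]));
    handle.join().unwrap()
}

// Option 2: a Copy receiver; `dbm` stays usable after each call.
fn run_with_copy_receiver() -> usize {
    let dbm = CopyDbm;
    let first = spawn(dbm.insert_rows(vec![1, 2]));
    let second = spawn(dbm.insert_rows(vec![3]));
    first.join().unwrap() + second.join().unwrap()
}
```

Changing the first option to let dbm = Dbm {}; and passing (&dbm).insert_rows(..) to spawn should bring back the "borrowed value does not live long enough" error, since the future would then borrow a local variable.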
But I'd really advise against using a method for its own sake. You can instead group your functions into a module and call them through the module's name, which only changes the . to :: at the call site:
    mod dbm {
        pub async fn insert_aton_data(
            db_pool: PgPool,
            aton_data: Vec<AISAtonData>,
            log_id: Uuid,
        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            //…
        }
    }
    // and later call it like
    let handle = task::spawn(dbm::insert_aton_data(
        connection_pool.clone(),
        split_messages.aton_data,
        log_id.clone(),
    ));
Note: I also changed the log_id parameter from &Uuid to Uuid, as the reference is likely to cause the same problem.
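The reason is the same as for &self: an async fn that takes &Uuid returns a future that holds that reference, so the future is only 'static if the referenced value is. Passing the id by value is cheap because Uuid is Copy. Here is a rough sketch of the by-value side, assuming a hypothetical LogId type in place of uuid::Uuid and std::thread::spawn in place of task::spawn (both require 'static):

```rust
use std::thread;

// Hypothetical stand-in for uuid::Uuid, which is likewise a small Copy type.
#[derive(Clone, Copy, Debug, PartialEq)]
struct LogId(u128);

// Hypothetical insert function: it takes the id by value, so it holds no borrow.
fn insert_rows(rows: Vec<u32>, log_id: LogId) -> (usize, LogId) {
    (rows.len(), log_id)
}

fn spawn_inserts(log_id: LogId) -> Vec<(usize, LogId)> {
    let batches = vec![vec![1, 2], vec![3], vec![]];
    let handles: Vec<_> = batches
        .into_iter()
        // `move` copies `log_id` into each closure, so every thread owns its own id.
        .map(|rows| thread::spawn(move || insert_rows(rows, log_id)))
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect()
}
```

Since the type is Copy, an explicit log_id.clone() at each call site is not required; passing log_id directly copies it.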
Another note: if you declare Dbm as struct Dbm; you don't have to add the {} when creating an instance of it.
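For comparison, a small sketch of the two declaration styles (make_both is a hypothetical helper, only there to show the construction syntax):

```rust
// Unit-like struct: declared with a trailing semicolon, constructed by name alone.
struct Dbm;

// Empty braced struct: every instance has to be written with `{}`.
struct DbMethods {}

fn make_both() -> (Dbm, DbMethods) {
    (Dbm, DbMethods {})
}
```

Both forms are zero-sized, so the choice only affects how instances are written.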