Tags: multithreading, rust, threadpool, gmail-api

How do I pass a google_gmail1::Gmail<HttpsConnector<HttpConnector>> to a spawned thread/task?


Very much new to Rust... I am looking to download email attachments from Gmail using a specified query. I have rough synchronous code that works, and I am looking to make it concurrent using tokio::spawn (since the rest of the project already uses Tokio). The current multithreaded code is as follows:

struct EmailInterface {
    pub gmail: Gmail<HttpsConnector<HttpConnector>>,
}

impl EmailInterface {
    async fn new(path_to_key: String) -> Result<EmailInterface, Box<dyn error::Error>> {
        let authenticator =
            ServiceAccountAuthenticator::builder(read_service_account_key(path_to_key).await?)
                .build()
                .await?;

        let gmail = Gmail::new(
            hyper::Client::builder().build(
                hyper_rustls::HttpsConnectorBuilder::new()
                    .with_native_roots()
                    .https_or_http()
                    .enable_http1()
                    .enable_http2()
                    .build(),
            ),
            authenticator,
        );

        Ok(EmailInterface { gmail })
    }

    pub async fn get_attachments(
        self,
        query: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        if let Some(messages_list) = self
            .gmail
            .users()
            .messages_list("me")
            .q(query)
            .include_spam_trash(false)
            .doit()
            .await?
            .1
            .messages
        {
            let messages_id_list: Vec<String> = messages_list
                .into_iter()
                .flat_map(|message| message.id)
                .collect();

            for id in messages_id_list {
                tokio::spawn(async move {
                    let message = self
                        .gmail
                        .users()
                        .messages_get("me", id.as_str())
                        .doit()
                        .await
                        .unwrap_or_default()
                        .1;
                    for part in message.payload {
                        if let Some(part_body) = part.body {
                            if let Some(attachment_id) = part_body.attachment_id {
                                let attachment = self
                                    .gmail
                                    .users()
                                    .messages_attachments_get(
                                        "me",
                                        id.as_str(),
                                        attachment_id.as_str(),
                                    )
                                    .doit()
                                    .await
                                    .unwrap_or_default();
                                let data = general_purpose::STANDARD
                                    .decode(&attachment.1.data.unwrap_or_default())
                                    .unwrap();
                                std::fs::write(part.filename.expect("Should have a name"), &data)
                                    .unwrap();
                            }
                        }
                    }
                });
            }
        }
        Ok(())
    }
}

The error I am currently getting:

use of moved value: `self.gmail`
move occurs because `self.gmail` has type `Gmail<HttpsConnector<HttpConnector>>`, which does not implement the `Copy` trait

Heck, if there is a more streamlined way of doing this, I'm all ears (eyes?).

I have also tried the threadpool crate and asked ChatGPT for suggestions, but the latter always ends in errors.


Solution

  • grumble grumble MRE grumble

    First, one recommendation I give nearly every Rust beginner on SO: don't rely on your IDE for good error messages. Read the output of cargo check/cargo watch; it is often much more informative and helpful:

    error[E0382]: use of moved value: `self.gmail`
      --> src/main.rs:52:41
       |
    52 |                    tokio::spawn(async move {
       |   _________________________________________^
    53 |  |                     let message = self
       |  |___________________________________-
    54 | ||                         .gmail
       | ||______________________________- use occurs due to use in generator
    55 |  |                         .users()
    ...   |
    91 |  |                     }
    92 |  |                 });
       |  |_________________^ value moved here, in previous iteration of loop
       |
       = note: move occurs because `self.gmail` has type `Gmail<HttpsConnector<HttpConnector>>`, which does not implement the `Copy` trait
    

    In this case, it's telling you that the async move block moves self.gmail into the first spawned task, so on the next iteration of the for loop there is no self.gmail left to move. What you need to know to avoid that is that a type that doesn't implement Copy often still implements Clone. The difference is that a Copy value is duplicated implicitly wherever it would otherwise be moved, while Clone requires an explicit .clone() call. Here, you can use Clone to give each spawned future its own instance of the gmail client.

    for id in messages_id_list {
        let gmail = self.gmail.clone();
        tokio::spawn(async move {
            let message = gmail
                .users()
                .…
            // replace all the other instances of self.gmail as well
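The same ownership rule applies to std::thread::spawn with a move closure, so here is a std-only sketch that compiles without Tokio or the Gmail crate. FakeGmail and spawn_fetches are hypothetical names; FakeGmail only stands in for Gmail<HttpsConnector<HttpConnector>> (it is Clone but not Copy). Each thread gets its own clone, mirroring the self.gmail.clone() in the loop above.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `Gmail<HttpsConnector<HttpConnector>>`:
/// the `String` field means it cannot be `Copy`, but it can derive `Clone`.
#[derive(Clone)]
pub struct FakeGmail {
    pub user: String,
}

impl FakeGmail {
    /// Pretend API call; a real client would make a network request here.
    pub fn messages_get(&self, id: &str) -> String {
        format!("{}/{}", self.user, id)
    }
}

/// Spawns one thread per message id. `client` is taken by value, like `self`
/// in `get_attachments`, so each thread needs its own clone.
pub fn spawn_fetches(client: FakeGmail, ids: Vec<String>) -> Vec<JoinHandle<String>> {
    let mut handles = Vec::new();
    for id in ids {
        // Moving `client` itself into the closure would fail with E0382
        // ("use of moved value") on the second loop iteration.
        let thread_client = client.clone();
        handles.push(thread::spawn(move || thread_client.messages_get(&id)));
    }
    handles
}
```

Joining the returned handles collects the results, e.g. handles.into_iter().map(|h| h.join().unwrap()).collect::<Vec<_>>().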
    

    This will make your code compile, but it won't work yet: tokio::spawn starts the tasks, but it doesn't ensure that they finish before your program exits. As the tokio::spawn documentation puts it:

    There is no guarantee that a spawned task will execute to completion. When a runtime is shutdown, all outstanding tasks are dropped, regardless of the lifecycle of that task.

    The "runtime" here is the one you created by annotating main with #[tokio::main], and it shuts down when your main function returns.
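In Tokio, waiting for completion means keeping the JoinHandle values returned by tokio::spawn and awaiting them. The std-only sketch below shows the same principle with threads; run_and_wait is a hypothetical helper, and the atomic counter stands in for real download work. The returned count is only reliable because every handle is joined before the function returns.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `n` jobs and blocks until all of them have finished, returning how
/// many completed. Joining every handle is what guarantees completion; if the
/// handles were dropped instead, the process could exit while jobs still run.
pub fn run_and_wait(n: usize) -> usize {
    let done = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let done = Arc::clone(&done);
            thread::spawn(move || {
                // Stand-in for downloading one message's attachments.
                done.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    done.load(Ordering::SeqCst)
}
```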

    Also, if you have a lot of mail, I suspect Gmail won't be happy about being hit with thousands of requests at the same time; expect quota or rate-limit errors.

    To fix both of these at once:

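    use futures::{StreamExt, TryStreamExt};

    // Borrow the client once; each future below gets its own copy of this reference.
    let gmail = &self.gmail;
    futures::stream::iter(&messages_id_list)
        // `async move` moves `id` (a `&String`) and the `gmail` reference into the future,
        // instead of borrowing `id` from the closure, which would not outlive the call.
        .map(|id| async move {
            let message = gmail
                .users()
            // …
            Ok::<_, String>(()) // You don't have error handling yet, so I'll use String as a dummy type.
        })
        .buffer_unordered(8)
        .try_collect::<()>()
        .await?;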
    

    The buffer_unordered part ensures that at most a fixed number of message getters (here 8) run at once, and the await ensures that they have all finished before the await on get_attachments completes. Note that there's also no need to clone() anymore: each future only receives a copy of the reference to self.gmail, and because all the futures finish inside get_attachments, Rust knows self outlives the message getters.
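buffer_unordered and try_collect are futures-crate combinators; the sketch below is only a std-only analogue of the same two ideas (bounded concurrency, and borrowing instead of cloning), swapping in std::thread::scope (Rust 1.63+) with a Mutex-guarded iterator as a shared work queue. UncloneableGmail and fetch_bounded are hypothetical names, and the format! call stands in for a real API request.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical client type; note it is neither `Copy` nor `Clone`.
pub struct UncloneableGmail {
    pub user: String,
}

/// Processes `ids` with at most `workers` threads running at once. Thanks to
/// `thread::scope`, the workers borrow `client` and `ids` instead of cloning
/// them: the scope joins every thread before returning, so the borrows stay valid.
pub fn fetch_bounded(client: &UncloneableGmail, ids: &[String], workers: usize) -> Vec<String> {
    let queue = Mutex::new(ids.iter()); // shared work queue
    let results = Mutex::new(Vec::new()); // collected outputs, in completion order
    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // Take the next id; the lock guard is dropped at the end of this statement.
                let next = queue.lock().unwrap().next();
                let Some(id) = next else { break };
                let line = format!("{}/{}", client.user, id);
                results.lock().unwrap().push(line);
            });
        }
    });
    results.into_inner().unwrap()
}
```

As with buffer_unordered, results arrive in completion order rather than input order, so sort them if order matters.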

    Lastly: I'd consider async one of the less nice parts of Rust to use. You picked something difficult to start with.