Search code examples
rustrust-tokio

Cannot borrow data in an `Arc` as mutable


I don't know what to do next. It looks like I misunderstand something, or maybe I have not learned some critical topic.

use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

pub struct Task {
    pub id: u32,
    pub url: String,
}
pub enum Message {
    Failure(Task, Error),
    Success(Task, Response),
}

struct State {
    client: reqwest::Client,
    res_tx: Sender<Message>,
    res_rx: Receiver<Message>,
}

pub struct Proxy {
    state: Arc<State>,
    max_rps: u16,
    max_pending: u16,
    id: u32,
    parent_tx: Sender<String>,
}

async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    match tx.send(msg).await {
        Err(error) => {
            eprintln!("{}", error)
        }
        _ => (),
    };
}

impl Proxy {
    // Starts loop for input channel
    async fn start_chin(&mut self) -> Sender<Task> {
        let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
        let state_outer = self.state.clone();

        tokio::spawn(async move {
            loop {
                match chin_rx.recv().await {
                    Some(task) => {
                        let res_tx = state_outer.res_tx.clone();
                        let state = state_outer.clone();
                        tokio::spawn(async move {
                            match state.client.get(&task.url).send().await {
                                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
                            }
                        });
                    }
                    None => (),
                }
            }
        });
        chin_tx
    }

    async fn start_chres(&self) {
        let state = self.state.clone();

        tokio::spawn(async move {
            loop {
                match state.res_rx.recv().await { // LINE PRODUCES ERROR
                    Some(task) => {}
                    None => (),
                }
            }
        });
    }
}

impl Proxy {
    pub fn new(
        id: u32,
        parent_tx: Sender<String>,
        proxy_addr: &str,
        max_rps: u16,
        max_pending: u16,
    ) -> Result<Self, Error> {
        let client = reqwest::Client::builder();
        if proxy_addr != "none" {
            client = client.proxy(reqwest::Proxy::all(proxy_addr)?)
        }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        Ok(Proxy {
            id,
            state: Arc::new(State {
                client: client.build()?,
                res_tx,
                res_rx,
            }),
            max_rps,
            max_pending,
            parent_tx,
        })
    }
}
error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/lib.rs:69:23
   |
69 |                 match state.res_rx.recv().await {
   |                       ^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<State>`

Solution

  • Lucas Zanella's answer and Shepmaster's comments helped alot to refactor and simplify code. I've desided to pass ownership inside Proxy::new() function instead of using shared reference. The code became more readable, and I've avoided shared reference for mutable tokio::sync::mpsc::Receiver. Perhaps the question turned out to be too unstructured, but I came to a new approach thanks to the community. Refactored code is listed below.

    use reqwest::{Client, Error, Response};
    use tokio::sync::mpsc;
    use tokio::sync::mpsc::{Sender, Receiver};
    
    
    pub struct Task {
        pub id: u32,
        pub url:  String,
    }
    pub enum Message{
        Failure(Task, Error),
        Success(Task, Response),
    }
    pub struct Proxy{
        id: u32,
        max_rps: u16,
        max_pending: u16,
        in_tx: Sender<Task>,
    }
    
    
    async fn send_msg<T>(tx: &Sender<T>, msg: T){
        match tx.send(msg).await {
            Err(error) => { eprintln!("{}", error) },
            _ => (),
        };
    }
    
    
    async fn start_loop_in(client: Client, mut in_rx: Receiver<Task>, res_tx: Sender<Message>){
        loop {
            if let Some(task) = in_rx.recv().await {
                let client_clone = client.clone();
                let res_tx_clone = res_tx.clone();
                tokio::spawn(async move {
                    println!("SENDING: {}", &task.url); // TODO: DELETE DEBUG
                    match client_clone.get(&task.url).send().await {
                        Ok(res) => send_msg(&res_tx_clone, Message::Success(task, res)).await,
                        Err(err) => send_msg(&res_tx_clone, Message::Failure(task, err)).await,
                    }
                });
            }
        }
    }
    
    
    async fn start_loop_res(mut res_rx: Receiver<Message>, out_tx: Sender<String>){
        loop {
            if let Some(message) = res_rx.recv().await {
                match message {
                    Message::Success(task, res) => { 
                        send_msg(
                            &out_tx, 
                            format!("{:#?}", res.text().await.unwrap()) // TODO: change in release!
                        ).await;
                    },
                    Message::Failure(task, err) => {
                        send_msg(&out_tx, err.to_string()).await;
                    },
                }
            }
        }
    }
    
    
    impl Proxy{
    
        pub fn new(id: u32, parent_tx: Sender<String>, proxy_addr: &str, max_rps: u16, max_pending: u16) -> Result<Self, Error> {
            
            let mut client = Client::builder();
            if proxy_addr != "none" { client = client.proxy(reqwest::Proxy::all(proxy_addr)?) }
            let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size
    
            let client = client.build()?;
            let (in_tx, in_rx) = mpsc::channel::<Task>(max_pending as usize + 1 as usize);
            let res_tx_clone = res_tx.clone();
            tokio::spawn(async move { start_loop_in(client, in_rx, res_tx_clone).await });
    
            tokio::spawn(async move { start_loop_res(res_rx, parent_tx).await });
            
            Ok(Proxy{
                id,
                max_rps,
                max_pending,
                in_tx,
            })
        }
    
        pub fn get_in_tx(&self) -> Sender<Task> {
            self.in_tx.clone()
        }
    }