Search code examples
rustzeromqrust-tokio

Rust loop - borrowed value does not live long enough


In a rebroadcaster (redis to zmq) implementation i am unable to figure out how to resolve the lifetime issue of temporary values (derived from consumed message from pubsub in a loop).

I pretty much understand why, but i am so far failing to produce Vec<&String> that i need to pass into zmq publisher. I tried with different types, cloning etc. but i'm just getting same or similar errors related to ownership and lifetime.

The problematic code:

async fn ps_rebroadcaster() -> Result<(), ApiError>{

    let mut inproc_publisher: async_zmq::Publish<std::vec::IntoIter<&std::string::String>, &std::string::String>  = async_zmq::publish("ipc://pricing")?.bind()?;
    
    let con_str = &APP_CONFIG.REDIS_URL;
    let conn = create_client(con_str.to_string())
        .await
        .expect("Can't connect to redis");
    let mut pubsub = conn.get_async_connection().await?.into_pubsub();
    match pubsub.psubscribe("T1050_*").await {
        Ok(_) => {}
        Err(_) => {}
    };

    let mut msg_stream = pubsub.into_on_message();
     
    loop {
        let result = msg_stream.next().await;
        match result {
            Some(message) => {
                 
                let channel_name = message.get_channel_name().to_string();
                let payload_value: redis::Value = message.get_payload().expect("Can't get payload of message");
                let payload_string: String = FromRedisValue::from_redis_value(&payload_value).expect("Can't convert from Redis value");
                
                let item: Vec<&String> = vec![&channel_name , &payload_string  ];
                // problem here  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

                inproc_publisher.send(item.into()).await?;
            }
            None => {
                println!("None received");
                continue;
            }
        }
    }

    
}

And the error:

error[E0597]: `channel_name` does not live long enough
   --> src/server.rs:248:47
    |
248 |                 let item: Vec<&String> = vec![&channel_name , &payload_string  ];
    |                                               ^^^^^^^^^^^^^ borrowed value does not live long enough
249 |                 
250 |                 inproc_publisher.send(item.into()).await?;
    |                 ---------------------------------- borrow later used here
251 |             }
    |             - `channel_name` dropped here while still borrowed

error[E0597]: `payload_string` does not live long enough
   --> src/server.rs:248:63
    |
248 |                 let item: Vec<&String> = vec![&channel_name , &payload_string  ];
    |                                                               ^^^^^^^^^^^^^^^ borrowed value does not live long enough
249 |                 
250 |                 inproc_publisher.send(item.into()).await?;
    |                 ---------------------------------- borrow later used here
251 |             }
    |             - `payload_string` dropped here while still borrowed

Solution

  • This fails because the sink wants an Into<Message> but doesn't apparently perform the conversion immediately, so the reference needs to continue to exist for some unspecified amount of time, and the values they refer to are destroyed too soon. Basically, you need to give the sink an owned value instead of a borrowed one.

    You can do this by converting the Strings into Message values before giving them to the Publish sink, or by giving it an owned value that can be converted to Message. There isn't a conversion from String to Message, but there is one from Vec<u8>, and you can convert a String to a Vec<u8> without any copying by using the String::into_bytes() method.