rust, rust-tokio

How to call a poll_* method?


I am trying to implement a cache in Rust using DelayQueue, based on an example from the tokio_util documentation. This is my code:

# Cargo.toml
[dependencies]
futures = "0.3"
tokio-util = { version = "0.7.3", features = ["full"] }
tokio = { version = "1.19.2", features = ["full"] }

// main.rs
use tokio_util::time::{delay_queue, DelayQueue};

use futures::ready;
use std::collections::HashMap;
use std::task::{Context, Poll};
use std::time::Duration;

type CacheKey = String;
type Value = String;

struct Cache {
    entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
    expirations: DelayQueue<CacheKey>,
}

const TTL_SECS: u64 = 10;

impl Cache {
    fn new() -> Cache {
        Cache {
            entries: HashMap::new(),
            expirations: DelayQueue::new(),
        }
    }

    fn insert(&mut self, key: CacheKey, value: Value) {
        let delay = self
            .expirations
            .insert(key.clone(), Duration::from_secs(TTL_SECS));

        self.entries.insert(key, (value, delay));
    }

    fn _get(&self, key: &CacheKey) -> Option<&Value> {
        self.entries.get(key).map(|(v, _)| v)
    }

    fn remove(&mut self, key: &CacheKey) {
        if let Some((_, cache_key)) = self.entries.remove(key) {
            self.expirations.remove(&cache_key);
        }
    }

    fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
            self.entries.remove(entry.get_ref());
        }

        Poll::Ready(())
    }
}

#[tokio::main]
async fn main() {
    let mut cache = Cache::new();
    cache.insert("k1".to_string(), "v1".to_string());
    cache.insert("k2".to_string(), "v2".to_string());
    cache.insert("k3".to_string(), "v3".to_string());
    cache.insert("k4".to_string(), "v4".to_string());
    cache.insert("k5".to_string(), "v5".to_string());

    // code for consuming from the queue
}

I want to call the poll_purge() method on my cache object, but I don't know what to pass for its parameter:

cx: &mut Context<'_>

How should I change my code?


Solution

  • You can add an async fn purge to the Cache impl and use poll_fn from the futures crate:

    async fn purge(&mut self) {
        futures::future::poll_fn(|cx| self.poll_purge(cx)).await;
    }
    

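    This creates a Future whose poll implementation calls poll_purge with the Context supplied by the runtime, so you never construct a Context yourself. Awaiting it (for example, cache.purge().await inside main) completes once poll_purge returns Poll::Ready, which happens when every entry has expired and the queue is empty.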
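
To make it clearer where the Context comes from, here is a minimal std-only sketch of the same pattern. It is not the Cache from the question: Countdown, poll_finished, finished, NoopWake and block_on are hypothetical names made up for illustration, and std::future::poll_fn (stable since Rust 1.64) stands in for futures::future::poll_fn. The toy block_on plays the role of the tokio runtime: it builds the Context and hands it down through the poll_fn future to the poll_* method.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for any type that exposes a poll_* method.
struct Countdown {
    remaining: u32,
    polls: u32,
}

impl Countdown {
    // Returns Pending `remaining` times, then Ready with the total poll count.
    fn poll_finished(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(self.polls);
        }
        self.remaining -= 1;
        // Ask to be polled again before reporting Pending.
        cx.waker().wake_by_ref();
        Poll::Pending
    }

    // Same shape as `purge` in the answer: poll_fn forwards the Context.
    async fn finished(&mut self) -> u32 {
        poll_fn(|cx| self.poll_finished(cx)).await
    }
}

// A waker that does nothing; enough for the busy-polling loop below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (assumption: illustration only). It creates the Context
// and polls in a loop; a real runtime sleeps until the waker fires.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With remaining set to 3, block_on(c.finished()) returns 4: three Pending polls followed by the Ready one. For the actual Cache, keep using #[tokio::main] and cache.purge().await, since DelayQueue relies on tokio's timer and would not work under a toy executor like this.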