asynchronous, rust, web-crawler, rust-tokio, rust-futures

How to concurrently crawl paginated webpages with unknown end?


I'm trying to write a web crawler in Rust using the tokio asynchronous runtime. I want to fetch/process multiple pages asynchronously, but I also want the crawler to stop when it reaches the end (in other words, when there is nothing left to crawl). So far I have used futures::future::try_join_all to get a collective result from the async functions that I provide as futures, but this obviously requires the program to know the total number of pages to crawl beforehand. For example:

async fn fetch(_url: String) -> Result<String, ()> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    Ok(String::from("foo"))
}

#[tokio::main]
async fn main() {
    let search_url = "https://example.com/?page={page_num}";

    let futures = (1..=3)
        .map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
        .map(|url| fetch(url));

    let _ = futures::future::try_join_all(futures).await.unwrap();
}

Rust Playground

In this simple example I have to know the total number of pages to go through (1..=3) before actually fetching them. What I want instead is to not provide any range at all and to have a condition that stops the whole process (e.g. when the HTML result contains "not found").

I looked into futures::executor::block_on but I'm not sure if it is something that I can utilize for this task.


Solution

  • Here's roughly how to do this using Stream and .buffered():

    use futures::{future, stream, StreamExt};
    
    #[derive(Debug)]
    struct Error;
    
    async fn fetch_page(page: i32) -> Result<String, Error> {
        println!("fetching page: {}", page);
    
        // simulate loading pages
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        if page < 5 {
            // successfully got page
            Ok(String::from("foo"))
        } else {
            // page doesn't exist
            Err(Error)
        }
    }
    
    #[tokio::main]
    async fn main() {
        let pages: Vec<String> = stream::iter(1..)
            .map(fetch_page)
            .buffered(10)
            .take_while(|page| future::ready(page.is_ok()))
            .map(|page| page.unwrap())
            .collect()
            .await;
    
        println!("pages: {:?}", pages);
    }
    

    I'll go over the steps in main() in detail:

    - stream::iter(1..) turns an unbounded range of page numbers into a Stream, so no upper limit has to be known up front.
    - .map(fetch_page) maps each page number to a future that fetches that page.
    - .buffered(10) drives up to 10 of those futures concurrently while yielding their results in the original order (unlike .buffer_unordered(); order matters for the next step).
    - .take_while(...) ends the stream at the first Err, i.e. the first page that doesn't exist.
    - .map(|page| page.unwrap()) unwraps the remaining results, which is safe because take_while has already cut the stream off before any Err.
    - .collect() gathers the pages into a Vec<String>.

    Running the above code prints the following, showing that it tries 10 pages at a time but only returns the pages before the first failure:

    fetching page: 1
    fetching page: 2
    fetching page: 3
    fetching page: 4
    fetching page: 5
    fetching page: 6
    fetching page: 7
    fetching page: 8
    fetching page: 9
    fetching page: 10
    pages: ["foo", "foo", "foo", "foo"]
    

    This glosses over some nice-to-haves like handling non-missing-page errors or retrying, but I hope this gives you a good foundation. For those cases you might reach for the methods on TryStreamExt, which are designed for working with streams of Results.
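
    For example, here's a minimal sketch of that idea, assuming a hypothetical Error enum that distinguishes a missing page (NotFound) from any other failure: take_while still ends the stream cleanly at the first missing page, while TryStreamExt::try_collect propagates any other error to the caller.

    use futures::{future, stream, StreamExt, TryStreamExt};

    #[derive(Debug)]
    #[allow(dead_code)]
    enum Error {
        // hypothetical error variants, just for illustration
        NotFound,
        Other(String),
    }

    async fn fetch_page(page: i32) -> Result<String, Error> {
        // simulate loading pages: pages 1..=4 exist, page 5 is missing
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        if page < 5 {
            Ok(String::from("foo"))
        } else {
            Err(Error::NotFound)
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), Error> {
        let pages: Vec<String> = stream::iter(1..)
            .map(fetch_page)
            .buffered(10)
            // keep going until the first "page not found"...
            .take_while(|res| future::ready(!matches!(res, Err(Error::NotFound))))
            // ...but bail out early on any other error
            .try_collect()
            .await?;

        println!("fetched {} pages", pages.len());
        Ok(())
    }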