Search code examples
rustiteratorrust-futures

Stepping by n for futures::stream::iter()


I am creating a futures::stream::iter() in order to stream responses from a paginated API. However, I want to be able to increment the stream::iter idx by 10 each time. I receive an error that stream::iter() is not an iterator, so I am trying to figure out how to perform this behavior.

error[E0599]: `futures::stream::Iter<RangeFrom<{integer}>>` is not an iterator
  --> src/main.rs:21:14
   |
20 |           let _ = futures::stream::iter(1..)
   |  _________________-
21 | |             .step_by(10)
   | |             -^^^^^^^ `futures::stream::Iter<RangeFrom<{integer}>>` is not an iterator
   | |_____________|
   |

Code is below

use futures::stream::{self as stream, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Post {
    pub user_id: u16,
    pub id: u16,
    pub title: String,
    pub body: String,
}

#[tokio::main]
async fn main() {
    // create iterator that will stream async responses
    let client = reqwest::Client::new();
    let (tx, mut rx) = mpsc::channel::<Post>(2);
    tokio::spawn(async move {
        let _ = stream::iter()
            .step_by(10)
            // make the request
            .then(|i| {
                let client = &client;
                let url = format!("https://jsonplaceholder.typicode.com/posts/{i}");
                client.get(url).send()
            })
            // deserialize
            .and_then(|resp| resp.json())
            .try_for_each_concurrent(2, |r| async {
                let tx_cloned = tx.clone();
                let _ = tx_cloned.send(r).await;
                Ok(())
            })
            .await;
    });
    // consume responses from our channel to do future things with results...
    while let Some(r) = rx.recv().await {
        println!("{:?}", r);
    }
}

Solution

  • stream::iter((1..).step_by(10))
    

    does the trick