Search code examples
rust, apache-arrow, reqwest

How can I read a reqwest::Response object's bytes_stream() with an implementer of arrow_array::RecordBatchReader?


I am attempting to use the reqwest crate to stream binary data in Arrow IPC format from a REST API endpoint that is out of my control. The reqwest::Response object has a bytes_stream() method that returns a type implementing the trait Stream<Item = Result<Bytes>>. I'm hoping it's possible to read this as a stream of RecordBatch values in some way, such as with arrow_ipc::reader::StreamReader or some other implementer of the arrow_array::RecordBatchReader trait. What's the best way to do this?


Solution

  • The arrow crate does not have support for async currently (although there is an open issue for that), but the alternative arrow2 does, and luckily it also defines a conversion layer between its types and arrow's types. So this is one option (the code is quite involved because arrow2's async support requires an AsyncRead, but what we have is a Stream giving Bytes):

    use std::io;
    use std::pin::Pin;
    use std::task::{ready, Context, Poll};
    
    use arrow2::datatypes::Schema;
    use arrow2::io::ipc::read::stream_async::AsyncStreamReader;
    use arrow2::io::ipc::read::StreamMetadata;
    use arrow2::io::ipc::IpcSchema;
    use arrow_format::ipc::MetadataVersion;
    use bytes::Bytes;
    use futures::stream::Fuse;
    use futures::{AsyncRead, Stream, StreamExt};
    use reqwest::Client;
    
    /// Adapter that lets a `Stream` of byte chunks be consumed through the
    /// `futures::AsyncRead` interface.
    struct StreamAsAsyncRead<St> {
        /// The wrapped chunk stream, stored behind `Fuse` (see `new`).
        stream: Fuse<St>,
        /// The chunk most recently pulled from `stream`; `poll_read` copies out of it
        /// before asking the stream for another one. `None` until the first chunk arrives.
        last: Option<Bytes>,
    }
    
    impl<St: Stream> StreamAsAsyncRead<St> {
        /// Wraps `stream` in an adapter with no buffered chunk yet.
        ///
        /// The stream is fused before being stored.
        fn new(stream: St) -> Self {
            let stream = stream.fuse();
            Self { stream, last: None }
        }
    }
    
    impl<St: Stream<Item = reqwest::Result<Bytes>> + Unpin> AsyncRead for StreamAsAsyncRead<St> {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            let this = &mut *self;
            let data = match &mut this.last {
                Some(data) if !data.is_empty() => data,
                last => {
                    let Some(next_data) = ready!(this.stream.poll_next_unpin(cx)) else {
                        return Poll::Ready(Ok(0));
                    };
                    let next_data = next_data.map_err(|err| io::Error::other(err))?;
                    last.insert(next_data)
                }
            };
    
            let fill_len = std::cmp::min(buf.len(), data.len());
            buf[..fill_len].copy_from_slice(&data[..fill_len]);
            data.advance(fill_len);
            Poll::Ready(Ok(fill_len))
        }
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = Client::new();
        let response = client.get("<address>").send().await?;
        let data_metadata = StreamMetadata {
            schema: Schema::default(),
            version: MetadataVersion::V5,
            ipc_schema: IpcSchema {
                fields: Vec::new(),
                is_little_endian: true,
            },
        };
        let mut stream = AsyncStreamReader::new(
            StreamAsAsyncRead::new(response.bytes_stream()),
            data_metadata,
        );
        while let Some(item) = stream.next().await {
            let item = item?;
            for item in item.into_arrays() {
                let item = arrow::array::ArrayRef::from(item);
                // Do something with `item`.
            }
        }
    
        Ok(())
    }
    

    Cargo.toml:

    [dependencies]
    arrow = "50.0.0"
    # The "stream" feature provides `Response::bytes_stream()`.
    reqwest = { version = "0.11.24", features = ["stream"] }
    tokio = { version = "1.36.0", features = ["full"] }
    bytes = "1.5.0"
    futures = "0.3.30"
    # Imported directly for `MetadataVersion`.
    arrow-format = "0.8.1"
    # NOTE(review): "io_ipc_read_async" presumably gates the async IPC stream reader and
    # "arrow" the conversions to the `arrow` crate's types - confirm against arrow2 docs.
    arrow2 = { version = "0.18.0", features = [
        "io_ipc",
        "io_ipc_read_async",
        "arrow",
    ] }
    

    Another, simpler option is to give up async support and just use reqwest's blocking interface:

    use arrow::ipc::reader::StreamReader;
    use reqwest::blocking::Client;
    
    /// Downloads an Arrow IPC stream from `<address>` with reqwest's blocking client and
    /// iterates over the decoded record batches.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the server answers with an HTTP error
    /// status, or the body is not a valid Arrow IPC stream.
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = Client::new();
        // Reject 4xx/5xx responses up front instead of handing an error body to the
        // IPC decoder.
        let response = client.get("<address>").send()?.error_for_status()?;
        // The blocking response is passed directly as the reader; `None` selects no
        // column projection.
        let reader = StreamReader::try_new(response, None)?;
        for batch in reader {
            let _batch = batch?;
            // Do something with `_batch`.
        }
        Ok(())
    }