
How can I stream elements from inside a JSON array using serde_json?


I have a 5GB JSON file which is an array of objects with fixed structure:

[
  {
    "first": "John",
    "last": "Doe",
    "email": "[email protected]"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "[email protected]"
  },
  ....
]

I know that I can try to parse this file using the code shown in "How can I deserialize JSON with a top-level array using Serde?":

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let users: Vec<User> = serde_json::from_str(file)?;

This has two problems:

  1. The whole file is first read into memory as a single string.
  2. The string is then converted into a vector of User structs all at once, which I don't want.

I tried the approach from "How can I lazily read multiple JSON values from a file/stream in Rust?", but it reads the whole file before printing anything, and then prints the entire structure at once inside the loop. I was expecting to get one object per loop iteration.

Ideally, parsing and processing of each parsed User object should happen concurrently, either in two separate threads/tasks/routines or by passing the objects through a channel.


Solution

  • Streaming elements from a JSON array is possible, but it takes some manual work. You must skip the leading [ and the , between elements yourself, and detect the final ]. To parse an individual array element, create a StreamDeserializer, take a single item from it, and then drop it so that you regain control of the underlying IO reader. For example:

    use serde::de::DeserializeOwned;
    use serde_json::{self, Deserializer};
    use std::io::{self, Read};
    
    fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
        loop {
            let mut byte = 0u8;
            reader.read_exact(std::slice::from_mut(&mut byte))?;
            if !byte.is_ascii_whitespace() {
                return Ok(byte);
            }
        }
    }
    
    fn invalid_data(msg: &str) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, msg)
    }
    
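    // Parses exactly one JSON value. The StreamDeserializer is a temporary that
    // is dropped at the end of the `let` statement, releasing the reader.
    fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
        let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
        match next_obj {
            Some(result) => result.map_err(Into::into),
            None => Err(invalid_data("premature EOF")),
        }
    }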
    
    fn yield_next_obj<T: DeserializeOwned, R: Read>(
        mut reader: R,
        started: &mut bool,
    ) -> io::Result<Option<T>> {
        if !*started {
            // First call: expect the opening `[` of the array.
            *started = true;
            if read_skipping_ws(&mut reader)? == b'[' {
                // read the next char to see if the array is empty
                let peek = read_skipping_ws(&mut reader)?;
                if peek == b']' {
                    Ok(None)
                } else {
                    // Put the peeked byte back in front of the reader.
                    deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
                }
            } else {
                Err(invalid_data("`[` not found"))
            }
        } else {
            // Later calls: `,` means another element follows, `]` ends the array.
            match read_skipping_ws(&mut reader)? {
                b',' => deserialize_single(reader).map(Some),
                b']' => Ok(None),
                _ => Err(invalid_data("`,` or `]` not found")),
            }
        }
    }
    
    pub fn iter_json_array<T: DeserializeOwned, R: Read>(
        mut reader: R,
    ) -> impl Iterator<Item = Result<T, io::Error>> {
        let mut started = false;
        std::iter::from_fn(move || yield_next_obj(&mut reader, &mut started).transpose())
    }
    

    Example usage:

    fn main() {
        let data = r#"[
      {
        "first": "John",
        "last": "Doe",
        "email": "[email protected]"
      },
      {
        "first": "Anne",
        "last": "Ortha",
        "email": "[email protected]"
      }
    ]"#;
        use serde::{Deserialize, Serialize};
    
        #[derive(Serialize, Deserialize, Debug)]
        struct User {
            first: String,
            last: String,
            email: String,
        }
    
        for user in iter_json_array(io::Cursor::new(data)) {
            let user: User = user.unwrap();
            println!("{:?}", user);
        }
    }
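
The question also asks for parsing and processing to overlap. One possible way to get that, sketched here with the standard library only, is to drive the iterator on a background thread and hand each item to the consumer through a bounded channel. The helper name spawn_producer and the capacity of 16 are hypothetical choices for illustration, and the stand-in iterator in main takes the place of iter_json_array::<User, _>(reader); this assumes the reader (and therefore the iterator) is Send + 'static.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper: drives `iter` on a background thread and hands each
/// item to the caller through a bounded channel.
fn spawn_producer<T, I>(iter: I, capacity: usize) -> Receiver<T>
where
    T: Send + 'static,
    I: Iterator<Item = T> + Send + 'static,
{
    // A bounded channel applies backpressure: the producer blocks once
    // `capacity` items are waiting, so memory use stays limited.
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        for item in iter {
            // `send` fails only if the receiver was dropped; stop producing then.
            if tx.send(item).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    rx
}

fn main() {
    // Stand-in for `iter_json_array::<User, _>(reader)`: any iterator of
    // `Result` items that can be moved to another thread works the same way.
    let parsed = vec!["John Doe", "Anne Ortha"]
        .into_iter()
        .map(|name| Ok::<String, std::io::Error>(name.to_string()));

    // Items arrive here while the producer thread keeps working.
    for user in spawn_producer(parsed, 16) {
        match user {
            Ok(user) => println!("{user}"),
            Err(e) => eprintln!("parse error: {e}"),
        }
    }
}
```

Because the channel is bounded, a slow consumer pauses the parser instead of letting parsed objects pile up in memory, which matters for a multi-gigabyte input.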
    


    In production, you would pass an opened File instead of an in-memory string. As always, don't forget to wrap the File in a BufReader: read_skipping_ws reads one byte at a time, and every unbuffered read on a File is a separate system call.
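
To illustrate why the BufReader matters, the following sketch counts how often the underlying reader is hit when a source is consumed one byte at a time, with and without buffering. CountingReader, drain_bytewise and count_inner_reads are hypothetical names made up for this demonstration, and an in-memory byte slice stands in for the File.

```rust
use std::io::{self, BufReader, Read};

/// Hypothetical wrapper that counts calls to the underlying reader.
/// With a real `File`, each such call would be a system call.
struct CountingReader<R> {
    inner: R,
    calls: usize,
}

impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.calls += 1;
        self.inner.read(buf)
    }
}

/// Consumes `src` one byte at a time, the same access pattern as
/// `read_skipping_ws`, and returns the number of bytes seen.
fn drain_bytewise(mut src: impl Read) -> io::Result<usize> {
    let mut byte = 0u8;
    let mut seen = 0;
    while src.read(std::slice::from_mut(&mut byte))? == 1 {
        seen += 1;
    }
    Ok(seen)
}

/// Returns how many reads reached the underlying source.
fn count_inner_reads(data: &[u8], buffered: bool) -> io::Result<usize> {
    let mut counter = CountingReader { inner: data, calls: 0 };
    if buffered {
        drain_bytewise(BufReader::new(&mut counter))?;
    } else {
        drain_bytewise(&mut counter)?;
    }
    Ok(counter.calls)
}

fn main() -> io::Result<()> {
    let data = vec![b' '; 10_000];
    println!("unbuffered: {} reads", count_inner_reads(&data, false)?);
    println!("buffered:   {} reads", count_inner_reads(&data, true)?);
    Ok(())
}
```

The unbuffered run reaches the source once per byte, while the buffered run needs only a handful of large reads. With a File, the equivalent wrapping would be BufReader::new(File::open(path)?), where path is whatever location the data lives at.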