I have some structs that are used to deserialize requests from GCP alerting. The top-level structs implement axum's FromRequest, and the nested structs all implement serde's Deserialize and Serialize traits.
Here are the structs (I've omitted the ones that are filled in entirely by serde, for brevity):
#[derive(Debug, Clone, PartialEq)]
struct GcpAlert {
    pub headers: GcpHeaders,
    pub body: GcpBody,
}

#[async_trait]
impl FromRequest<Body> for GcpAlert {
    type Rejection = StatusCode;

    async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
        // Take the body first; the header extraction below only reads
        // headers, so it doesn't consume anything the body extractor needs.
        let body = GcpBody::from_request(req).await?;
        let headers = GcpHeaders::from_request(req).await?;
        Ok(Self { headers, body })
    }
}
#[derive(Debug, Clone)]
struct GcpHeaders {
    pub host: TypedHeader<Host>,
    pub content_length: TypedHeader<ContentLength>,
    pub content_type: TypedHeader<ContentType>,
    pub user_agent: TypedHeader<UserAgent>,
}
#[async_trait]
impl FromRequest<Body> for GcpHeaders {
    type Rejection = StatusCode;

    async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
        // Any missing or malformed header is collapsed into a 400.
        let bad_req = StatusCode::BAD_REQUEST;
        let host: TypedHeader<Host> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        let content_length: TypedHeader<ContentLength> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        let content_type: TypedHeader<ContentType> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        let user_agent: TypedHeader<UserAgent> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        Ok(Self {
            host,
            content_length,
            content_type,
            user_agent,
        })
    }
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
struct GcpBody {
    pub incident: GcpIncident,
    pub version: Box<str>,
}
#[async_trait]
impl FromRequest<Body> for GcpBody {
    type Rejection = StatusCode;

    async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
        let serv_err = StatusCode::INTERNAL_SERVER_ERROR;
        let bad_req = StatusCode::BAD_REQUEST;
        // body_mut() yields a &mut Option<_>; it is None if another
        // extractor has already taken the body.
        let body = req.body_mut().as_mut().ok_or(serv_err)?;
        // This is the line that never completes (see below).
        let buffer = body::to_bytes(body).await.map_err(|_| serv_err)?;
        Ok(serde_json::from_slice(&buffer).map_err(|_| bad_req)?)
    }
}
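For what it's worth, I believe this manual read is roughly what axum's built-in Json extractor does internally (buffer the whole body, then run serde_json), so the impl could be sketched like this instead, assuming axum::Json is in scope; note that Json additionally checks the content-type header:

    async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
        // Json<GcpBody> buffers the body and deserializes it in one step;
        // any read or parse failure is collapsed into a 400 here.
        let Json(body) = Json::<GcpBody>::from_request(req)
            .await
            .map_err(|_| StatusCode::BAD_REQUEST)?;
        Ok(body)
    }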
To test this, I have created a test that compares a manually instantiated GcpAlert against one produced by axum. Note that I've omitted the details of the manually created struct and the request, as I'm fairly certain they are unrelated.
#[tokio::test]
async fn test_request_deserialization() {
    async fn handle_alert(alert: GcpAlert) {
        let expected = GcpAlert {
            headers: GcpHeaders { /* headers */ },
            body: GcpBody { /* body */ },
        };
        assert_eq!(alert, expected);
    }

    let app = Router::new().route("/", post(handle_alert));
    // TestClient is similar to this: https://github.com/tokio-rs/axum/blob/main/axum/src/test_helpers/test_client.rs
    let client = TestClient::new(app);
    client.post("/")
        .header("host", "<host>")
        .header("content-length", 1024)
        .header("content-type", ContentType::json().to_string())
        .header("user-agent", "<user-agent>")
        .header("accept-encoding", "gzip, deflate, br")
        .body(/* body */)
        .send().await;
}
My issue is that the program freezes on the following line of GcpBody's FromRequest impl:
let buffer = body::to_bytes(body).await.map_err(|_| serv_err)?;
I've tried to debug the issue a bit, but I'm not really familiar with assembly/LLVM/etc. It looks like two threads are active at this point. I can artificially increase the number of threads by using the multi-threading attribute on the test, but that doesn't change the end result, just produces a bigger call stack.
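(For reference, by "the multi-threading attribute" I mean swapping the test annotation for something like the following; worker_threads is just an arbitrary bump:)

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_request_deserialization() { /* unchanged body */ }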
Thread1 callstack:
syscall (@syscall:12)
std::sys::unix::futex::futex_wait (@std::sys::unix::futex::futex_wait:64)
std::sys_common::thread_parker::futex::Parker::park_timeout (@std::thread::park_timeout:25)
std::thread::park_timeout (@std::thread::park_timeout:18)
std::sync::mpsc::blocking::WaitToken::wait_max_until (@std::sync::mpsc::blocking::WaitToken::wait_max_until:18)
std::sync::mpsc::shared::Packet<T>::recv (@std::sync::mpsc::shared::Packet<T>::recv:94)
std::sync::mpsc::Receiver<T>::recv_deadline (@test::run_tests:1771)
std::sync::mpsc::Receiver<T>::recv_timeout (@test::run_tests:1696)
test::run_tests (@test::run_tests:1524)
test::console::run_tests_console (@test::console::run_tests_console:290)
test::test_main (@test::test_main:102)
test::test_main_static (@test::test_main_static:34)
gcp_teams_alerts::main (/home/ak_lo/ドキュメント/Rust/gcp-teams-alerts/src/main.rs:1)
core::ops::function::FnOnce::call_once (@core::ops::function::FnOnce::call_once:6)
std::sys_common::backtrace::__rust_begin_short_backtrace (@std::sys_common::backtrace::__rust_begin_short_backtrace:6)
std::rt::lang_start::{{closure}} (@std::rt::lang_start::{{closure}}:7)
core::ops::function::impls::<impl core::ops::function::FnOnce<A> for &F>::call_once (@std::rt::lang_start_internal:242)
std::panicking::try::do_call (@std::rt::lang_start_internal:241)
std::panicking::try (@std::rt::lang_start_internal:241)
std::panic::catch_unwind (@std::rt::lang_start_internal:241)
std::rt::lang_start_internal::{{closure}} (@std::rt::lang_start_internal:241)
std::panicking::try::do_call (@std::rt::lang_start_internal:241)
std::panicking::try (@std::rt::lang_start_internal:241)
std::panic::catch_unwind (@std::rt::lang_start_internal:241)
std::rt::lang_start_internal (@std::rt::lang_start_internal:241)
std::rt::lang_start (@std::rt::lang_start:13)
main (@main:10)
___lldb_unnamed_symbol3139 (@___lldb_unnamed_symbol3139:29)
__libc_start_main (@__libc_start_main:43)
_start (@_start:15)
Thread2 callstack:
epoll_wait (@epoll_wait:27)
mio::sys::unix::selector::epoll::Selector::select (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/mio-0.8.4/src/sys/unix/selector/epoll.rs:68)
mio::poll::Poll::poll (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/mio-0.8.4/src/poll.rs:400)
tokio::runtime::io::Driver::turn (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/io/mod.rs:162)
<tokio::runtime::io::Driver as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/io/mod.rs:227)
<tokio::park::either::Either<A,B> as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/park/either.rs:30)
tokio::time::driver::Driver<P>::park_internal (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/time/driver/mod.rs:238)
<tokio::time::driver::Driver<P> as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/time/driver/mod.rs:436)
<tokio::park::either::Either<A,B> as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/park/either.rs:30)
<tokio::runtime::driver::Driver as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/driver.rs:198)
tokio::runtime::scheduler::current_thread::Context::park::{{closure}} (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:308)
tokio::runtime::scheduler::current_thread::Context::enter (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:349)
tokio::runtime::scheduler::current_thread::Context::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:307)
tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}} (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:554)
tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}} (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:595)
tokio::macros::scoped_tls::ScopedKey<T>::set (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/macros/scoped_tls.rs:61)
tokio::runtime::scheduler::current_thread::CoreGuard::enter (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:595)
tokio::runtime::scheduler::current_thread::CoreGuard::block_on (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:515)
tokio::runtime::scheduler::current_thread::CurrentThread::block_on (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:161)
tokio::runtime::Runtime::block_on (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/mod.rs:490)
While stepping through, I noticed that the Data struct is polled twice, and it seems like the context and state might not match up (it should complete with nothing left to return on the second poll; I've confirmed all the data is returned in the first). Does anyone have any idea why the program continues to wait for new data when it certainly won't come?
Edit: Just to test, I changed the line that causes the freeze to the following:

let mut body = BodyStream::from_request(req).await.map_err(|_| serv_err)?.take(1);
let buffer = {
    let mut buf = Vec::new();
    // take/next come from futures_util::StreamExt.
    while let Some(chunk) = body.next().await {
        let data = chunk.map_err(|_| serv_err)?;
        buf.extend(data);
    }
    buf
};
The test succeeds in this case, but if I increase the take to anything more than 1, the same issue reoccurs, presumably because take(1) ends the stream after the first chunk instead of waiting for more data.
I was wrong in my assumption that the request contents were unrelated. I got the content length wrong when creating the test, and it looks like axum (hyper underneath, really) was waiting forever for the rest of the declared bytes as a result.
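The fix, in case it helps anyone else, is to derive Content-Length from the payload that is actually sent instead of hard-coding it. A minimal sketch, where expected_body stands for the manually built GcpBody from the test (the name is hypothetical; GcpBody derives Serialize, so it can be serialized directly):

let payload = serde_json::to_string(&expected_body).unwrap();
client.post("/")
    .header("host", "<host>")
    // Must match the number of bytes actually sent; if it is larger, the
    // server keeps polling the body stream for data that never arrives.
    .header("content-length", payload.len())
    .header("content-type", ContentType::json().to_string())
    .header("user-agent", "<user-agent>")
    .header("accept-encoding", "gzip, deflate, br")
    .body(payload)
    .send().await;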