Ensure body reads only return 0-length at the end #483

Merged
merged 3 commits on May 29, 2025

89 changes: 88 additions & 1 deletion cli/tests/integration/body.rs
@@ -5,7 +5,7 @@ use {
common::{Test, TestResult},
viceroy_test,
},
hyper::{body, StatusCode},
hyper::{body, HeaderMap, Response, StatusCode},
};

viceroy_test!(bodies_can_be_written_and_appended, |is_component| {
@@ -32,3 +32,90 @@ viceroy_test!(bodies_can_be_written_and_read, |is_component| {
assert_eq!(resp.status(), StatusCode::OK);
Ok(())
});

viceroy_test!(zero_length_raw_chunks_are_transparent, |is_component| {
let resp = Test::using_fixture("expects-hello.wasm")
.adapt_component(is_component)
.async_backend(
"ReturnsHello",
"/",
None,
move |_req: hyper::Request<hyper::Body>| {
Box::new(async move {
// We'll "trickle back" our response.
let (mut write, read) = hyper::Body::channel();
// Assume a Tokio runtime for writing the response...
tokio::spawn(async move {
for chunk in ["", "hello", "", " ", "", "world", ""] {
let Ok(_) = write.send_data(chunk.into()).await else {
return;
};
tokio::task::yield_now().await;
}
let _ = write.send_trailers(HeaderMap::default());
});
Response::builder()
.status(StatusCode::OK)
.body(read)
.unwrap()
})
},
)
.await
.against_empty()
.await?;
assert_eq!(resp.status(), StatusCode::OK);
Ok(())
});

viceroy_test!(gzip_breaks_are_ok, |is_component| {
let resp = Test::using_fixture("expects-hello.wasm")
.adapt_component(is_component)
.async_backend(
"ReturnsHello",
"/",
None,
move |req: hyper::Request<hyper::Body>| {
Box::new(async move {
let Some(encoding) = req.headers().get("accept-encoding") else {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(hyper::Body::empty())
.unwrap();
};
if !encoding.to_str().unwrap().to_lowercase().contains("gzip") {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(hyper::Body::empty())
.unwrap();
}
// Produced by: echo -n "hello world" | gzip >hello_world.gzip
const GZ_BODY: &[u8] = include_bytes!("hello_world.gzip");
// We'll "trickle back" our response.
let (mut write, read) = hyper::Body::channel();
// Assume a Tokio runtime for writing the response...
tokio::spawn(async move {
for &byte in GZ_BODY.iter() {
let Ok(_) =
write.send_data(body::Bytes::copy_from_slice(&[byte])).await
else {
return;
};
tokio::task::yield_now().await;
}
let _ = write.send_trailers(HeaderMap::default());
});
Response::builder()
.status(StatusCode::OK)
.header("content-encoding", "gzip")
.body(read)
.unwrap()
})
},
)
.await
.against_empty()
.await?;
assert_eq!(resp.status(), StatusCode::OK);
Ok(())
});
Binary file added cli/tests/integration/hello_world.gzip
176 changes: 176 additions & 0 deletions lib/src/body.rs
@@ -116,6 +116,35 @@ impl Body {
self.extend(body)
}

/// Analogue of the `AsyncRead` trait, but without going through `std::io::Result`.
///
/// Attempts to read at least one byte into the buffer, waiting until at least
/// one byte is available. If an error is encountered, returns the error; if
/// the stream has ended, returns `Ok(0)`.
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, error::Error> {
// poll_data may return empty chunks, which we need to filter out.
// Loop until we get a nonempty chunk, an error, or end-of-stream.
loop {
match self.data().await {
None => return Ok(0),
Some(Err(e)) => return Err(e),

// Empty chunk; avoid signaling end-of-stream, wait for more data:
Some(Ok(bytes)) if bytes.is_empty() => continue,

Some(Ok(src)) => {
let copy_len = std::cmp::min(buf.len(), src.len());
buf[..copy_len].copy_from_slice(&src[..copy_len]);
let remainder = &src[copy_len..];
if !remainder.is_empty() {
self.chunks.push_front(remainder.into());
}
return Ok(copy_len);
}
}
}
}

/// Read the entire body into a byte vector.
pub async fn read_into_vec(self) -> Result<Vec<u8>, error::Error> {
let mut body = Box::new(self);
@@ -348,3 +377,150 @@ impl HttpBody for Body {
SizeHint::with_exact(size)
}
}

#[cfg(test)]
mod tests {
use std::io::Read;

use bytes::{Bytes, BytesMut};
use flate2::{Compression, GzBuilder};
use http::HeaderMap;
use proptest::prelude::*;

use crate::body::Chunk;

/// Proptest strategy: get a set of Bytes.
fn some_bytes() -> impl Strategy<Value = Bytes> {
proptest::collection::vec(any::<u8>(), 0..16).prop_map(|v| v.into())
}

// Gzip some bytes, with "best" compression and no header fields.
fn compress_body(body: &[u8]) -> Bytes {
let mut encoder =
GzBuilder::new().buf_read(std::io::Cursor::new(body), Compression::best());
let mut compressed = Vec::new();
encoder
.read_to_end(&mut compressed)
.expect("failed to compress gzip body");
compressed.into()
}

// Gradually send the provided body to the provided sender, using the provided chunk lengths.
async fn trickle_body(mut sender: hyper::body::Sender, body: Bytes, chunk_lengths: Vec<usize>) {
// Put "the whole body" at the back of the chunk-lengths, so we'll send the whole body by
// the last iteration.
let all_chunks = chunk_lengths.into_iter().chain(std::iter::once(body.len()));
let mut remaining: &[u8] = &body;
for chunk_length in all_chunks {
if remaining.len() == 0 {
break;
}
let len = std::cmp::min(remaining.len(), chunk_length);
let to_send = &remaining[..len];
remaining = &remaining[len..];
let Ok(_) = sender.send_data(Bytes::copy_from_slice(to_send)).await else {
return;
};
}
let _ = sender.send_trailers(HeaderMap::default()).await;
}

/// Check that the Body produces the given data, when read using the AsyncRead-like API.
async fn check_body_read(
mut body: crate::body::Body,
want: Bytes,
) -> Result<(), TestCaseError> {
let mut got = BytesMut::new();
let buf = &mut [0u8; 16];
while let Ok(n) = body.read(buf).await {
if n == 0 {
break;
}
got.extend_from_slice(&buf[..n]);
}
prop_assert_eq!(got.len(), want.len());
for (i, (got, want)) in got.into_iter().zip(want.into_iter()).enumerate() {
prop_assert_eq!(got, want, "@{}: {} != {}", i, got, want);
}

Ok(())
}

/// Test that a given body can round-trip, even if the body is split into chunks.
async fn test_chunked_body(
bytes: Bytes,
chunk_lengths: Vec<usize>,
) -> Result<(), TestCaseError> {
let mut js = tokio::task::JoinSet::default();
let (sender, body) = hyper::Body::channel();
js.spawn({
let bytes = bytes.clone();
async move {
trickle_body(sender, bytes, chunk_lengths).await;
Ok(())
}
});
js.spawn(check_body_read(body.into(), bytes));
let results = js.join_all().await;
for result in results {
result?;
}
Ok(())
}

/// Test that a given body can round-trip the Gzip decoder, even if the compressed body
/// is split into chunks.
async fn test_gzip_body(
ungz_bytes: Bytes,
chunk_lengths: Vec<usize>,
) -> Result<(), TestCaseError> {
let gz_bytes = compress_body(&ungz_bytes);

let mut js = tokio::task::JoinSet::default();
let (sender, gz_body) = hyper::Body::channel();
js.spawn({
let gz_bytes = gz_bytes.clone();
async move {
trickle_body(sender, gz_bytes, chunk_lengths).await;
Ok(())
}
});
let ungz_body: crate::body::Body = Chunk::compressed_body(gz_body).into();
js.spawn(check_body_read(ungz_body, ungz_bytes));
let results = js.join_all().await;
for result in results {
result?;
}
Ok(())
}

proptest! {
#[test]
fn gzip_chunks_reproduce_body(
(body, chunk_lengths) in some_bytes().prop_flat_map(|bytes| {
let len = bytes.len();
let chunk_length_strategy = proptest::collection::vec(0..=len, 0..=(len / 8));
(Just(bytes), chunk_length_strategy)
}),
) {
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
return rt.block_on(test_gzip_body(body, chunk_lengths));
}

}

proptest! {
#[test]
fn chunks_reproduce_body(
(body, chunk_lengths) in some_bytes().prop_flat_map(|bytes| {
let len = bytes.len();
let chunk_length_strategy = proptest::collection::vec(0..=len, 0..=(len / 8));
(Just(bytes), chunk_length_strategy)
}),
) {
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
return rt.block_on(test_chunked_body(body, chunk_lengths));
}

}
}
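
The new `Body::read` gives host code an `AsyncRead`-style contract: `Ok(0)` is returned only once the stream has truly ended, so a caller can drain a body with a plain loop. A minimal usage sketch, assuming the crate-internal `Body` and `error::Error` types from this file; the `drain` helper and buffer size are illustrative, not part of the patch:

// Sketch only: drain a Body using the read() contract added above.
async fn drain(body: &mut Body) -> Result<Vec<u8>, error::Error> {
    let mut out = Vec::new();
    let mut buf = [0u8; 4096];
    loop {
        match body.read(&mut buf).await? {
            // Ok(0) unambiguously means end-of-stream; empty interior chunks
            // are skipped inside read() rather than surfaced here.
            0 => return Ok(out),
            n => out.extend_from_slice(&buf[..n]),
        }
    }
}
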
22 changes: 5 additions & 17 deletions lib/src/component/http_body.rs
@@ -4,7 +4,6 @@ use {
headers,
},
crate::{body::Body, error::Error, linking::ComponentCtx},
::http_body::Body as HttpBody,
http::header::{HeaderName, HeaderValue},
};

@@ -81,22 +80,11 @@ impl http_body::Host for ComponentCtx {
// only normal bodies (not streaming bodies) can be read from
let body = self.session.body_mut(h.into())?;

if let Some(chunk) = body.data().await {
// pass up any error encountered when reading a chunk
let mut chunk = chunk?;
// split the chunk, saving any bytes that don't fit into the destination buffer
let extra_bytes = chunk.split_off(std::cmp::min(chunk_size as usize, chunk.len()));
// `chunk.len()` is now the smaller of (1) the destination buffer and (2) the available data.
let chunk = chunk.to_vec();
// if there are leftover bytes, put them back at the front of the body
if !extra_bytes.is_empty() {
body.push_front(extra_bytes);
}

Ok(chunk)
} else {
Ok(Vec::new())
}
let mut buffer = Vec::new();
buffer.resize(chunk_size as usize, 0u8);
let len = body.read(&mut buffer).await?;
buffer.truncate(len);
Ok(buffer)
}

async fn close(&mut self, h: http_types::BodyHandle) -> Result<(), types::Error> {
26 changes: 8 additions & 18 deletions lib/src/wiggle_abi/body_impl.rs
@@ -16,7 +16,6 @@ use {
},
},
},
http_body::Body as HttpBody,
std::convert::TryInto,
wiggle::{GuestMemory, GuestPtr},
};
@@ -62,23 +61,14 @@ impl FastlyHttpBody for Session {
// only normal bodies (not streaming bodies) can be read from
let body = self.body_mut(body_handle)?;

if let Some(chunk) = body.data().await {
// pass up any error encountered when reading a chunk
let mut chunk = chunk?;
// split the chunk, saving any bytes that don't fit into the destination buffer
let copy_len = std::cmp::min(buf_len as usize, chunk.len());
let extra_bytes = chunk.split_off(copy_len);
// `chunk.len()` is now the smaller of (1) the destination buffer and (2) the available data.
memory.copy_from_slice(&chunk, buf.as_array(u32::try_from(copy_len).unwrap()))?;
// if there are leftover bytes, put them back at the front of the body
if !extra_bytes.is_empty() {
body.push_front(extra_bytes);
}

Ok(chunk.len() as u32)
} else {
Ok(0)
}
let array = buf.as_array(buf_len);
let slice = memory.as_slice_mut(array)?.ok_or(Error::SharedMemory)?;
let n = body
.read(slice)
.await?
.try_into()
.expect("guest buffer size must be less than u32");
Ok(n)
}

async fn write(
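
For context on the behavior change in both host-call paths above: with the old per-chunk logic, an empty frame arriving mid-stream was reported to the guest as a 0-length read, which guests interpret as end-of-body. A hedged sketch of that scenario against the crate's `Body` type, illustrative only and not part of the patch, assuming a Tokio async context:

// An empty mid-stream frame no longer looks like end-of-body.
let (mut tx, hyper_body) = hyper::Body::channel();
tokio::spawn(async move {
    let _ = tx.send_data("".into()).await;      // empty chunk, mid-stream
    let _ = tx.send_data("hello".into()).await; // real data follows
});
let mut body: Body = hyper_body.into();
let mut buf = [0u8; 16];
// The old hostcalls could surface the empty chunk as Ok(0); Body::read
// skips it and returns the five bytes of "hello" instead.
assert_eq!(body.read(&mut buf).await.expect("read failed"), 5);
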
16 changes: 16 additions & 0 deletions test-fixtures/src/bin/expects-hello.rs
@@ -0,0 +1,16 @@
use fastly::Request;
use http::{HeaderName, StatusCode, Version};

fn main() {
let mut resp = Request::get("https://fastly.com/")
// .with_version(Version::HTTP_2)
.with_header(HeaderName::from_static("accept-encoding"), "gzip")
.with_auto_decompress_gzip(true)
.send("ReturnsHello")
.expect("can send request");
assert_eq!(resp.get_status(), StatusCode::OK);
let got = resp.take_body_str();
eprintln!("got: {}", got);

assert_eq!(&got, "hello world", "got: {}", got);
}