Skip to content

Commit

Permalink
refactor(http/retry): prepare PeekTrailersBody<B> for hyper upgrade
Browse files Browse the repository at this point in the history
this is a refactor commit, outlined from #3504.

this commit is particularly interested in preparing the
`PeekTrailersBody<B>` middleware for our upgrade to hyper/http-body 1.0.
more specifically: this commit clears up the boundary concerning when it
will or will not become inert and delegate to its inner body `B`.

currently, a `PeekTrailersBody<B>` is not fully consistent about the
conditions in which it will peek the trailers of a response body: the
inner body is allowed to yield _either_ (a) **zero** DATA frames, in which
case the body will be `.await`'ed and polled until the trailers are
obtained, or (b) **one** DATA frame, so long as the inner body
immediately yields a trailer.

see linkerd/linkerd2#8733.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 8, 2025
1 parent b07b0d8 commit 6928536
Showing 1 changed file with 89 additions and 76 deletions.
165 changes: 89 additions & 76 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,31 @@ use std::{
task::{Context, Poll},
};

/// An HTTP body that allows inspecting the body's trailers, if a `TRAILERS`
/// frame was the first frame after the initial headers frame.
/// An HTTP body that allows inspecting the body's trailers.
///
/// If the first frame of the body stream was *not* a `TRAILERS` frame, this
/// behaves identically to a normal body.
pub struct PeekTrailersBody<B: Body = BoxBody> {
inner: B,

/// The first DATA frame received from the inner body, or an error that
/// occurred while polling for data.
///
/// If this is `None`, then the body has completed without any DATA frames.
first_data: Option<Result<B::Data, B::Error>>,

/// The inner body's trailers, if it was terminated by a `TRAILERS` frame
/// after 0-1 DATA frames, or an error if polling for trailers failed.
/// The body's trailers may be peeked with [`PeekTrailersBody::peek_trailers()`].
///
/// Trailers may only be peeked if the inner body immediately yields a TRAILERS frame. If the first
/// frame of the body stream was *not* a `TRAILERS` frame, this behaves identically to a normal
/// body.
pub enum PeekTrailersBody<B: Body = BoxBody> {
/// The trailers are not available to be inspected.
Passthru {
/// The inner body.
inner: B,
/// The first DATA frame received from the inner body, or an error that
/// occurred while polling for data.
///
/// If this is `None`, then the body has completed without any DATA frames.
first_data: Option<Result<B::Data, B::Error>>,
},
/// The trailers have been peeked.
///
/// Yes, this is a bit of a complex type, so let's break it down:
/// - the outer `Option` indicates whether any trailers were received by
/// `WithTrailers`; if it's `None`, then we don't *know* if the response
/// had trailers, as it is not yet complete.
/// - the inner `Result` and `Option` are the `Result` and `Option` returned
/// by `HttpBody::trailers` on the inner body. If this is `Ok(None)`, then
/// the body has terminated without trailers --- it is *known* to not have
/// trailers.
trailers: Option<Result<Option<http::HeaderMap>, B::Error>>,
/// This variant applies if the inner body's first frame was a `TRAILERS` frame.
Peek {
/// The inner body's trailers.
trailers: Option<Result<Option<http::HeaderMap>, B::Error>>,
},
}

pub type WithPeekTrailersBody<B> = Either<
Expand All @@ -50,10 +49,16 @@ pub struct ResponseWithPeekTrailers<S>(pub(crate) S);
// === impl WithTrailers ===

impl<B: Body> PeekTrailersBody<B> {
/// Returns a reference to the trailers, if applicable.
///
/// See [`PeekTrailersBody<B>`] for more information on when this returns `None`.
pub fn peek_trailers(&self) -> Option<&http::HeaderMap> {
self.trailers
.as_ref()
.and_then(|trls| trls.as_ref().ok()?.as_ref())
match self {
Self::Peek {
trailers: Some(Ok(Some(ref t))),
} => Some(t),
Self::Passthru { .. } | Self::Peek { .. } => None,
}
}

pub fn map_response(rsp: http::Response<B>) -> WithPeekTrailersBody<B>
Expand Down Expand Up @@ -87,44 +92,32 @@ impl<B: Body> PeekTrailersBody<B> {
B::Data: Send + Unpin,
B::Error: Send,
{
let (parts, body) = rsp.into_parts();
let mut body = Self {
inner: body,
first_data: None,
trailers: None,
};
let (parts, mut body) = rsp.into_parts();

tracing::debug!("Buffering first data frame");
if let Some(data) = body.inner.data().await {
tracing::debug!("Buffering first body frame");
if let first_data @ Some(_) = body.data().await {
// The body has data; stop waiting for trailers.
body.first_data = Some(data);

// Peek to see if there's immediately a trailers frame, and grab
// it if so. Otherwise, bail.
// XXX(eliza): the documentation for the `http::Body` trait says
// that `poll_trailers` should only be called after `poll_data`
// returns `None`...but, in practice, I'm fairly sure that this just
// means that it *will not return `Ready`* until there are no data
// frames left, which is fine for us here, because we `now_or_never`
// it.
body.trailers = body.inner.trailers().now_or_never();
} else {
// Okay, `poll_data` has returned `None`, so there are no data
// frames left. Let's see if there's trailers...
body.trailers = Some(body.inner.trailers().await);
}
if body.trailers.is_some() {
tracing::debug!("Buffered trailers frame");
let body = Self::Passthru {
inner: body,
first_data,
};
return http::Response::from_parts(parts, body);
}

// We have confirmed that there are no data frames. Now, await the trailers.
let trailers = body.trailers().await;
tracing::debug!("Buffered trailers frame");
let body = Self::Peek {
trailers: Some(trailers),
};
http::Response::from_parts(parts, body)
}

/// Returns a response with an inert [`PeekTrailersBody<B>`].
fn no_trailers(rsp: http::Response<B>) -> http::Response<Self> {
rsp.map(|inner| Self {
rsp.map(|inner| Self::Passthru {
inner,
first_data: None,
trailers: None,
})
}
}
Expand All @@ -142,47 +135,67 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.get_mut();
if let Some(first_data) = this.first_data.take() {
return Poll::Ready(Some(first_data));
match self.get_mut() {
Self::Passthru {
first_data,
ref mut inner,
} => {
// Return the first chunk that was buffered originally.
if let data @ Some(_) = first_data.take() {
return Poll::Ready(data);
}
// ...and then, poll the inner body.
Pin::new(inner).poll_data(cx)
}
// If we have peeked the trailers, we've already polled an empty body.
Self::Peek { .. } => Poll::Ready(None),
}

Pin::new(&mut this.inner).poll_data(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.get_mut();
if let Some(trailers) = this.trailers.take() {
return Poll::Ready(trailers);
match self.get_mut() {
Self::Passthru { ref mut inner, .. } => Pin::new(inner).poll_trailers(cx),
Self::Peek { trailers } => {
let trailers = trailers
.take()
.expect("poll_trailers should not be called more than once");
Poll::Ready(trailers)
}
}

Pin::new(&mut this.inner).poll_trailers(cx)
}

#[inline]
fn is_end_stream(&self) -> bool {
self.first_data.is_none() && self.trailers.is_none() && self.inner.is_end_stream()
match self {
Self::Passthru { inner, first_data } => first_data.is_none() && inner.is_end_stream(),
Self::Peek { trailers: Some(_) } => false,
Self::Peek { trailers: None } => true,
}
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
use bytes::Buf;

let mut hint = self.inner.size_hint();
// If we're holding onto a chunk of data, add its length to the inner
// `Body`'s size hint.
if let Some(Ok(chunk)) = self.first_data.as_ref() {
let buffered = chunk.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + buffered);
match self {
Self::Passthru { inner, first_data } => {
let mut hint = inner.size_hint();
// If we're holding onto a chunk of data, add its length to the inner
// `Body`'s size hint.
if let Some(Ok(chunk)) = first_data.as_ref() {
let buffered = chunk.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + buffered);
}
hint.set_lower(hint.lower() + buffered);
}
hint
}
hint.set_lower(hint.lower() + buffered);
Self::Peek { .. } => http_body::SizeHint::default(),
}

hint
}
}

Expand Down

0 comments on commit 6928536

Please sign in to comment.