Skip to content

Commit

Permalink
feat(http/retry): add Frame<T> compatibility facilities
Browse files Browse the repository at this point in the history
this commit introduces a `compat` submodule to `linkerd-http-retry`.

this helps us frontrun the task of replacing all of the finicky control
flow in `PeekTrailersBody<B>` using the antiquated `data()` and
`trailers()` future combinators. instead, we can perform our peeking
in terms of an approximation of `http_body_util::BodyExt::frame()`.

to accomplish this, this commit vendors a copy of the `Frame<T>` type.
we can use this to preemptively model our peek body in terms of this
type, and move to the "real" version of it when we're upgrading in
pr #3504.

additionally, this commit includes a type called
`ForwardCompatibleBody<B>`, and a variant of the `Frame<'a, T>`
combinator. these are a bit boilerplate-y, admittedly, but the pleasant
part of this is that we have, in effect, migrated the trickiest body
middleware in advance of #3504. once we upgrade to http-body 1.0, all of
these types can be removed.

https://docs.rs/http-body-util/latest/http_body_util/trait.BodyExt.html#method.frame
https://docs.rs/http-body-util/0.1.2/src/http_body_util/combinators/frame.rs.html#10

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 23, 2025
1 parent 907f895 commit 8551d5a
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 0 deletions.
115 changes: 115 additions & 0 deletions linkerd/http/retry/src/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! Compatibility utilities for upgrading to http-body 1.0.
use http_body::Body;

pub(crate) use self::frame::Frame;

mod frame;

#[derive(Debug)]
pub(crate) struct ForwardCompatibleBody<B> {
inner: B,
data_finished: bool,
trailers_finished: bool,
}

// === impl ForwardCompatibleBody ===

impl<B: Body> ForwardCompatibleBody<B> {
pub(crate) fn new(body: B) -> Self {
if body.is_end_stream() {
Self {
inner: body,
data_finished: true,
trailers_finished: true,
}
} else {
Self {
inner: body,
data_finished: false,
trailers_finished: false,
}
}
}

pub(crate) fn into_inner(self) -> B {
self.inner
}

/// Returns a future that resolves to the next frame.
pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> {
combinators::Frame(self)
}
}

/// Future that resolves to the next frame from a `Body`.
///
/// NB: This is a vendored stand-in for [`Frame<'a, T>`][frame], and and can be replaced once
/// we upgrade from http-body 0.4 to 1.0. This file was vendored, and subsequently adapted to this
/// project, at commit 86fdf00.
///
/// See linkerd/linkerd2#8733 for more information.
///
/// [frame]: https://docs.rs/http-body-util/0.1.2/http_body_util/combinators/struct.Frame.html
mod combinators {
use core::future::Future;
use core::pin::Pin;
use core::task;
use http_body::Body;
use std::ops::Not;
use std::task::ready;

use super::ForwardCompatibleBody;

#[must_use = "futures don't do anything unless polled"]
#[derive(Debug)]
/// Future that resolves to the next frame from a [`Body`].
pub struct Frame<'a, T>(pub(super) &'a mut super::ForwardCompatibleBody<T>);

impl<T: Body + Unpin> Future for Frame<'_, T> {
type Output = Option<Result<super::Frame<T::Data>, T::Error>>;

fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
let Self(ForwardCompatibleBody {
inner,
data_finished,
trailers_finished,
}) = self.get_mut();
let mut pinned = Pin::new(inner);

// We have already yielded the trailers, the body is done.
if *trailers_finished {
return task::Poll::Ready(None);
}

// We are still yielding data frames.
if data_finished.not() {
match ready!(pinned.as_mut().poll_data(ctx)) {
Some(Ok(data)) => {
// We yielded a frame.
return task::Poll::Ready(Some(Ok(super::Frame::data(data))));
}
Some(Err(error)) => {
// If we encountered an error, we are finished.
*data_finished = true;
*trailers_finished = true;
return task::Poll::Ready(Some(Err(error)));
}
None => {
// We are done yielding data frames. Mark the corresponding flag, and fall
// through to poll the trailers...
*data_finished = true;
}
};
}

// We have yielded all of the data frames but have not yielded the trailers.
let trailers = ready!(pinned.poll_trailers(ctx));
*trailers_finished = true;
let trailers = trailers
.transpose()
.map(|res| res.map(super::Frame::trailers));
task::Poll::Ready(trailers)
}
}
}
131 changes: 131 additions & 0 deletions linkerd/http/retry/src/compat/frame.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#![allow(unused, reason = "this code is vendored from `http-body v1.0.1")]

//! A frame of any kind related to an HTTP stream (body).
//!
//! NB: This is a vendored stand-in for [`Frame<T>`][frame], and and can be replaced once
//! we upgrade from http-body 0.4 to 1.0. This file was vendored at commit 86fdf00.
//!
//! See linkerd/linkerd2#8733 for more information.
//!
//! [frame]: https://docs.rs/http-body/1.0.1/http_body/struct.Frame.html>
use http::HeaderMap;

/// A frame of any kind related to an HTTP stream (body).
#[derive(Debug)]
pub struct Frame<T> {
kind: Kind<T>,
}

#[derive(Debug)]
enum Kind<T> {
// The first two variants are "inlined" since they are undoubtedly
// the most common. This saves us from having to allocate a
// boxed trait object for them.
Data(T),
Trailers(HeaderMap),
//Unknown(Box<dyn Frameish>),
}

impl<T> Frame<T> {
/// Create a DATA frame with the provided `Buf`.
pub fn data(buf: T) -> Self {
Self {
kind: Kind::Data(buf),
}
}

/// Create a trailers frame.
pub fn trailers(map: HeaderMap) -> Self {
Self {
kind: Kind::Trailers(map),
}
}

/// Maps this frame's data to a different type.
pub fn map_data<F, D>(self, f: F) -> Frame<D>
where
F: FnOnce(T) -> D,
{
match self.kind {
Kind::Data(data) => Frame {
kind: Kind::Data(f(data)),
},
Kind::Trailers(trailers) => Frame {
kind: Kind::Trailers(trailers),
},
}
}

/// Returns whether this is a DATA frame.
pub fn is_data(&self) -> bool {
matches!(self.kind, Kind::Data(..))
}

/// Consumes self into the buf of the DATA frame.
///
/// Returns an [`Err`] containing the original [`Frame`] when frame is not a DATA frame.
/// `Frame::is_data` can also be used to determine if the frame is a DATA frame.
pub fn into_data(self) -> Result<T, Self> {
match self.kind {
Kind::Data(data) => Ok(data),
_ => Err(self),
}
}

/// If this is a DATA frame, returns a reference to it.
///
/// Returns `None` if not a DATA frame.
pub fn data_ref(&self) -> Option<&T> {
match self.kind {
Kind::Data(ref data) => Some(data),
_ => None,
}
}

/// If this is a DATA frame, returns a mutable reference to it.
///
/// Returns `None` if not a DATA frame.
pub fn data_mut(&mut self) -> Option<&mut T> {
match self.kind {
Kind::Data(ref mut data) => Some(data),
_ => None,
}
}

/// Returns whether this is a trailers frame.
pub fn is_trailers(&self) -> bool {
matches!(self.kind, Kind::Trailers(..))
}

/// Consumes self into the buf of the trailers frame.
///
/// Returns an [`Err`] containing the original [`Frame`] when frame is not a trailers frame.
/// `Frame::is_trailers` can also be used to determine if the frame is a trailers frame.
pub fn into_trailers(self) -> Result<HeaderMap, Self> {
match self.kind {
Kind::Trailers(trailers) => Ok(trailers),
_ => Err(self),
}
}

/// If this is a trailers frame, returns a reference to it.
///
/// Returns `None` if not a trailers frame.
pub fn trailers_ref(&self) -> Option<&HeaderMap> {
match self.kind {
Kind::Trailers(ref trailers) => Some(trailers),
_ => None,
}
}

/// If this is a trailers frame, returns a mutable reference to it.
///
/// Returns `None` if not a trailers frame.
pub fn trailers_mut(&mut self) -> Option<&mut HeaderMap> {
match self.kind {
Kind::Trailers(ref mut trailers) => Some(trailers),
_ => None,
}
}
}
2 changes: 2 additions & 0 deletions linkerd/http/retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub mod peek_trailers;
pub mod replay;

mod compat;

pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody};
pub use tower::retry::budget::Budget;

Expand Down

0 comments on commit 8551d5a

Please sign in to comment.