Skip to content

Commit

Permalink
wip: upgrade proxy/http
Browse files Browse the repository at this point in the history
  • Loading branch information
cratelyn committed Jan 13, 2025
1 parent 4353530 commit fa62779
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,7 @@ dependencies = [
"httparse",
"hyper",
"hyper-balance",
"hyper-util",
"linkerd-detect",
"linkerd-duplex",
"linkerd-error",
Expand Down
6 changes: 3 additions & 3 deletions linkerd/http/box/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl BoxBody {
where
B: Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
B::Error: std::error::Error + Send + Sync + 'static,
{
Self {
inner: Box::pin(Inner(inner)),
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<B> Body for Inner<B>
where
B: Body,
B::Data: Send + 'static,
B::Error: Into<Error>,
B::Error: std::error::Error + Send + Sync + 'static,
{
type Data = Data;
type Error = Error;
Expand Down Expand Up @@ -140,7 +140,7 @@ impl<B> Inner<B>
where
B: Body,
B::Data: Send + 'static,
B::Error: Into<Error>,
B::Error: std::error::Error + Send + Sync + 'static,
{
fn map_frame(frame: Result<Frame<B::Data>, B::Error>) -> Result<Frame<Data>, Error> {
match frame {
Expand Down
1 change: 1 addition & 0 deletions linkerd/proxy/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ hyper = { workspace = true, features = [
"http2",
"server",
] }
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "service"] }
hyper-balance = { path = "../../../hyper-balance" }
parking_lot = "0.12"
pin-project = "1"
Expand Down
10 changes: 5 additions & 5 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ where
T: Clone + Send + Sync + 'static,
X: ExtractParam<Params, T>,
C: MakeConnection<(crate::Version, T)> + Clone + Unpin + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Connection: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Metadata: Send,
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static,
B: crate::Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
Expand Down Expand Up @@ -120,12 +120,12 @@ impl<C, T, B> Service<http::Request<B>> for Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Connection: hyper::rt::Read + hyper::rt::Write + Unpin + Send,
C::Future: Unpin + Send + 'static,
C::Error: Into<Error>,
B: crate::Body + Send + 'static,
B: crate::Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
B::Error: std::error::Error + Send + Sync + 'static,
{
type Response = http::Response<BoxBody>;
type Error = Error;
Expand Down
14 changes: 6 additions & 8 deletions linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub struct PoolSettings {
pub struct Client<C, T, B> {
connect: C,
target: T,
absolute_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
origin_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
absolute_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
origin_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
pool: PoolSettings,
}

Expand Down Expand Up @@ -68,9 +68,9 @@ impl<C, T, B> Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Connection: hyper::rt::Read + hyper::rt::Write + Unpin + Send,
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static,
B: crate::Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
Expand All @@ -94,10 +94,9 @@ where
// ish, so we just build a one-off client for the connection.
// There's no real reason to hold the client for re-use.
debug!(use_absolute_form, is_missing_host, "Using one-off client");
hyper::Client::builder()
hyper_util::client::legacy::Client::builder(TracingExecutor)
.pool_max_idle_per_host(0)
.set_host(use_absolute_form)
.executor(TracingExecutor)
.build(HyperConnect::new(
self.connect.clone(),
self.target.clone(),
Expand All @@ -120,11 +119,10 @@ where
if client.is_none() {
debug!(use_absolute_form, "Caching new client");
*client = Some(
hyper::Client::builder()
hyper_util::client::legacy::Client::builder(TracingExecutor)
.pool_max_idle_per_host(self.pool.max_idle)
.pool_idle_timeout(self.pool.idle_timeout)
.set_host(use_absolute_form)
.executor(TracingExecutor)
.build(HyperConnect::new(
self.connect.clone(),
self.target.clone(),
Expand Down
12 changes: 8 additions & 4 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{Body, TracingExecutor};
use futures::prelude::*;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_stack::{MakeConnection, Service};
use std::{
marker::PhantomData,
Expand Down Expand Up @@ -52,10 +53,10 @@ type ConnectFuture<B> = Pin<Box<dyn Future<Output = Result<Connection<B>>> + Sen
impl<C, B, T> Service<T> for Connect<C, B>
where
C: MakeConnection<(crate::Version, T)>,
C::Connection: Send + Unpin + 'static,
C::Connection: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
C::Metadata: Send,
C::Future: Send + 'static,
B: Body + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
Expand Down Expand Up @@ -147,7 +148,7 @@ where
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
type Response = http::Response<hyper::Body>;
type Response = http::Response<BoxBody>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;

Expand All @@ -171,6 +172,9 @@ where
*req.version_mut() = http::Version::HTTP_11;
}

self.tx.send_request(req).boxed()
self.tx
.send_request(req)
.map_ok(|rsp| rsp.map(BoxBody::new))
.boxed()
}
}
26 changes: 9 additions & 17 deletions linkerd/proxy/http/src/orig_proto.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{h1, h2, Body};
use futures::prelude::*;
use http::header::{HeaderValue, TRANSFER_ENCODING};
use http_body::Frame;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, MakeConnection, Service};
Expand Down Expand Up @@ -54,11 +55,11 @@ impl<C, T, B> Service<http::Request<B>> for Upgrade<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Connection: hyper::rt::Read + hyper::rt::Write + Unpin + Send,
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static,
B: crate::Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
B::Error: std::error::Error + Send + Sync + 'static,
{
type Response = http::Response<BoxBody>;
type Error = Error;
Expand Down Expand Up @@ -125,7 +126,8 @@ where
.unwrap_or(orig_version);
trace!(?version, "Downgrading response");
*rsp.version_mut() = version;
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
rsp.map(|inner| UpgradeResponseBody { inner })
.map(BoxBody::new)
}),
)
}
Expand Down Expand Up @@ -211,23 +213,13 @@ where
self.inner.is_end_stream()
}

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.project()
.inner
.poll_data(cx)
.map_err(downgrade_h2_error)
}

fn poll_trailers(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project()
.inner
.poll_trailers(cx)
.poll_frame(cx)
.map_err(downgrade_h2_error)
}

Expand Down
38 changes: 27 additions & 11 deletions linkerd/proxy/http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use linkerd_io::{self as io, PeerAddr};
use linkerd_stack::{layer, ExtractParam, NewService};
use std::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
Expand All @@ -32,12 +33,13 @@ pub struct NewServeHttp<X, N> {

/// Serves HTTP connections with an inner service.
#[derive(Clone, Debug)]
pub struct ServeHttp<N> {
pub struct ServeHttp<N, ReqB = BoxBody> {
version: Version,
http1: hyper::server::conn::http1::Builder,
http2: hyper::server::conn::http2::Builder<TracingExecutor>,
inner: N,
drain: drain::Watch,
marker: PhantomData<ReqB>,
}

// === impl NewServeHttp ===
Expand Down Expand Up @@ -118,21 +120,33 @@ where
drain,
http1: hyper::server::conn::http1::Builder::new(),
http2,
marker: PhantomData,
}
}
}

// === impl ServeHttp ===

impl<I, N, S> Service<I> for ServeHttp<N>
impl<I, N, S, ReqB> Service<I> for ServeHttp<N, ReqB>
where
I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static,
I: hyper::rt::Read + hyper::rt::Write + PeerAddr + Send + Unpin + 'static,
N: NewService<ClientHandle, Service = S> + Send + 'static,
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>, Error = Error>
S: Service<
http::Request<linkerd_http_upgrade::glue::UpgradeBody<hyper::body::Incoming>>,
Response = http::Response<BoxBody>,
Error = Error,
> + Service<
http::Request<hyper::body::Incoming>,
Response = http::Response<BoxBody>,
Error = Error,
> + Clone
+ Unpin
+ Send
+ 'static,
S::Future: Send + 'static,
<S as Service<
http::Request<linkerd_http_upgrade::glue::UpgradeBody<hyper::body::Incoming>>,
>>::Future: Send + 'static,
<S as Service<http::Request<hyper::body::Incoming>>>::Future: Send + 'static,
{
type Response = ();
type Error = Error;
Expand All @@ -157,16 +171,17 @@ where

Box::pin(
async move {
use hyper_util::service::TowerToHyperService;
let (svc, closed) = res?;
debug!(?version, "Handling as HTTP");

match version {
Version::Http1 => {
// Enable support for HTTP upgrades (CONNECT and websockets).
let svc = linkerd_http_upgrade::upgrade::Service::new(
BoxRequest::new(svc),
drain.clone(),
);
let mut conn = http1.serve_connection(io, svc).with_upgrades();
let svc = linkerd_http_upgrade::upgrade::Service::new(svc, drain.clone());
let mut conn = http1
.serve_connection(io, TowerToHyperService::new(svc))
.with_upgrades();

tokio::select! {
res = &mut conn => {
Expand All @@ -187,7 +202,8 @@ where
}

Version::H2 => {
let mut conn = http2.serve_connection(io, BoxRequest::new(svc));
let svc = TowerToHyperService::new(svc);
let mut conn = http2.serve_connection(io, svc);

tokio::select! {
res = &mut conn => {
Expand Down

0 comments on commit fa62779

Please sign in to comment.