diff --git a/Cargo.lock b/Cargo.lock index d915d515b3..981d09853d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1894,6 +1894,7 @@ dependencies = [ "http 1.2.0", "http-body", "hyper", + "hyper-util", "linkerd-duplex", "linkerd-error", "linkerd-http-box", diff --git a/linkerd/http/upgrade/Cargo.toml b/linkerd/http/upgrade/Cargo.toml index 39df18cc9f..dee6ac64f9 100644 --- a/linkerd/http/upgrade/Cargo.toml +++ b/linkerd/http/upgrade/Cargo.toml @@ -16,6 +16,10 @@ futures = { version = "0.3", default-features = false } http = { workspace = true } http-body = { workspace = true } hyper = { workspace = true, default-features = false, features = ["client"] } +hyper-util = { workspace = true, default-features = false, features = [ + "client", + "client-legacy", +] } pin-project = "1" tokio = { version = "1", default-features = false } tower = { version = "0.4", default-features = false } diff --git a/linkerd/http/upgrade/src/glue.rs b/linkerd/http/upgrade/src/glue.rs index 9557fc9d86..acf57f3125 100644 --- a/linkerd/http/upgrade/src/glue.rs +++ b/linkerd/http/upgrade/src/glue.rs @@ -1,7 +1,7 @@ use crate::upgrade::Http11Upgrade; use futures::{ready, TryFuture}; -use http_body::Body; -use hyper::client::connect as hyper_connect; +use http_body::{Body, Frame}; +use hyper_util::client::legacy::connect as hyper_connect; use linkerd_error::{Error, Result}; use linkerd_http_box::BoxBody; use linkerd_io::{self as io, AsyncRead, AsyncWrite}; @@ -63,38 +63,21 @@ where self.body.is_end_stream() } - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - // Poll the next chunk from the body. + ) -> Poll, Self::Error>>> { + // Poll the next frame from the body. let this = self.project(); let body = this.body; - let data = ready!(body.poll_data(cx)); + let frame = ready!(body.poll_frame(cx)); // Log errors. - if let Some(Err(e)) = &data { + if let Some(Err(e)) = &frame { debug!("http body error: {}", e); } - Poll::Ready(data) - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - // Poll the trailers from the body. - let this = self.project(); - let body = this.body; - let trailers = ready!(body.poll_trailers(cx)); - - // Log errors. - if let Err(e) = &trailers { - debug!("http trailers error: {}", e); - } - - Poll::Ready(trailers) + Poll::Ready(frame) } #[inline] diff --git a/linkerd/http/upgrade/src/upgrade.rs b/linkerd/http/upgrade/src/upgrade.rs index 3d6975472f..73ea7c1d03 100644 --- a/linkerd/http/upgrade/src/upgrade.rs +++ b/linkerd/http/upgrade/src/upgrade.rs @@ -22,8 +22,12 @@ use try_lock::TryLock; /// inserted into the `Request::extensions()`. If the HTTP1 client service /// also detects an upgrade, the two `OnUpgrade` futures will be joined /// together with the glue in this type. +// // Note: this relies on their only having been 2 Inner clones, so don't // implement `Clone` for this type. +// XXX(kate): to satisfy new trait bounds when upgrading to hyper 1.x, this type must now be +// Clone'able. +#[derive(Clone)] pub struct Http11Upgrade { half: Half, inner: Arc, @@ -50,7 +54,9 @@ struct Inner { upgrade_drain_signal: Option, } -#[derive(Debug)] +// XXX(kate): to satisfy new trait bounds when upgrading to hyper 1.x, this type must now be +// Clone'able. +#[derive(Clone, Debug)] enum Half { Server, Client, @@ -139,6 +145,9 @@ impl Drop for Inner { let both_upgrades = async move { let (server_conn, client_conn) = tokio::try_join!(server_upgrade, client_upgrade)?; trace!("HTTP upgrade successful"); + use hyper_util::rt::TokioIo; + let client_conn = TokioIo::new(client_conn); + let server_conn = TokioIo::new(server_conn); if let Err(e) = Duplex::new(client_conn, server_conn).await { info!("tcp duplex error: {}", e) }