Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bytes::Bytes as the HTTP request body in HttpClient. #2485

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Bump msrv to 1.75.0.
- Add "internal-logs" feature flag (enabled by default), and emit internal logs.
- Add `HttpClient::send_bytes` with `bytes::Bytes` request payload and deprecate old `HttpClient::send` function.

## 0.27.0

Expand Down
21 changes: 16 additions & 5 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,24 @@
/// users to bring their choice of HTTP client.
#[async_trait]
pub trait HttpClient: Debug + Send + Sync {
/// Send the specified HTTP request
/// Send the specified HTTP request with `Vec<u8>` payload
///
/// Returns the HTTP response including the status code and body.
///
/// Returns an error if it can't connect to the server or the request could not be completed,
/// e.g. because of a timeout, infinite redirects, or a loss of connection.
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError>;
#[deprecated(note = "Use `send_bytes` with `Bytes` payload instead.")]
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
self.send_bytes(request.map(Into::into)).await
}

Check warning on line 67 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L65-L67

Added lines #L65 - L67 were not covered by tests

/// Send the specified HTTP request with `Bytes` payload.
///
/// Returns the HTTP response including the status code and body.
///
/// Returns an error if it can't connect to the server or the request could not be completed,
/// e.g. because of a timeout, infinite redirects, or a loss of connection.
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I understand you want to create a pool of Bytes and reuse them across different send/send_bytes call?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly, serialization of spans to export and send to Datadog is one of the largest memory allocation source of backend service I'm working on. I want to reuse buffers to serialize and sending over http between calls to exports.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mstyura - will this enable buffer reuse in opentelemetry-otlp as well? It doesn't seem to be part of this PR, so ensuring.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lalitb I didn't have such plans initially, as I see proposed API is used by oltp and zipkin inside this repo, so they could leverage as well from buffer pooling. But as I don't use them in production I don't understand how heavily they actually allocate memory compared to DataDog exported I initially wanted to fix.
Do you want me to implement pooling for oltp and zipkin? If so I think it's preferable to do it as separate MR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to implement pooling for oltp and zipkin? If so I think it's preferable to do it as separate MR.

Yes, I think it would be good to have this optimization as separate PR. Thanks.

}

#[cfg(feature = "reqwest")]
Expand All @@ -72,7 +83,7 @@

#[async_trait]
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {

Check warning on line 86 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L86

Added line #L86 was not covered by tests
otel_debug!(name: "ReqwestClient.Send");
let request = request.try_into()?;
let mut response = self.execute(request).await?.error_for_status()?;
Expand All @@ -89,7 +100,7 @@
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {

Check warning on line 103 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L103

Added line #L103 was not covered by tests
otel_debug!(name: "ReqwestBlockingClient.Send");
let request = request.try_into()?;
let mut response = self.execute(request)?.error_for_status()?;
Expand Down Expand Up @@ -159,7 +170,7 @@

#[async_trait]
impl HttpClient for HyperClient {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {

Check warning on line 173 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L173

Added line #L173 was not covered by tests
otel_debug!(name: "HyperClient.Send");
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, Body(Full::from(body)));
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.body(body.into())

Check warning on line 31 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L31

Added line #L31 was not covered by tests
lalitb marked this conversation as resolved.
Show resolved Hide resolved
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
Expand All @@ -37,7 +37,7 @@

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client.send(request).await?;
let response = client.send_bytes(request).await?;

Check warning on line 40 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L40

Added line #L40 was not covered by tests

if !response.status().is_success() {
let error = format!(
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.body(body.into())

Check warning on line 30 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L30

Added line #L30 was not covered by tests
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
Expand All @@ -36,7 +36,7 @@

otel_debug!(name: "HttpMetricsClient.CallingExport");
client
.send(request)
.send_bytes(request)

Check warning on line 39 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L39

Added line #L39 was not covered by tests
.await
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.body(body.into())

Check warning on line 33 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L33

Added line #L33 was not covered by tests
{
Ok(req) => req,
Err(e) => {
Expand All @@ -48,7 +48,7 @@
Box::pin(async move {
let request_uri = request.uri().to_string();
otel_debug!(name: "HttpTracesClient.CallingExport");
let response = client.send(request).await?;
let response = client.send_bytes(request).await?;

Check warning on line 51 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L51

Added line #L51 was not covered by tests

if !response.status().is_success() {
let error = format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@
{
let request = http::Request::get(endpoint)
.header("Content-Type", "application/json")
.body(Vec::new())
.body(Default::default())

Check warning on line 231 in opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs#L231

Added line #L231 was not covered by tests
.unwrap();

let resp = client
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-zipkin/src/exporter/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
.method(Method::POST)
.uri(self.collector_endpoint.clone())
.header(CONTENT_TYPE, "application/json")
.body(serde_json::to_vec(&spans).unwrap_or_default())
.body(serde_json::to_vec(&spans).unwrap_or_default().into())

Check warning on line 44 in opentelemetry-zipkin/src/exporter/uploader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-zipkin/src/exporter/uploader.rs#L44

Added line #L44 was not covered by tests
.map_err::<Error, _>(Into::into)?;
let _ = self.client.send(req).await?.error_for_status()?;
let _ = self.client.send_bytes(req).await?.error_for_status()?;

Check warning on line 46 in opentelemetry-zipkin/src/exporter/uploader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-zipkin/src/exporter/uploader.rs#L46

Added line #L46 was not covered by tests
Ok(())
}
}
4 changes: 2 additions & 2 deletions opentelemetry-zipkin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@
//!
//! #[async_trait]
//! impl HttpClient for HyperClient {
//! async fn send(&self, req: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
//! async fn send_bytes(&self, req: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {
//! let resp = self
//! .0
//! .request(req.map(|v| Full::new(Bytes::from(v))))
//! .request(req.map(|v| Full::new(v)))
//! .await?;
//!
//! let response = Response::builder()
Expand Down
Loading