Skip to content

Commit

Permalink
Include extra trace attributes from env
Browse files Browse the repository at this point in the history
This allows us to include arbitrary values from the k8s downward API beyond just the pod labels that are included in the trace attributes file.

See linkerd/linkerd2#13544 for the corresponding control plane change.

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Jan 14, 2025
1 parent 387197e commit bedbace
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 27 deletions.
9 changes: 8 additions & 1 deletion linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str =
const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH";
const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL";
const ENV_TRACE_SERVICE_NAME: &str = "LINKERD2_PROXY_TRACE_SERVICE_NAME";
const ENV_TRACE_EXTRA_ATTRIBUTES: &str = "LINKERD2_PROXY_TRACE_EXTRA_ATTRIBUTES";

/// Constrains which destination names may be used for profile/route discovery.
///
Expand Down Expand Up @@ -432,6 +433,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let hostname = strings.get(ENV_HOSTNAME);

let trace_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH);
let trace_extra_attributes = strings.get(ENV_TRACE_EXTRA_ATTRIBUTES);
let trace_protocol = strings.get(ENV_TRACE_PROTOCOL);
let trace_service_name = strings.get(ENV_TRACE_SERVICE_NAME);

Expand Down Expand Up @@ -831,12 +833,17 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
} else {
outbound.http_request_queue.failfast_timeout
};
let attributes = trace_attributes_file_path
let mut attributes = trace_attributes_file_path
.map(|path| match path.and_then(|p| p.parse::<PathBuf>().ok()) {
Some(path) => trace::read_trace_attributes(&path),
None => HashMap::new(),
})
.unwrap_or_default();
if let Ok(Some(attrs)) = trace_extra_attributes {
if !attrs.is_empty() {
attributes.extend(trace::parse_env_trace_attributes(&attrs));
}
}

let trace_protocol = trace_protocol
.map(|proto| proto.and_then(|p| p.parse::<CollectorProtocol>().ok()))
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/src/env/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub(super) fn read_trace_attributes(path: &std::path::Path) -> HashMap<String, S
}
}

pub(super) fn parse_env_trace_attributes(attrs: &str) -> HashMap<String, String> {
parse_attrs(attrs)
}

fn parse_attrs(attrs: &str) -> HashMap<String, String> {
attrs
.lines()
Expand Down
38 changes: 23 additions & 15 deletions linkerd/app/src/trace_collector.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use linkerd_app_core::http_tracing::{CollectorProtocol, SpanSink};
use linkerd_app_core::metrics::ControlHttp as HttpMetrics;
use linkerd_app_core::svc::NewService;
use linkerd_app_core::{control, dns, identity, opencensus, opentelemetry};
use linkerd_app_core::{
control, dns,
http_tracing::{CollectorProtocol, SpanSink},
identity,
metrics::ControlHttp as HttpMetrics,
opencensus, opentelemetry,
svc::NewService,
};
use linkerd_error::Error;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use otel_collector::OtelCollectorAttributes;
use std::{collections::HashMap, future::Future, pin::Pin};

pub mod oc_collector;
pub mod otel_collector;
Expand Down Expand Up @@ -92,14 +95,19 @@ impl Config {
svc,
legacy_oc_metrics,
),
CollectorProtocol::OpenTelemetry => otel_collector::create_collector(
addr.clone(),
inner.hostname,
svc_name,
inner.attributes,
svc,
legacy_otel_metrics,
),
CollectorProtocol::OpenTelemetry => {
let attributes = OtelCollectorAttributes {
hostname: inner.hostname,
service_name: svc_name,
extra: inner.attributes,
};
otel_collector::create_collector(
addr.clone(),
attributes,
svc,
legacy_otel_metrics,
)
}
};

Ok(TraceCollector::Enabled(Box::new(collector)))
Expand Down
34 changes: 23 additions & 11 deletions linkerd/app/src/trace_collector/otel_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,29 @@ use linkerd_app_core::{
};
use linkerd_opentelemetry::{
self as opentelemetry, metrics,
proto::proto::common::v1::{any_value, AnyValue, KeyValue},
proto::transform::common::ResourceAttributesWithSchema,
proto::{
proto::common::v1::{any_value, AnyValue, KeyValue},
transform::common::ResourceAttributesWithSchema,
},
};
use std::{
collections::HashMap,
time::{SystemTime, UNIX_EPOCH},
};
use std::{collections::HashMap, time::SystemTime, time::UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{body::BoxBody, client::GrpcService};
use tracing::Instrument;

pub(super) struct OtelCollectorAttributes {
pub hostname: Option<String>,
pub service_name: String,
pub extra: HashMap<String, String>,
}

pub(super) fn create_collector<S>(
addr: ControlAddr,
hostname: Option<String>,
service_name: String,
attributes: HashMap<String, String>,
attributes: OtelCollectorAttributes,
svc: S,
legacy_metrics: metrics::Registry,
) -> EnabledCollector
Expand All @@ -36,7 +45,7 @@ where
resources
.attributes
.0
.push(service_name.with_key("service.name"));
.push(attributes.service_name.with_key("service.name"));
resources
.attributes
.0
Expand All @@ -49,13 +58,16 @@ where
.unwrap_or_else(|e| -(e.duration().as_secs() as i64))
.with_key("process.start_timestamp"),
);
resources
.attributes
.0
.push(hostname.unwrap_or_default().with_key("host.name"));
resources.attributes.0.push(
attributes
.hostname
.unwrap_or_default()
.with_key("host.name"),
);

resources.attributes.0.extend(
attributes
.extra
.into_iter()
.map(|(key, value)| value.with_key(&key)),
);
Expand Down

0 comments on commit bedbace

Please sign in to comment.