diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index b90970dae8..320db6ebd5 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -149,6 +149,7 @@ const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str = const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL"; const ENV_TRACE_SERVICE_NAME: &str = "LINKERD2_PROXY_TRACE_SERVICE_NAME"; +const ENV_TRACE_EXTRA_ATTRIBUTES: &str = "LINKERD2_PROXY_TRACE_EXTRA_ATTRIBUTES"; /// Constrains which destination names may be used for profile/route discovery. /// @@ -432,6 +433,7 @@ pub fn parse_config(strings: &S) -> Result let hostname = strings.get(ENV_HOSTNAME); let trace_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_extra_attributes = strings.get(ENV_TRACE_EXTRA_ATTRIBUTES); let trace_protocol = strings.get(ENV_TRACE_PROTOCOL); let trace_service_name = strings.get(ENV_TRACE_SERVICE_NAME); @@ -831,12 +833,17 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.http_request_queue.failfast_timeout }; - let attributes = trace_attributes_file_path + let mut attributes = trace_attributes_file_path .map(|path| match path.and_then(|p| p.parse::().ok()) { Some(path) => trace::read_trace_attributes(&path), None => HashMap::new(), }) .unwrap_or_default(); + if let Ok(Some(attrs)) = trace_extra_attributes { + if !attrs.is_empty() { + attributes.extend(trace::parse_env_trace_attributes(&attrs)); + } + } let trace_protocol = trace_protocol .map(|proto| proto.and_then(|p| p.parse::().ok())) diff --git a/linkerd/app/src/env/trace.rs b/linkerd/app/src/env/trace.rs index 98b9c4e3f0..ca1eab54a8 100644 --- a/linkerd/app/src/env/trace.rs +++ b/linkerd/app/src/env/trace.rs @@ -14,6 +14,10 @@ pub(super) fn read_trace_attributes(path: &std::path::Path) -> HashMap HashMap { + parse_attrs(attrs) +} + fn parse_attrs(attrs: &str) -> HashMap { attrs .lines() diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs index ea362e9921..b173848c61 100644 --- a/linkerd/app/src/trace_collector.rs +++ b/linkerd/app/src/trace_collector.rs @@ -1,11 +1,14 @@ -use linkerd_app_core::http_tracing::{CollectorProtocol, SpanSink}; -use linkerd_app_core::metrics::ControlHttp as HttpMetrics; -use linkerd_app_core::svc::NewService; -use linkerd_app_core::{control, dns, identity, opencensus, opentelemetry}; +use linkerd_app_core::{ + control, dns, + http_tracing::{CollectorProtocol, SpanSink}, + identity, + metrics::ControlHttp as HttpMetrics, + opencensus, opentelemetry, + svc::NewService, +}; use linkerd_error::Error; -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; +use otel_collector::OtelCollectorAttributes; +use std::{collections::HashMap, future::Future, pin::Pin}; pub mod oc_collector; pub mod otel_collector; @@ -92,14 +95,19 @@ impl Config { svc, legacy_oc_metrics, ), - CollectorProtocol::OpenTelemetry => otel_collector::create_collector( - addr.clone(), - inner.hostname, - svc_name, - inner.attributes, - svc, - legacy_otel_metrics, - ), + CollectorProtocol::OpenTelemetry => { + let attributes = OtelCollectorAttributes { + hostname: inner.hostname, + service_name: svc_name, + extra: inner.attributes, + }; + otel_collector::create_collector( + addr.clone(), + attributes, + svc, + legacy_otel_metrics, + ) + } }; Ok(TraceCollector::Enabled(Box::new(collector))) diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index f503fdcf80..0541aa75d6 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -4,20 +4,29 @@ use linkerd_app_core::{ }; use linkerd_opentelemetry::{ self as opentelemetry, metrics, - proto::proto::common::v1::{any_value, AnyValue, KeyValue}, - proto::transform::common::ResourceAttributesWithSchema, + proto::{ + proto::common::v1::{any_value, AnyValue, KeyValue}, + transform::common::ResourceAttributesWithSchema, + }, +}; +use std::{ + collections::HashMap, + time::{SystemTime, UNIX_EPOCH}, }; -use std::{collections::HashMap, time::SystemTime, time::UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{body::BoxBody, client::GrpcService}; use tracing::Instrument; +pub(super) struct OtelCollectorAttributes { + pub hostname: Option, + pub service_name: String, + pub extra: HashMap, +} + pub(super) fn create_collector( addr: ControlAddr, - hostname: Option, - service_name: String, - attributes: HashMap, + attributes: OtelCollectorAttributes, svc: S, legacy_metrics: metrics::Registry, ) -> EnabledCollector @@ -36,7 +45,7 @@ where resources .attributes .0 - .push(service_name.with_key("service.name")); + .push(attributes.service_name.with_key("service.name")); resources .attributes .0 @@ -49,13 +58,16 @@ where .unwrap_or_else(|e| -(e.duration().as_secs() as i64)) .with_key("process.start_timestamp"), ); - resources - .attributes - .0 - .push(hostname.unwrap_or_default().with_key("host.name")); + resources.attributes.0.push( + attributes + .hostname + .unwrap_or_default() + .with_key("host.name"), + ); resources.attributes.0.extend( attributes + .extra .into_iter() .map(|(key, value)| value.with_key(&key)), );