feat(policy): Allow outbound hostname metrics (#3770)

Outbound hostname metrics were recently disabled. This conditionally re-enables those through a `LINKERD2_PROXY_OUTBOUND_METRICS_HOSTNAME_LABELS` env var, wired through the policy/routing config with the option of individual policies and routes to set this separately from the global config.

Signed-off-by: Scott Fleener <scott@buoyant.io>
This commit is contained in:
Scott Fleener 2025-03-18 09:27:54 -04:00 committed by GitHub
parent 123d7a344e
commit 65db3dd927
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 100 additions and 62 deletions

View File

@ -6,11 +6,11 @@
#![allow(opaque_hidden_inferred_bound)] #![allow(opaque_hidden_inferred_bound)]
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
use linkerd_app_core::http_tracing::SpanSink;
use linkerd_app_core::{ use linkerd_app_core::{
config::{ProxyConfig, QueueConfig}, config::{ProxyConfig, QueueConfig},
drain, drain,
exp_backoff::ExponentialBackoff, exp_backoff::ExponentialBackoff,
http_tracing::SpanSink,
identity, io, identity, io,
metrics::prom, metrics::prom,
profiles, profiles,
@ -143,6 +143,7 @@ impl Outbound<()> {
client: C, client: C,
backoff: ExponentialBackoff, backoff: ExponentialBackoff,
limits: ReceiveLimits, limits: ReceiveLimits,
export_hostname_labels: bool,
) -> impl policy::GetPolicy ) -> impl policy::GetPolicy
where where
C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>, C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
@ -151,12 +152,18 @@ impl Outbound<()> {
C::ResponseBody: Send + 'static, C::ResponseBody: Send + 'static,
C::Future: Send, C::Future: Send,
{ {
policy::Api::new(workload, limits, Duration::from_secs(10), client) policy::Api::new(
.into_watch(backoff) workload,
.map_result(|response| match response { limits,
Err(e) => Err(e.into()), Duration::from_secs(10),
Ok(rsp) => Ok(rsp.into_inner()), export_hostname_labels,
}) client,
)
.into_watch(backoff)
.map_result(|res| match res {
Err(e) => Err(e.into()),
Ok(rsp) => Ok(rsp.into_inner()),
})
} }
#[cfg(any(test, feature = "test-util"))] #[cfg(any(test, feature = "test-util"))]

View File

@ -8,7 +8,7 @@ use linkerd_app_core::{
svc::Service, svc::Service,
Addr, Error, Recover, Result, Addr, Error, Recover, Result,
}; };
use linkerd_proxy_client_policy::ClientPolicy; use linkerd_proxy_client_policy::{ClientPolicy, ClientPolicyOverrides};
use linkerd_tonic_stream::{LimitReceiveFuture, ReceiveLimits}; use linkerd_tonic_stream::{LimitReceiveFuture, ReceiveLimits};
use linkerd_tonic_watch::StreamWatch; use linkerd_tonic_watch::StreamWatch;
use std::sync::Arc; use std::sync::Arc;
@ -19,6 +19,7 @@ pub(crate) struct Api<S> {
workload: Arc<str>, workload: Arc<str>,
limits: ReceiveLimits, limits: ReceiveLimits,
default_detect_timeout: time::Duration, default_detect_timeout: time::Duration,
export_hostname_labels: bool,
client: Client<S>, client: Client<S>,
} }
@ -39,12 +40,14 @@ where
workload: Arc<str>, workload: Arc<str>,
limits: ReceiveLimits, limits: ReceiveLimits,
default_detect_timeout: time::Duration, default_detect_timeout: time::Duration,
export_hostname_labels: bool,
client: S, client: S,
) -> Self { ) -> Self {
Self { Self {
workload, workload,
limits, limits,
default_detect_timeout, default_detect_timeout,
export_hostname_labels,
client: Client::new(client), client: Client::new(client),
} }
} }
@ -86,6 +89,9 @@ where
}; };
let detect_timeout = self.default_detect_timeout; let detect_timeout = self.default_detect_timeout;
let overrides = ClientPolicyOverrides {
export_hostname_labels: self.export_hostname_labels,
};
let limits = self.limits; let limits = self.limits;
let mut client = self.client.clone(); let mut client = self.client.clone();
Box::pin(async move { Box::pin(async move {
@ -96,7 +102,7 @@ where
// If the server returned an invalid client policy, we // If the server returned an invalid client policy, we
// default to using an invalid policy that causes all // default to using an invalid policy that causes all
// requests to report an internal error. // requests to report an internal error.
let policy = ClientPolicy::try_from(up).unwrap_or_else(|error| { let policy = ClientPolicy::try_from(overrides, up).unwrap_or_else(|error| {
tracing::warn!(%error, "Client policy misconfigured"); tracing::warn!(%error, "Client policy misconfigured");
INVALID_POLICY INVALID_POLICY
.get_or_init(|| ClientPolicy::invalid(detect_timeout)) .get_or_init(|| ClientPolicy::invalid(detect_timeout))

View File

@ -146,6 +146,9 @@ pub const ENV_OUTBOUND_MAX_IN_FLIGHT: &str = "LINKERD2_PROXY_OUTBOUND_MAX_IN_FLI
const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str = const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str =
"LINKERD2_PROXY_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS"; "LINKERD2_PROXY_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS";
const ENV_OUTBOUND_METRICS_HOSTNAME_LABELS: &str =
"LINKERD2_PROXY_OUTBOUND_METRICS_HOSTNAME_LABELS";
const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH";
const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL"; const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL";
const ENV_TRACE_SERVICE_NAME: &str = "LINKERD2_PROXY_TRACE_SERVICE_NAME"; const ENV_TRACE_SERVICE_NAME: &str = "LINKERD2_PROXY_TRACE_SERVICE_NAME";
@ -791,11 +794,14 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
}, },
} }
}; };
let export_hostname_labels =
parse(strings, ENV_OUTBOUND_METRICS_HOSTNAME_LABELS, parse_bool)?.unwrap_or(false);
policy::Config { policy::Config {
control, control,
workload, workload,
limits, limits,
export_hostname_labels,
} }
}; };

View File

@ -174,6 +174,7 @@ impl Config {
}?; }?;
debug!("Building Policy client"); debug!("Building Policy client");
let export_hostname_labels = policy.export_hostname_labels;
let policies = { let policies = {
let control_metrics = let control_metrics =
ControlMetrics::register(registry.sub_registry_with_prefix("control_policy")); ControlMetrics::register(registry.sub_registry_with_prefix("control_policy"));
@ -243,6 +244,7 @@ impl Config {
policies.client.clone(), policies.client.clone(),
policies.backoff, policies.backoff,
policies.limits, policies.limits,
export_hostname_labels,
); );
let dst_addr = dst.addr.clone(); let dst_addr = dst.addr.clone();

View File

@ -15,6 +15,7 @@ pub struct Config {
pub control: control::Config, pub control: control::Config,
pub workload: String, pub workload: String,
pub limits: ReceiveLimits, pub limits: ReceiveLimits,
pub export_hostname_labels: bool,
} }
/// Handles to policy service clients. /// Handles to policy service clients.

View File

@ -105,7 +105,7 @@ pub mod proto {
proto::{ proto::{
BackendSet, InvalidBackend, InvalidDistribution, InvalidFailureAccrual, InvalidMeta, BackendSet, InvalidBackend, InvalidDistribution, InvalidFailureAccrual, InvalidMeta,
}, },
Meta, RouteBackend, RouteDistribution, ClientPolicyOverrides, Meta, RouteBackend, RouteDistribution,
}; };
use linkerd2_proxy_api::outbound::{self, grpc_route}; use linkerd2_proxy_api::outbound::{self, grpc_route};
use linkerd_http_route::{ use linkerd_http_route::{
@ -184,22 +184,22 @@ pub mod proto {
Redirect(#[from] InvalidRequestRedirect), Redirect(#[from] InvalidRequestRedirect),
} }
impl TryFrom<outbound::proxy_protocol::Grpc> for Grpc { impl Grpc {
type Error = InvalidGrpcRoute; pub fn try_from(
fn try_from(proto: outbound::proxy_protocol::Grpc) -> Result<Self, Self::Error> { overrides: ClientPolicyOverrides,
proto: outbound::proxy_protocol::Grpc,
) -> Result<Self, InvalidGrpcRoute> {
let routes = proto let routes = proto
.routes .routes
.into_iter() .into_iter()
.map(try_route) .map(|p| try_route(overrides, p))
.collect::<Result<Arc<[_]>, _>>()?; .collect::<Result<Arc<[_]>, _>>()?;
Ok(Self { Ok(Self {
routes, routes,
failure_accrual: proto.failure_accrual.try_into()?, failure_accrual: proto.failure_accrual.try_into()?,
}) })
} }
}
impl Grpc {
pub fn fill_backends(&self, set: &mut BackendSet) { pub fn fill_backends(&self, set: &mut BackendSet) {
for Route { ref rules, .. } in &*self.routes { for Route { ref rules, .. } in &*self.routes {
for Rule { ref policy, .. } in rules { for Rule { ref policy, .. } in rules {
@ -209,7 +209,10 @@ pub mod proto {
} }
} }
fn try_route(proto: outbound::GrpcRoute) -> Result<Route, InvalidGrpcRoute> { fn try_route(
overrides: ClientPolicyOverrides,
proto: outbound::GrpcRoute,
) -> Result<Route, InvalidGrpcRoute> {
let outbound::GrpcRoute { let outbound::GrpcRoute {
hosts, hosts,
rules, rules,
@ -227,7 +230,7 @@ pub mod proto {
let rules = rules let rules = rules
.into_iter() .into_iter()
.map(|rule| try_rule(&meta, rule)) .map(|rule| try_rule(&meta, overrides, rule))
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
Ok(Route { hosts, rules }) Ok(Route { hosts, rules })
@ -235,6 +238,7 @@ pub mod proto {
fn try_rule( fn try_rule(
meta: &Arc<Meta>, meta: &Arc<Meta>,
overrides: ClientPolicyOverrides,
proto: outbound::grpc_route::Rule, proto: outbound::grpc_route::Rule,
) -> Result<Rule, InvalidGrpcRoute> { ) -> Result<Rule, InvalidGrpcRoute> {
#[allow(deprecated)] #[allow(deprecated)]
@ -262,13 +266,8 @@ pub mod proto {
.ok_or(InvalidGrpcRoute::Missing("distribution"))? .ok_or(InvalidGrpcRoute::Missing("distribution"))?
.try_into()?; .try_into()?;
let export_hostname_labels = false; let mut params =
let mut params = RouteParams::try_from_proto( RouteParams::try_from_proto(timeouts, retry, allow_l5d_request_headers, overrides)?;
timeouts,
retry,
allow_l5d_request_headers,
export_hostname_labels,
)?;
let legacy = request_timeout.map(TryInto::try_into).transpose()?; let legacy = request_timeout.map(TryInto::try_into).transpose()?;
params.timeouts.request = params.timeouts.request.or(legacy); params.timeouts.request = params.timeouts.request.or(legacy);
@ -288,7 +287,7 @@ pub mod proto {
timeouts: Option<linkerd2_proxy_api::http_route::Timeouts>, timeouts: Option<linkerd2_proxy_api::http_route::Timeouts>,
retry: Option<grpc_route::Retry>, retry: Option<grpc_route::Retry>,
allow_l5d_request_headers: bool, allow_l5d_request_headers: bool,
export_hostname_labels: bool, overrides: ClientPolicyOverrides,
) -> Result<Self, InvalidGrpcRoute> { ) -> Result<Self, InvalidGrpcRoute> {
Ok(Self { Ok(Self {
retry: retry.map(Retry::try_from).transpose()?, retry: retry.map(Retry::try_from).transpose()?,
@ -297,7 +296,7 @@ pub mod proto {
.transpose()? .transpose()?
.unwrap_or_default(), .unwrap_or_default(),
allow_l5d_request_headers, allow_l5d_request_headers,
export_hostname_labels, export_hostname_labels: overrides.export_hostname_labels,
}) })
} }
} }

View File

@ -125,7 +125,7 @@ pub mod proto {
proto::{ proto::{
BackendSet, InvalidBackend, InvalidDistribution, InvalidFailureAccrual, InvalidMeta, BackendSet, InvalidBackend, InvalidDistribution, InvalidFailureAccrual, InvalidMeta,
}, },
Meta, RouteBackend, RouteDistribution, ClientPolicyOverrides, Meta, RouteBackend, RouteDistribution,
}; };
use linkerd2_proxy_api::outbound::{self, http_route}; use linkerd2_proxy_api::outbound::{self, http_route};
use linkerd_http_route::http::{ use linkerd_http_route::http::{
@ -217,13 +217,15 @@ pub mod proto {
} }
} }
impl TryFrom<outbound::proxy_protocol::Http1> for Http1 { impl Http1 {
type Error = InvalidHttpRoute; pub fn try_from(
fn try_from(proto: outbound::proxy_protocol::Http1) -> Result<Self, Self::Error> { overrides: ClientPolicyOverrides,
proto: outbound::proxy_protocol::Http1,
) -> Result<Self, InvalidHttpRoute> {
let routes = proto let routes = proto
.routes .routes
.into_iter() .into_iter()
.map(try_route) .map(|p| try_route(overrides, p))
.collect::<Result<Arc<[_]>, _>>()?; .collect::<Result<Arc<[_]>, _>>()?;
Ok(Self { Ok(Self {
routes, routes,
@ -232,13 +234,15 @@ pub mod proto {
} }
} }
impl TryFrom<outbound::proxy_protocol::Http2> for Http2 { impl Http2 {
type Error = InvalidHttpRoute; pub fn try_from(
fn try_from(proto: outbound::proxy_protocol::Http2) -> Result<Self, Self::Error> { overrides: ClientPolicyOverrides,
proto: outbound::proxy_protocol::Http2,
) -> Result<Self, InvalidHttpRoute> {
let routes = proto let routes = proto
.routes .routes
.into_iter() .into_iter()
.map(try_route) .map(|p| try_route(overrides, p))
.collect::<Result<Arc<[_]>, _>>()?; .collect::<Result<Arc<[_]>, _>>()?;
Ok(Self { Ok(Self {
routes, routes,
@ -247,7 +251,10 @@ pub mod proto {
} }
} }
fn try_route(proto: outbound::HttpRoute) -> Result<Route, InvalidHttpRoute> { fn try_route(
overrides: ClientPolicyOverrides,
proto: outbound::HttpRoute,
) -> Result<Route, InvalidHttpRoute> {
let outbound::HttpRoute { let outbound::HttpRoute {
hosts, hosts,
rules, rules,
@ -265,7 +272,7 @@ pub mod proto {
let rules = rules let rules = rules
.into_iter() .into_iter()
.map(|rule| try_rule(&meta, rule)) .map(|rule| try_rule(&meta, overrides, rule))
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
Ok(Route { hosts, rules }) Ok(Route { hosts, rules })
@ -273,6 +280,7 @@ pub mod proto {
fn try_rule( fn try_rule(
meta: &Arc<Meta>, meta: &Arc<Meta>,
overrides: ClientPolicyOverrides,
proto: outbound::http_route::Rule, proto: outbound::http_route::Rule,
) -> Result<Rule, InvalidHttpRoute> { ) -> Result<Rule, InvalidHttpRoute> {
#[allow(deprecated)] #[allow(deprecated)]
@ -300,13 +308,8 @@ pub mod proto {
.ok_or(InvalidHttpRoute::Missing("distribution"))? .ok_or(InvalidHttpRoute::Missing("distribution"))?
.try_into()?; .try_into()?;
let export_hostname_labels = false; let mut params =
let mut params = RouteParams::try_from_proto( RouteParams::try_from_proto(timeouts, retry, allow_l5d_request_headers, overrides)?;
timeouts,
retry,
allow_l5d_request_headers,
export_hostname_labels,
)?;
let legacy = request_timeout.map(TryInto::try_into).transpose()?; let legacy = request_timeout.map(TryInto::try_into).transpose()?;
params.timeouts.request = params.timeouts.request.or(legacy); params.timeouts.request = params.timeouts.request.or(legacy);
@ -326,7 +329,7 @@ pub mod proto {
timeouts: Option<linkerd2_proxy_api::http_route::Timeouts>, timeouts: Option<linkerd2_proxy_api::http_route::Timeouts>,
retry: Option<http_route::Retry>, retry: Option<http_route::Retry>,
allow_l5d_request_headers: bool, allow_l5d_request_headers: bool,
export_hostname_labels: bool, overrides: ClientPolicyOverrides,
) -> Result<Self, InvalidHttpRoute> { ) -> Result<Self, InvalidHttpRoute> {
Ok(Self { Ok(Self {
retry: retry.map(Retry::try_from).transpose()?, retry: retry.map(Retry::try_from).transpose()?,
@ -335,7 +338,7 @@ pub mod proto {
.transpose()? .transpose()?
.unwrap_or_default(), .unwrap_or_default(),
allow_l5d_request_headers, allow_l5d_request_headers,
export_hostname_labels, export_hostname_labels: overrides.export_hostname_labels,
}) })
} }
} }

View File

@ -19,6 +19,11 @@ pub struct ClientPolicy {
pub backends: Arc<[Backend]>, pub backends: Arc<[Backend]>,
} }
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ClientPolicyOverrides {
pub export_hostname_labels: bool,
}
// TODO additional server configs (e.g. concurrency limits, window sizes, etc) // TODO additional server configs (e.g. concurrency limits, window sizes, etc)
#[derive(Clone, Debug, Eq, Hash, PartialEq)] #[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum Protocol { pub enum Protocol {
@ -430,10 +435,11 @@ pub mod proto {
Missing(&'static str), Missing(&'static str),
} }
impl TryFrom<outbound::OutboundPolicy> for ClientPolicy { impl ClientPolicy {
type Error = InvalidPolicy; pub fn try_from(
overrides: ClientPolicyOverrides,
fn try_from(policy: outbound::OutboundPolicy) -> Result<Self, Self::Error> { policy: outbound::OutboundPolicy,
) -> Result<Self, InvalidPolicy> {
use outbound::proxy_protocol; use outbound::proxy_protocol;
let parent = policy let parent = policy
@ -459,16 +465,18 @@ pub mod proto {
"Detect missing protocol detection timeout", "Detect missing protocol detection timeout",
))? ))?
.try_into()?; .try_into()?;
let http1: http::Http1 = http1 let http1 = http::Http1::try_from(
.ok_or(InvalidPolicy::Protocol( overrides,
http1.ok_or(InvalidPolicy::Protocol(
"Detect missing HTTP/1 configuration", "Detect missing HTTP/1 configuration",
))? ))?,
.try_into()?; )?;
let http2: http::Http2 = http2 let http2 = http::Http2::try_from(
.ok_or(InvalidPolicy::Protocol( overrides,
http2.ok_or(InvalidPolicy::Protocol(
"Detect missing HTTP/2 configuration", "Detect missing HTTP/2 configuration",
))? ))?,
.try_into()?; )?;
let opaque: opaq::Opaque = opaque let opaque: opaq::Opaque = opaque
.ok_or(InvalidPolicy::Protocol( .ok_or(InvalidPolicy::Protocol(
"Detect missing opaque configuration", "Detect missing opaque configuration",
@ -483,10 +491,16 @@ pub mod proto {
} }
} }
proxy_protocol::Kind::Http1(http) => Protocol::Http1(http.try_into()?), proxy_protocol::Kind::Http1(http) => {
proxy_protocol::Kind::Http2(http) => Protocol::Http2(http.try_into()?), Protocol::Http1(http::Http1::try_from(overrides, http)?)
}
proxy_protocol::Kind::Http2(http) => {
Protocol::Http2(http::Http2::try_from(overrides, http)?)
}
proxy_protocol::Kind::Opaque(opaque) => Protocol::Opaque(opaque.try_into()?), proxy_protocol::Kind::Opaque(opaque) => Protocol::Opaque(opaque.try_into()?),
proxy_protocol::Kind::Grpc(grpc) => Protocol::Grpc(grpc.try_into()?), proxy_protocol::Kind::Grpc(grpc) => {
Protocol::Grpc(grpc::Grpc::try_from(overrides, grpc)?)
}
proxy_protocol::Kind::Tls(tls) => Protocol::Tls(tls.try_into()?), proxy_protocol::Kind::Tls(tls) => Protocol::Tls(tls.try_into()?),
}; };