feat(policy-controller): add grpc server metrics (#14122)

Our Go controllers provide grpc server metrics via
grpc-ecosystem/go-grpc-prometheus:

- grpc_server_started counter
- grpc_server_handled counter
- grpc_server_handling_seconds histogram
- grpc_server_msg_received counter
- grpc_server_msg_sent counter

This commit adds identical metrics to the policy controller's Rust gRPC server.

The handling time histogram is tuned for the client's default configuration: buckets at 0.1s, 1s, 300s (the 5m idle timeout), and 3600s (the 1h max lifetime).
Oliver Gould, 2025-06-16 08:54:57 -07:00 (committed by GitHub)
parent 7e5943a294
commit 334aecd0bb
7 changed files with 413 additions and 66 deletions
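
For reference, these families are registered under the controller's "grpc_server" registry prefix (see the main.rs hunk below). Assuming prometheus-client's default naming conventions (counters gain a _total suffix and the histogram's seconds unit is appended to its name), the exported series should look roughly like the following; this listing is illustrative, not copied from a live scrape:

    grpc_server_started_total        {grpc_service, grpc_method, grpc_type}
    grpc_server_msg_received_total   {grpc_service, grpc_method, grpc_type}
    grpc_server_msg_sent_total       {grpc_service, grpc_method, grpc_type}
    grpc_server_handled_total        {grpc_service, grpc_method, grpc_type, grpc_code}
    grpc_server_handling_seconds     {grpc_service, grpc_method, grpc_type}  (histogram: _bucket/_sum/_count)

grpc_type is "unary" for GetPort/Get and "server_stream" for WatchPort/Watch; grpc_code uses the upper-case gRPC code names (OK, NOT_FOUND, INTERNAL, ...).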

----------------------------------------------------------------------

@@ -1364,6 +1364,7 @@ dependencies = [
  "linkerd-policy-controller-core",
  "linkerd2-proxy-api",
  "maplit",
+ "prometheus-client",
  "prost-types",
  "serde",
  "serde_json",

----------------------------------------------------------------------

@@ -13,10 +13,11 @@ futures = { version = "0.3", default-features = false }
 http = { workspace = true }
 hyper = { workspace = true, features = ["http2", "server"] }
 maplit = "1"
+prometheus-client = { workspace = true }
 prost-types = "0.13"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
-tokio = { version = "1", features = ["macros"] }
+tokio = { version = "1", features = ["macros", "time"] }
 tonic = { workspace = true }
 tracing = "0.1"

----------------------------------------------------------------------

@@ -1,4 +1,7 @@
-use crate::workload::Workload;
+use crate::{
+    metrics::{self, GrpcServerMetricsFamily, GrpcServerRPCMetrics},
+    workload::Workload,
+};
 use futures::prelude::*;
 use linkerd2_proxy_api::{
     self as api,
@@ -27,6 +30,8 @@ pub struct InboundPolicyServer<T> {
     discover: T,
     drain: drain::Watch,
     cluster_networks: Arc<[IpNet]>,
+    get_metrics: GrpcServerRPCMetrics,
+    watch_metrics: GrpcServerRPCMetrics,
 }

 // === impl InboundPolicyServer ===
@@ -35,11 +40,22 @@ impl<T> InboundPolicyServer<T>
 where
     T: DiscoverInboundServer<(Workload, NonZeroU16)> + Send + Sync + 'static,
 {
-    pub fn new(discover: T, cluster_networks: Vec<IpNet>, drain: drain::Watch) -> Self {
+    pub fn new(
+        discover: T,
+        cluster_networks: Vec<IpNet>,
+        drain: drain::Watch,
+        metrics: GrpcServerMetricsFamily,
+    ) -> Self {
+        const SERVICE: &str = "io.linkerd.proxy.inbound.InboundServerPolicies";
+        let get_metrics = metrics.unary_rpc(SERVICE, "GetPort");
+        let watch_metrics = metrics.server_stream_rpc(SERVICE, "WatchPort");
         Self {
             discover,
             drain,
             cluster_networks: cluster_networks.into(),
+            get_metrics,
+            watch_metrics,
         }
     }
@@ -70,18 +86,31 @@ where
         &self,
         req: tonic::Request<proto::PortSpec>,
     ) -> Result<tonic::Response<proto::Server>, tonic::Status> {
-        let target = self.check_target(req.into_inner())?;
-
-        // Lookup the configuration for an inbound port. If the pod hasn't (yet)
-        // been indexed, return a Not Found error.
-        let s = self
-            .discover
-            .get_inbound_server(target)
-            .await
-            .map_err(|e| tonic::Status::internal(format!("lookup failed: {}", e)))?
-            .ok_or_else(|| tonic::Status::not_found("unknown server"))?;
-
-        Ok(tonic::Response::new(to_server(&s, &self.cluster_networks)))
+        let metrics = self.get_metrics.start();
+        let target = match self.check_target(req.into_inner()) {
+            Ok(target) => target,
+            Err(status) => {
+                metrics.end(status.code());
+                return Err(status);
+            }
+        };
+
+        match self.discover.get_inbound_server(target).await {
+            Ok(Some(server)) => Ok(tonic::Response::new(to_server(
+                &server,
+                &self.cluster_networks,
+            ))),
+            Ok(None) => {
+                let status = tonic::Status::not_found("unknown server");
+                metrics.end(status.code());
+                Err(status)
+            }
+            Err(error) => {
+                let status = tonic::Status::internal(format!("lookup failed: {error}"));
+                metrics.end(status.code());
+                Err(status)
+            }
+        }
     }

     type WatchPortStream = BoxWatchStream;
@@ -90,19 +119,32 @@ where
         &self,
         req: tonic::Request<proto::PortSpec>,
     ) -> Result<tonic::Response<BoxWatchStream>, tonic::Status> {
-        let target = self.check_target(req.into_inner())?;
+        let metrics = self.watch_metrics.start();
+        let target = match self.check_target(req.into_inner()) {
+            Ok(target) => target,
+            Err(status) => {
+                metrics.end(status.code());
+                return Err(status);
+            }
+        };
+
         let drain = self.drain.clone();
-        let rx = self
-            .discover
-            .watch_inbound_server(target)
-            .await
-            .map_err(|e| tonic::Status::internal(format!("lookup failed: {}", e)))?
-            .ok_or_else(|| tonic::Status::not_found("unknown server"))?;
-
-        Ok(tonic::Response::new(response_stream(
-            drain,
-            rx,
-            self.cluster_networks.clone(),
-        )))
+        match self.discover.watch_inbound_server(target).await {
+            Ok(Some(rx)) => {
+                let stream = response_stream(drain, rx, self.cluster_networks.clone(), metrics);
+                Ok(tonic::Response::new(stream))
+            }
+            Ok(None) => {
+                let status = tonic::Status::not_found("unknown server");
+                metrics.end(status.code());
+                Err(status)
+            }
+            Err(error) => {
+                let status = tonic::Status::internal(format!("lookup failed: {error}"));
+                metrics.end(status.code());
+                Err(status)
+            }
+        }
     }
 }
@@ -113,6 +155,7 @@ fn response_stream(
     drain: drain::Watch,
     mut rx: InboundServerStream,
     cluster_networks: Arc<[IpNet]>,
+    metrics: metrics::ResponseObserver,
 ) -> BoxWatchStream {
     Box::pin(async_stream::try_stream! {
         tokio::pin! {
@@ -124,18 +167,19 @@ fn response_stream(
                 // When the port is updated with a new server, update the server watch.
                 res = rx.next() => match res {
                     Some(s) => {
+                        metrics.msg_sent();
                         yield to_server(&s, &cluster_networks);
                     }
-                    None => return,
+                    None => break,
                 },

                 // If the server starts shutting down, close the stream so that it doesn't hold the
                 // server open.
-                _ = (&mut shutdown) => {
-                    return;
-                }
+                _ = (&mut shutdown) => break,
             }
         }
+
+        metrics.end(tonic::Code::Ok);
     })
 }

----------------------------------------------------------------------

@@ -4,5 +4,6 @@
 mod routes;

 pub mod inbound;
+pub mod metrics;
 pub mod outbound;
 pub mod workload;

----------------------------------------------------------------------

@@ -0,0 +1,247 @@
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::{
    metrics::{counter::Counter, family::Family},
    registry::Registry,
};
use tokio::time;

#[derive(Clone, Debug)]
pub struct GrpcServerMetricsFamily {
    started: Family<Labels, Counter>,
    handling: Family<Labels, Histogram>,
    handled: Family<CodeLabels, Counter>,
    msg_received: Family<Labels, Counter>,
    msg_sent: Family<Labels, Counter>,
}

#[derive(Clone, Debug)]
pub(crate) struct GrpcServerRPCMetrics {
    started: Counter,
    msg_received: Counter,
    msg_sent: Counter,
    handling: Histogram,
    handled: Family<CodeLabels, Counter>,
    labels: Labels,
}

pub(crate) struct ResponseObserver {
    msg_sent: Counter,
    handled: Option<ResponseHandle>,
}

struct ResponseHandle {
    start: time::Instant,
    durations: Histogram,
    codes: Family<CodeLabels, Counter>,
    labels: Labels,
}

#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug)]
struct Labels {
    grpc_service: &'static str,
    grpc_method: &'static str,
    grpc_type: &'static str,
}

#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug)]
struct CodeLabels {
    grpc_service: &'static str,
    grpc_method: &'static str,
    grpc_type: &'static str,
    grpc_code: &'static str,
}

// === GrpcServerMetricsFamily ===

impl GrpcServerMetricsFamily {
    pub fn register(reg: &mut Registry) -> Self {
        let started = Family::<Labels, Counter>::default();
        reg.register(
            "started",
            "Total number of RPCs started on the server",
            started.clone(),
        );

        let msg_received = Family::<Labels, Counter>::default();
        reg.register(
            "msg_received",
            "Total number of RPC stream messages received on the server",
            msg_received.clone(),
        );

        let msg_sent = Family::<Labels, Counter>::default();
        reg.register(
            "msg_sent",
            "Total number of gRPC stream messages sent by the server",
            msg_sent.clone(),
        );

        let handled = Family::<CodeLabels, Counter>::default();
        reg.register(
            "handled",
            "Total number of RPCs completed on the server, regardless of success or failure",
            handled.clone(),
        );

        let handling = Family::<Labels, Histogram>::new_with_constructor(|| {
            // Our default client configuration has a 5m idle timeout and a 1h
            // max lifetime.
            Histogram::new([0.1, 1.0, 300.0, 3600.0])
        });
        reg.register_with_unit(
            "handling",
            "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server",
            prometheus_client::registry::Unit::Seconds,
            handling.clone(),
        );

        Self {
            started,
            msg_received,
            msg_sent,
            handled,
            handling,
        }
    }

    pub(crate) fn unary_rpc(
        &self,
        svc: &'static str,
        method: &'static str,
    ) -> GrpcServerRPCMetrics {
        self.rpc(svc, method, "unary")
    }

    pub(crate) fn server_stream_rpc(
        &self,
        svc: &'static str,
        method: &'static str,
    ) -> GrpcServerRPCMetrics {
        self.rpc(svc, method, "server_stream")
    }

    fn rpc(
        &self,
        grpc_service: &'static str,
        grpc_method: &'static str,
        grpc_type: &'static str,
    ) -> GrpcServerRPCMetrics {
        let labels = Labels {
            grpc_service,
            grpc_method,
            grpc_type,
        };
        GrpcServerRPCMetrics {
            started: self.started.get_or_create(&labels).clone(),
            msg_received: self.msg_received.get_or_create(&labels).clone(),
            msg_sent: self.msg_sent.get_or_create(&labels).clone(),
            handled: self.handled.clone(),
            handling: self.handling.get_or_create(&labels).clone(),
            labels,
        }
    }
}

// === GrpcServerRPCMetrics ===

impl GrpcServerRPCMetrics {
    pub(crate) fn start(&self) -> ResponseObserver {
        self.started.inc();

        // All of our interfaces are unary or server-streaming, so we can
        // assume that if we receive a request, we received a single message.
        self.msg_received.inc();

        let handled = {
            // Pre-register OK
            let _ = self.handled.get_or_create(&CodeLabels {
                grpc_service: self.labels.grpc_service,
                grpc_method: self.labels.grpc_method,
                grpc_type: self.labels.grpc_type,
                grpc_code: code_str(tonic::Code::Ok),
            });

            Some(ResponseHandle {
                start: time::Instant::now(),
                durations: self.handling.clone(),
                codes: self.handled.clone(),
                labels: self.labels.clone(),
            })
        };

        ResponseObserver {
            msg_sent: self.msg_sent.clone(),
            handled,
        }
    }
}

// === ResponseObserver ===

impl ResponseObserver {
    pub(crate) fn msg_sent(&self) {
        self.msg_sent.inc();
    }

    pub(crate) fn end(mut self, code: tonic::Code) {
        self.handled
            .take()
            .expect("handle must be set")
            .inc_end(code);
    }
}

impl Drop for ResponseObserver {
    fn drop(&mut self) {
        if let Some(inner) = self.handled.take() {
            inner.inc_end(tonic::Code::Ok);
        }
    }
}

// === ResponseHandle ===

impl ResponseHandle {
    #[inline]
    fn inc_end(self, code: tonic::Code) {
        let Self {
            start,
            durations,
            codes,
            labels,
        } = self;
        durations.observe(start.elapsed().as_secs_f64());
        codes
            .get_or_create(&CodeLabels {
                grpc_service: labels.grpc_service,
                grpc_method: labels.grpc_method,
                grpc_type: labels.grpc_type,
                grpc_code: code_str(code),
            })
            .inc();
    }
}

fn code_str(code: tonic::Code) -> &'static str {
    use tonic::Code::*;
    match code {
        Ok => "OK",
        Cancelled => "CANCELLED",
        Unknown => "UNKNOWN",
        InvalidArgument => "INVALID_ARGUMENT",
        DeadlineExceeded => "DEADLINE_EXCEEDED",
        NotFound => "NOT_FOUND",
        AlreadyExists => "ALREADY_EXISTS",
        PermissionDenied => "PERMISSION_DENIED",
        ResourceExhausted => "RESOURCE_EXHAUSTED",
        FailedPrecondition => "FAILED_PRECONDITION",
        Aborted => "ABORTED",
        OutOfRange => "OUT_OF_RANGE",
        Unimplemented => "UNIMPLEMENTED",
        Internal => "INTERNAL",
        Unavailable => "UNAVAILABLE",
        DataLoss => "DATA_LOSS",
        Unauthenticated => "UNAUTHENTICATED",
    }
}
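
The observer is intended to be created once per RPC; a minimal sketch of the call pattern (the caller below is hypothetical and only illustrates the lifecycle, it is not part of this commit):

    // Hypothetical in-crate caller sketching the observer lifecycle.
    fn sketch(rpc: &GrpcServerRPCMetrics) {
        let obs = rpc.start(); // increments `started` and `msg_received`
        obs.msg_sent(); // called once per streamed response message
        obs.end(tonic::Code::Ok); // observes latency, increments `handled` with grpc_code="OK"
    }

Because end() consumes the observer, each RPC records exactly one terminal code; if the observer is dropped without end() (for example, when a watch stream's future is dropped), the Drop impl records the RPC as OK.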

----------------------------------------------------------------------

@@ -1,5 +1,6 @@
 extern crate http as http_crate;

+use crate::metrics::{self, GrpcServerMetricsFamily, GrpcServerRPCMetrics};
 use crate::workload;
 use futures::{prelude::*, StreamExt};
 use http_crate::uri::Authority;
@@ -33,6 +34,8 @@ pub struct OutboundPolicyServer<T> {
     cluster_domain: Arc<str>,
     allow_l5d_request_headers: bool,
     drain: drain::Watch,
+    get_metrics: GrpcServerRPCMetrics,
+    watch_metrics: GrpcServerRPCMetrics,
 }

 impl<T> OutboundPolicyServer<T>
@@ -44,12 +47,19 @@ where
         cluster_domain: impl Into<Arc<str>>,
         allow_l5d_request_headers: bool,
         drain: drain::Watch,
+        metrics: GrpcServerMetricsFamily,
     ) -> Self {
+        const SERVICE: &str = "io.linkerd.proxy.outbound.OutboundPolicies";
+        let get_metrics = metrics.unary_rpc(SERVICE, "Get");
+        let watch_metrics = metrics.server_stream_rpc(SERVICE, "Watch");
         Self {
             index: discover,
             cluster_domain: cluster_domain.into(),
             allow_l5d_request_headers,
             drain,
+            get_metrics,
+            watch_metrics,
         }
     }
@@ -149,27 +159,36 @@ where
         &self,
         req: tonic::Request<outbound::TrafficSpec>,
     ) -> Result<tonic::Response<outbound::OutboundPolicy>, tonic::Status> {
-        let target = self.lookup(req.into_inner())?;
+        let metrics = self.get_metrics.start();
+        let target = match self.lookup(req.into_inner()) {
+            Ok(target) => target,
+            Err(status) => {
+                metrics.end(status.code());
+                return Err(status);
+            }
+        };

-        match target.clone() {
+        match target {
             OutboundDiscoverTarget::Resource(resource) => {
                 let original_dst = resource.original_dst();
-                let policy = self
-                    .index
-                    .get_outbound_policy(resource)
-                    .await
-                    .map_err(|error| {
-                        tonic::Status::internal(format!("failed to get outbound policy: {error}"))
-                    })?;
-
-                if let Some(policy) = policy {
-                    Ok(tonic::Response::new(to_proto(
+                match self.index.get_outbound_policy(resource).await {
+                    Ok(Some(policy)) => Ok(tonic::Response::new(to_proto(
                         policy,
                         self.allow_l5d_request_headers,
                         original_dst,
-                    )))
-                } else {
-                    Err(tonic::Status::not_found("No such policy"))
+                    ))),
+                    Ok(None) => {
+                        let status = tonic::Status::not_found("unknown target");
+                        metrics.end(status.code());
+                        return Err(status);
+                    }
+                    Err(error) => {
+                        let status = tonic::Status::internal(format!(
+                            "failed to get outbound policy: {error}"
+                        ));
+                        metrics.end(status.code());
+                        return Err(status);
+                    }
                 }
             }
@@ -185,23 +204,41 @@ where
         &self,
         req: tonic::Request<outbound::TrafficSpec>,
     ) -> Result<tonic::Response<BoxWatchStream>, tonic::Status> {
-        let target = self.lookup(req.into_inner())?;
-        let drain = self.drain.clone();
+        let metrics = self.watch_metrics.start();
+        let target = match self.lookup(req.into_inner()) {
+            Ok(target) => target,
+            Err(status) => {
+                metrics.end(status.code());
+                return Err(status);
+            }
+        };

-        match target.clone() {
+        let drain = self.drain.clone();
+        match target {
             OutboundDiscoverTarget::Resource(resource) => {
                 let original_dst = resource.original_dst();
-                let rx = self
-                    .index
-                    .watch_outbound_policy(resource)
-                    .await
-                    .map_err(|e| tonic::Status::internal(format!("lookup failed: {e}")))?
-                    .ok_or_else(|| tonic::Status::not_found("unknown server"))?;
+                let rx = match self.index.watch_outbound_policy(resource).await {
+                    Ok(Some(rx)) => rx,
+                    Ok(None) => {
+                        let status = tonic::Status::not_found("unknown target");
+                        metrics.end(status.code());
+                        return Err(status);
+                    }
+                    Err(error) => {
+                        let status = tonic::Status::internal(format!(
+                            "failed to get outbound policy: {error}"
+                        ));
+                        metrics.end(status.code());
+                        return Err(status);
+                    }
+                };
                 Ok(tonic::Response::new(response_stream(
                     drain,
                     rx,
                     self.allow_l5d_request_headers,
                     original_dst,
+                    metrics,
                 )))
             }
@@ -211,6 +248,7 @@ where
                     drain,
                     rx,
                     original_dst,
+                    metrics,
                 )))
             }
         }
@@ -226,6 +264,7 @@ fn response_stream(
     mut rx: OutboundPolicyStream,
     allow_l5d_request_headers: bool,
     original_dst: Option<SocketAddr>,
+    metrics: metrics::ResponseObserver,
 ) -> BoxWatchStream {
     Box::pin(async_stream::try_stream! {
         tokio::pin! {
@@ -237,18 +276,19 @@ fn response_stream(
                 // When the port is updated with a new server, update the server watch.
                 res = rx.next() => match res {
                     Some(policy) => {
+                        metrics.msg_sent();
                         yield to_proto(policy, allow_l5d_request_headers, original_dst);
                     }
-                    None => return,
+                    None => break,
                 },

                 // If the server starts shutting down, close the stream so that it doesn't hold the
                 // server open.
-                _ = &mut shutdown => {
-                    return;
-                }
+                _ = &mut shutdown => break,
             }
         }
+
+        metrics.end(tonic::Code::Ok);
     })
 }
@@ -256,6 +296,7 @@ fn external_stream(
     drain: drain::Watch,
     mut rx: ExternalPolicyStream,
     original_dst: SocketAddr,
+    metrics: metrics::ResponseObserver,
 ) -> BoxWatchStream {
     Box::pin(async_stream::try_stream! {
         tokio::pin! {
@@ -266,18 +307,19 @@
             tokio::select! {
                 res = rx.next() => match res {
                     Some(_) => {
+                        metrics.msg_sent();
                         yield fallback(original_dst);
                     }
-                    None => return,
+                    None => break,
                 },

                 // If the server starts shutting down, close the stream so that it doesn't hold the
                 // server open.
-                _ = &mut shutdown => {
-                    return;
-                }
+                _ = &mut shutdown => break,
             }
         }
+
+        metrics.end(tonic::Code::Ok);
     })
 }

----------------------------------------------------------------------

@@ -1,7 +1,7 @@
 use crate::{
     admission::Admission,
     core::IpNet,
-    grpc,
+    grpc::{self, metrics::GrpcServerMetricsFamily},
     index::{self, ports::parse_portset, ClusterInfo, DefaultPolicy},
     index_list::IndexList,
     k8s::{self, gateway, Client, Resource},
@@ -163,6 +163,9 @@ impl Args {
             inbound_index.clone(),
         );
         let rt_metrics = kubert::RuntimeMetrics::register(prom.sub_registry_with_prefix("kube"));
+        let grpc_metrics = grpc::metrics::GrpcServerMetricsFamily::register(
+            prom.sub_registry_with_prefix("grpc_server"),
+        );

         let mut runtime = kubert::Runtime::builder()
             .with_log(log_level, log_format)
@@ -369,6 +372,7 @@ impl Args {
             allow_l5d_request_headers,
             inbound_index,
             outbound_index,
+            grpc_metrics.clone(),
             runtime.shutdown_handle(),
         ));
@@ -413,6 +417,7 @@ impl std::str::FromStr for IpNets {
 }

 #[instrument(skip_all, fields(port = %addr.port()))]
+#[allow(clippy::too_many_arguments)]
 async fn grpc(
     addr: SocketAddr,
     cluster_domain: String,
@@ -420,12 +425,17 @@ async fn grpc(
     allow_l5d_request_headers: bool,
     inbound_index: index::inbound::SharedIndex,
     outbound_index: index::outbound::SharedIndex,
+    metrics: GrpcServerMetricsFamily,
     drain: drain::Watch,
 ) -> Result<()> {
     let inbound_discover = InboundDiscover::new(inbound_index);
-    let inbound_svc =
-        grpc::inbound::InboundPolicyServer::new(inbound_discover, cluster_networks, drain.clone())
-            .svc();
+    let inbound_svc = grpc::inbound::InboundPolicyServer::new(
+        inbound_discover,
+        cluster_networks,
+        drain.clone(),
+        metrics.clone(),
+    )
+    .svc();

     let outbound_discover = OutboundDiscover::new(outbound_index);
     let outbound_svc = grpc::outbound::OutboundPolicyServer::new(
@@ -433,6 +443,7 @@ async fn grpc(
         cluster_domain,
         allow_l5d_request_headers,
         drain.clone(),
+        metrics,
     )
     .svc();