diff --git a/Cargo.lock b/Cargo.lock index 46aa9d527..7b1d5475e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1364,6 +1364,7 @@ dependencies = [ "linkerd-policy-controller-core", "linkerd2-proxy-api", "maplit", + "prometheus-client", "prost-types", "serde", "serde_json", diff --git a/policy-controller/grpc/Cargo.toml b/policy-controller/grpc/Cargo.toml index 068236f2f..2f5d7c9a8 100644 --- a/policy-controller/grpc/Cargo.toml +++ b/policy-controller/grpc/Cargo.toml @@ -13,10 +13,11 @@ futures = { version = "0.3", default-features = false } http = { workspace = true } hyper = { workspace = true, features = ["http2", "server"] } maplit = "1" +prometheus-client = { workspace = true } prost-types = "0.13" serde = { version = "1", features = ["derive"] } serde_json = "1" -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["macros", "time"] } tonic = { workspace = true } tracing = "0.1" diff --git a/policy-controller/grpc/src/inbound.rs b/policy-controller/grpc/src/inbound.rs index 889418c74..c00315226 100644 --- a/policy-controller/grpc/src/inbound.rs +++ b/policy-controller/grpc/src/inbound.rs @@ -1,4 +1,7 @@ -use crate::workload::Workload; +use crate::{ + metrics::{self, GrpcServerMetricsFamily, GrpcServerRPCMetrics}, + workload::Workload, +}; use futures::prelude::*; use linkerd2_proxy_api::{ self as api, @@ -27,6 +30,8 @@ pub struct InboundPolicyServer { discover: T, drain: drain::Watch, cluster_networks: Arc<[IpNet]>, + get_metrics: GrpcServerRPCMetrics, + watch_metrics: GrpcServerRPCMetrics, } // === impl InboundPolicyServer === @@ -35,11 +40,22 @@ impl InboundPolicyServer where T: DiscoverInboundServer<(Workload, NonZeroU16)> + Send + Sync + 'static, { - pub fn new(discover: T, cluster_networks: Vec, drain: drain::Watch) -> Self { + pub fn new( + discover: T, + cluster_networks: Vec, + drain: drain::Watch, + metrics: GrpcServerMetricsFamily, + ) -> Self { + const SERVICE: &str = "io.linkerd.proxy.inbound.InboundServerPolicies"; + let get_metrics = metrics.unary_rpc(SERVICE, "GetPort"); + let watch_metrics = metrics.server_stream_rpc(SERVICE, "WatchPort"); + Self { discover, drain, cluster_networks: cluster_networks.into(), + get_metrics, + watch_metrics, } } @@ -70,18 +86,31 @@ where &self, req: tonic::Request, ) -> Result, tonic::Status> { - let target = self.check_target(req.into_inner())?; + let metrics = self.get_metrics.start(); + let target = match self.check_target(req.into_inner()) { + Ok(target) => target, + Err(status) => { + metrics.end(status.code()); + return Err(status); + } + }; - // Lookup the configuration for an inbound port. If the pod hasn't (yet) - // been indexed, return a Not Found error. - let s = self - .discover - .get_inbound_server(target) - .await - .map_err(|e| tonic::Status::internal(format!("lookup failed: {}", e)))? - .ok_or_else(|| tonic::Status::not_found("unknown server"))?; - - Ok(tonic::Response::new(to_server(&s, &self.cluster_networks))) + match self.discover.get_inbound_server(target).await { + Ok(Some(server)) => Ok(tonic::Response::new(to_server( + &server, + &self.cluster_networks, + ))), + Ok(None) => { + let status = tonic::Status::not_found("unknown server"); + metrics.end(status.code()); + Err(status) + } + Err(error) => { + let status = tonic::Status::internal(format!("lookup failed: {error}")); + metrics.end(status.code()); + Err(status) + } + } } type WatchPortStream = BoxWatchStream; @@ -90,19 +119,32 @@ where &self, req: tonic::Request, ) -> Result, tonic::Status> { - let target = self.check_target(req.into_inner())?; + let metrics = self.watch_metrics.start(); + let target = match self.check_target(req.into_inner()) { + Ok(target) => target, + Err(status) => { + metrics.end(status.code()); + return Err(status); + } + }; + let drain = self.drain.clone(); - let rx = self - .discover - .watch_inbound_server(target) - .await - .map_err(|e| tonic::Status::internal(format!("lookup failed: {}", e)))? - .ok_or_else(|| tonic::Status::not_found("unknown server"))?; - Ok(tonic::Response::new(response_stream( - drain, - rx, - self.cluster_networks.clone(), - ))) + match self.discover.watch_inbound_server(target).await { + Ok(Some(rx)) => { + let stream = response_stream(drain, rx, self.cluster_networks.clone(), metrics); + Ok(tonic::Response::new(stream)) + } + Ok(None) => { + let status = tonic::Status::not_found("unknown server"); + metrics.end(status.code()); + Err(status) + } + Err(error) => { + let status = tonic::Status::internal(format!("lookup failed: {error}")); + metrics.end(status.code()); + Err(status) + } + } } } @@ -113,6 +155,7 @@ fn response_stream( drain: drain::Watch, mut rx: InboundServerStream, cluster_networks: Arc<[IpNet]>, + metrics: metrics::ResponseObserver, ) -> BoxWatchStream { Box::pin(async_stream::try_stream! { tokio::pin! { @@ -124,18 +167,19 @@ fn response_stream( // When the port is updated with a new server, update the server watch. res = rx.next() => match res { Some(s) => { + metrics.msg_sent(); yield to_server(&s, &cluster_networks); } - None => return, + None => break, }, // If the server starts shutting down, close the stream so that it doesn't hold the // server open. - _ = (&mut shutdown) => { - return; - } + _ = (&mut shutdown) => break, } } + + metrics.end(tonic::Code::Ok); }) } diff --git a/policy-controller/grpc/src/lib.rs b/policy-controller/grpc/src/lib.rs index dd665b079..0f144873d 100644 --- a/policy-controller/grpc/src/lib.rs +++ b/policy-controller/grpc/src/lib.rs @@ -4,5 +4,6 @@ mod routes; pub mod inbound; +pub mod metrics; pub mod outbound; pub mod workload; diff --git a/policy-controller/grpc/src/metrics.rs b/policy-controller/grpc/src/metrics.rs new file mode 100644 index 000000000..406eecb56 --- /dev/null +++ b/policy-controller/grpc/src/metrics.rs @@ -0,0 +1,247 @@ +use prometheus_client::encoding::EncodeLabelSet; +use prometheus_client::metrics::histogram::Histogram; +use prometheus_client::{ + metrics::{counter::Counter, family::Family}, + registry::Registry, +}; +use tokio::time; + +#[derive(Clone, Debug)] +pub struct GrpcServerMetricsFamily { + started: Family, + handling: Family, + handled: Family, + msg_received: Family, + msg_sent: Family, +} + +#[derive(Clone, Debug)] +pub(crate) struct GrpcServerRPCMetrics { + started: Counter, + msg_received: Counter, + msg_sent: Counter, + handling: Histogram, + handled: Family, + labels: Labels, +} + +pub(crate) struct ResponseObserver { + msg_sent: Counter, + handled: Option, +} + +struct ResponseHandle { + start: time::Instant, + durations: Histogram, + codes: Family, + labels: Labels, +} + +#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug)] +struct Labels { + grpc_service: &'static str, + grpc_method: &'static str, + grpc_type: &'static str, +} + +#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug)] +struct CodeLabels { + grpc_service: &'static str, + grpc_method: &'static str, + grpc_type: &'static str, + grpc_code: &'static str, +} + +// === GrpcServerMetricsFamily === + +impl GrpcServerMetricsFamily { + pub fn register(reg: &mut Registry) -> Self { + let started = Family::::default(); + reg.register( + "started", + "Total number of RPCs started on the server", + started.clone(), + ); + + let msg_received = Family::::default(); + reg.register( + "msg_received", + "Total number of RPC stream messages received on the server", + msg_received.clone(), + ); + + let msg_sent = Family::::default(); + reg.register( + "msg_sent", + "Total number of gRPC stream messages sent by the server", + msg_sent.clone(), + ); + + let handled = Family::::default(); + reg.register( + "handled", + "Total number of RPCs completed on the server, regardless of success or failure", + handled.clone(), + ); + + let handling = Family::::new_with_constructor(|| { + // Our default client configuration has a 5m idle timeout and a 1h + // max lifetime. + Histogram::new([0.1, 1.0, 300.0, 3600.0]) + }); + reg.register_with_unit( + "handling", + "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server", + prometheus_client::registry::Unit::Seconds, + handling.clone(), + ); + + Self { + started, + msg_received, + msg_sent, + handled, + handling, + } + } + + pub(crate) fn unary_rpc( + &self, + svc: &'static str, + method: &'static str, + ) -> GrpcServerRPCMetrics { + self.rpc(svc, method, "unary") + } + + pub(crate) fn server_stream_rpc( + &self, + svc: &'static str, + method: &'static str, + ) -> GrpcServerRPCMetrics { + self.rpc(svc, method, "server_stream") + } + + fn rpc( + &self, + grpc_service: &'static str, + grpc_method: &'static str, + grpc_type: &'static str, + ) -> GrpcServerRPCMetrics { + let labels = Labels { + grpc_service, + grpc_method, + grpc_type, + }; + GrpcServerRPCMetrics { + started: self.started.get_or_create(&labels).clone(), + msg_received: self.msg_received.get_or_create(&labels).clone(), + msg_sent: self.msg_sent.get_or_create(&labels).clone(), + handled: self.handled.clone(), + handling: self.handling.get_or_create(&labels).clone(), + labels, + } + } +} + +// === GrpcServerRPCMetrics === + +impl GrpcServerRPCMetrics { + pub(crate) fn start(&self) -> ResponseObserver { + self.started.inc(); + + // All of our interfaces are unary or server-streaming, so we can + // assume that if we receive a request, we received a single message. + self.msg_received.inc(); + + let handled = { + // Pre-register OK + let _ = self.handled.get_or_create(&CodeLabels { + grpc_service: self.labels.grpc_service, + grpc_method: self.labels.grpc_method, + grpc_type: self.labels.grpc_type, + grpc_code: code_str(tonic::Code::Ok), + }); + + Some(ResponseHandle { + start: time::Instant::now(), + durations: self.handling.clone(), + codes: self.handled.clone(), + labels: self.labels.clone(), + }) + }; + + ResponseObserver { + msg_sent: self.msg_sent.clone(), + handled, + } + } +} + +// === ResponseObserver === + +impl ResponseObserver { + pub(crate) fn msg_sent(&self) { + self.msg_sent.inc(); + } + + pub(crate) fn end(mut self, code: tonic::Code) { + self.handled + .take() + .expect("handle must be set") + .inc_end(code); + } +} + +impl Drop for ResponseObserver { + fn drop(&mut self) { + if let Some(inner) = self.handled.take() { + inner.inc_end(tonic::Code::Ok); + } + } +} + +// === ResponseHandle === + +impl ResponseHandle { + #[inline] + fn inc_end(self, code: tonic::Code) { + let Self { + start, + durations, + codes, + labels, + } = self; + durations.observe(start.elapsed().as_secs_f64()); + codes + .get_or_create(&CodeLabels { + grpc_service: labels.grpc_service, + grpc_method: labels.grpc_method, + grpc_type: labels.grpc_type, + grpc_code: code_str(code), + }) + .inc(); + } +} + +fn code_str(code: tonic::Code) -> &'static str { + use tonic::Code::*; + match code { + Ok => "OK", + Cancelled => "CANCELLED", + Unknown => "UNKNOWN", + InvalidArgument => "INVALID_ARGUMENT", + DeadlineExceeded => "DEADLINE_EXCEEDED", + NotFound => "NOT_FOUND", + AlreadyExists => "ALREADY_EXISTS", + PermissionDenied => "PERMISSION_DENIED", + ResourceExhausted => "RESOURCE_EXHAUSTED", + FailedPrecondition => "FAILED_PRECONDITION", + Aborted => "ABORTED", + OutOfRange => "OUT_OF_RANGE", + Unimplemented => "UNIMPLEMENTED", + Internal => "INTERNAL", + Unavailable => "UNAVAILABLE", + DataLoss => "DATA_LOSS", + Unauthenticated => "UNAUTHENTICATED", + } +} diff --git a/policy-controller/grpc/src/outbound.rs b/policy-controller/grpc/src/outbound.rs index 8b375d2b8..53d004b44 100644 --- a/policy-controller/grpc/src/outbound.rs +++ b/policy-controller/grpc/src/outbound.rs @@ -1,5 +1,6 @@ extern crate http as http_crate; +use crate::metrics::{self, GrpcServerMetricsFamily, GrpcServerRPCMetrics}; use crate::workload; use futures::{prelude::*, StreamExt}; use http_crate::uri::Authority; @@ -33,6 +34,8 @@ pub struct OutboundPolicyServer { cluster_domain: Arc, allow_l5d_request_headers: bool, drain: drain::Watch, + get_metrics: GrpcServerRPCMetrics, + watch_metrics: GrpcServerRPCMetrics, } impl OutboundPolicyServer @@ -44,12 +47,19 @@ where cluster_domain: impl Into>, allow_l5d_request_headers: bool, drain: drain::Watch, + metrics: GrpcServerMetricsFamily, ) -> Self { + const SERVICE: &str = "io.linkerd.proxy.outbound.OutboundPolicies"; + let get_metrics = metrics.unary_rpc(SERVICE, "Get"); + let watch_metrics = metrics.server_stream_rpc(SERVICE, "Watch"); + Self { index: discover, cluster_domain: cluster_domain.into(), allow_l5d_request_headers, drain, + get_metrics, + watch_metrics, } } @@ -149,27 +159,36 @@ where &self, req: tonic::Request, ) -> Result, tonic::Status> { - let target = self.lookup(req.into_inner())?; + let metrics = self.get_metrics.start(); + let target = match self.lookup(req.into_inner()) { + Ok(target) => target, + Err(status) => { + metrics.end(status.code()); + return Err(status); + } + }; - match target.clone() { + match target { OutboundDiscoverTarget::Resource(resource) => { let original_dst = resource.original_dst(); - let policy = self - .index - .get_outbound_policy(resource) - .await - .map_err(|error| { - tonic::Status::internal(format!("failed to get outbound policy: {error}")) - })?; - - if let Some(policy) = policy { - Ok(tonic::Response::new(to_proto( + match self.index.get_outbound_policy(resource).await { + Ok(Some(policy)) => Ok(tonic::Response::new(to_proto( policy, self.allow_l5d_request_headers, original_dst, - ))) - } else { - Err(tonic::Status::not_found("No such policy")) + ))), + Ok(None) => { + let status = tonic::Status::not_found("unknown target"); + metrics.end(status.code()); + return Err(status); + } + Err(error) => { + let status = tonic::Status::internal(format!( + "failed to get outbound policy: {error}" + )); + metrics.end(status.code()); + return Err(status); + } } } @@ -185,23 +204,41 @@ where &self, req: tonic::Request, ) -> Result, tonic::Status> { - let target = self.lookup(req.into_inner())?; - let drain = self.drain.clone(); + let metrics = self.watch_metrics.start(); + let target = match self.lookup(req.into_inner()) { + Ok(target) => target, + Err(status) => { + metrics.end(status.code()); + return Err(status); + } + }; - match target.clone() { + let drain = self.drain.clone(); + match target { OutboundDiscoverTarget::Resource(resource) => { let original_dst = resource.original_dst(); - let rx = self - .index - .watch_outbound_policy(resource) - .await - .map_err(|e| tonic::Status::internal(format!("lookup failed: {e}")))? - .ok_or_else(|| tonic::Status::not_found("unknown server"))?; + let rx = match self.index.watch_outbound_policy(resource).await { + Ok(Some(rx)) => rx, + Ok(None) => { + let status = tonic::Status::not_found("unknown target"); + metrics.end(status.code()); + return Err(status); + } + Err(error) => { + let status = tonic::Status::internal(format!( + "failed to get outbound policy: {error}" + )); + metrics.end(status.code()); + return Err(status); + } + }; + Ok(tonic::Response::new(response_stream( drain, rx, self.allow_l5d_request_headers, original_dst, + metrics, ))) } @@ -211,6 +248,7 @@ where drain, rx, original_dst, + metrics, ))) } } @@ -226,6 +264,7 @@ fn response_stream( mut rx: OutboundPolicyStream, allow_l5d_request_headers: bool, original_dst: Option, + metrics: metrics::ResponseObserver, ) -> BoxWatchStream { Box::pin(async_stream::try_stream! { tokio::pin! { @@ -237,18 +276,19 @@ fn response_stream( // When the port is updated with a new server, update the server watch. res = rx.next() => match res { Some(policy) => { + metrics.msg_sent(); yield to_proto(policy, allow_l5d_request_headers, original_dst); } - None => return, + None => break, }, // If the server starts shutting down, close the stream so that it doesn't hold the // server open. - _ = &mut shutdown => { - return; - } + _ = &mut shutdown => break, } } + + metrics.end(tonic::Code::Ok); }) } @@ -256,6 +296,7 @@ fn external_stream( drain: drain::Watch, mut rx: ExternalPolicyStream, original_dst: SocketAddr, + metrics: metrics::ResponseObserver, ) -> BoxWatchStream { Box::pin(async_stream::try_stream! { tokio::pin! { @@ -266,18 +307,19 @@ fn external_stream( tokio::select! { res = rx.next() => match res { Some(_) => { + metrics.msg_sent(); yield fallback(original_dst); } - None => return, + None => break, }, // If the server starts shutting down, close the stream so that it doesn't hold the // server open. - _ = &mut shutdown => { - return; - } + _ = &mut shutdown => break, } } + + metrics.end(tonic::Code::Ok); }) } diff --git a/policy-controller/runtime/src/args.rs b/policy-controller/runtime/src/args.rs index b55da0896..6785eecb3 100644 --- a/policy-controller/runtime/src/args.rs +++ b/policy-controller/runtime/src/args.rs @@ -1,7 +1,7 @@ use crate::{ admission::Admission, core::IpNet, - grpc, + grpc::{self, metrics::GrpcServerMetricsFamily}, index::{self, ports::parse_portset, ClusterInfo, DefaultPolicy}, index_list::IndexList, k8s::{self, gateway, Client, Resource}, @@ -163,6 +163,9 @@ impl Args { inbound_index.clone(), ); let rt_metrics = kubert::RuntimeMetrics::register(prom.sub_registry_with_prefix("kube")); + let grpc_metrics = grpc::metrics::GrpcServerMetricsFamily::register( + prom.sub_registry_with_prefix("grpc_server"), + ); let mut runtime = kubert::Runtime::builder() .with_log(log_level, log_format) @@ -369,6 +372,7 @@ impl Args { allow_l5d_request_headers, inbound_index, outbound_index, + grpc_metrics.clone(), runtime.shutdown_handle(), )); @@ -413,6 +417,7 @@ impl std::str::FromStr for IpNets { } #[instrument(skip_all, fields(port = %addr.port()))] +#[allow(clippy::too_many_arguments)] async fn grpc( addr: SocketAddr, cluster_domain: String, @@ -420,12 +425,17 @@ async fn grpc( allow_l5d_request_headers: bool, inbound_index: index::inbound::SharedIndex, outbound_index: index::outbound::SharedIndex, + metrics: GrpcServerMetricsFamily, drain: drain::Watch, ) -> Result<()> { let inbound_discover = InboundDiscover::new(inbound_index); - let inbound_svc = - grpc::inbound::InboundPolicyServer::new(inbound_discover, cluster_networks, drain.clone()) - .svc(); + let inbound_svc = grpc::inbound::InboundPolicyServer::new( + inbound_discover, + cluster_networks, + drain.clone(), + metrics.clone(), + ) + .svc(); let outbound_discover = OutboundDiscover::new(outbound_index); let outbound_svc = grpc::outbound::OutboundPolicyServer::new( @@ -433,6 +443,7 @@ async fn grpc( cluster_domain, allow_l5d_request_headers, drain.clone(), + metrics, ) .svc();