Merge branch 'ver/route-reqs-mtx'

This commit is contained in:
Oliver Gould 2023-04-04 21:37:37 +00:00
commit fdd2128ff6
No known key found for this signature in database
11 changed files with 285 additions and 30 deletions

View File

@ -1032,6 +1032,7 @@ dependencies = [
name = "linkerd-app-outbound"
version = "0.1.0"
dependencies = [
"ahash",
"bytes",
"futures",
"http",

View File

@ -16,6 +16,7 @@ test-subscriber = []
test-util = ["linkerd-app-test", "linkerd-meshtls-rustls/test-util"]
[dependencies]
ahash = "0.8"
bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }

View File

@ -162,7 +162,8 @@ where
S::Future: Send,
{
svc::layer::mk(move |concrete: N| {
let policy = svc::stack(concrete.clone()).push(policy::Policy::layer());
let policy = svc::stack(concrete.clone())
.push(policy::Policy::layer(metrics.http_route_backends.clone()));
let profile =
svc::stack(concrete.clone()).push(profile::Params::layer(metrics.proxy.clone()));
svc::stack(concrete)

View File

@ -8,7 +8,7 @@ mod router;
mod tests;
pub use self::{
route::errors,
route::{backend::RouteBackendMetrics, errors},
router::{GrpcParams, HttpParams},
};
pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual};
@ -45,11 +45,17 @@ where
// Parent target type.
T: Debug + Eq + Hash,
T: Clone + Send + Sync + 'static,
route::backend::ExtractMetrics:
svc::ExtractParam<route::backend::RequestCount, route::backend::Http<T>>,
// route::backend::ExtractMetrics:
// svc::ExtractParam<route::backend::RequestCount, route::backend::Grpc<T>>,
{
/// Builds a stack that dynamically updates and applies HTTP or gRPC policy
/// routing configurations to route requests over cached inner backend
/// services.
pub(super) fn layer<N, S>() -> impl svc::Layer<
pub(super) fn layer<N, S>(
route_backend_metrics: RouteBackendMetrics,
) -> impl svc::Layer<
N,
Service = svc::ArcNewService<
Self,
@ -74,8 +80,9 @@ where
S::Future: Send,
{
svc::layer::mk(move |inner: N| {
let http = svc::stack(inner.clone()).push(router::Http::layer());
let grpc = svc::stack(inner).push(router::Grpc::layer());
let http =
svc::stack(inner.clone()).push(router::Http::layer(route_backend_metrics.clone()));
let grpc = svc::stack(inner).push(router::Grpc::layer(route_backend_metrics.clone()));
http.push_switch(
|pp: Policy<T>| {

View File

@ -1,4 +1,6 @@
use super::super::Concrete;
use crate::RouteRef;
use super::{super::Concrete, RouteBackendMetrics};
use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result};
use linkerd_distribute as distribute;
use linkerd_http_route as http_route;
@ -25,7 +27,7 @@ pub(crate) struct Matched<M, P> {
pub(crate) struct Route<T, F, E> {
pub(super) parent: T,
pub(super) addr: Addr,
pub(super) meta: Arc<policy::Meta>,
pub(super) route_ref: RouteRef,
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
pub(super) failure_policy: E,
@ -66,11 +68,14 @@ where
Self: filters::Apply,
Self: svc::Param<classify::Request>,
MatchedBackend<T, M, F>: filters::Apply,
backend::ExtractMetrics: svc::ExtractParam<backend::RequestCount, MatchedBackend<T, M, F>>,
{
/// Builds a route stack that applies policy filters to requests and
/// distributes requests over each route's backends. These [`Concrete`]
/// backends are expected to be cached/shared by the inner stack.
pub(crate) fn layer<N, S>() -> impl svc::Layer<
pub(crate) fn layer<N, S>(
backend_metrics: RouteBackendMetrics,
) -> impl svc::Layer<
N,
Service = svc::ArcNewService<
Self,
@ -98,7 +103,7 @@ where
svc::stack(inner)
// Distribute requests across route backends, applying policies
// and filters for each of the route-backends.
.push(MatchedBackend::layer())
.push(MatchedBackend::layer(backend_metrics.clone()))
.lift_new_with_target()
.push(NewDistribute::layer())
// The router does not take the backend's availability into

View File

@ -1,11 +1,19 @@
use crate::RouteRef;
use super::{super::Concrete, filters};
use linkerd_app_core::{proxy::http, svc, Error, Result};
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};
mod count_reqs;
mod metrics;
pub use self::{count_reqs::RequestCount, metrics::RouteBackendMetrics};
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct Backend<T, F> {
pub(crate) route_ref: RouteRef,
pub(crate) concrete: Concrete<T>,
pub(crate) filters: Arc<[F]>,
}
@ -16,11 +24,17 @@ pub(crate) type Http<T> =
pub(crate) type Grpc<T> =
MatchedBackend<T, http_route::grpc::r#match::RouteMatch, policy::grpc::Filter>;
#[derive(Clone, Debug)]
pub struct ExtractMetrics {
metrics: RouteBackendMetrics,
}
// === impl Backend ===
impl<T: Clone, F> Clone for Backend<T, F> {
fn clone(&self) -> Self {
Self {
route_ref: self.route_ref.clone(),
filters: self.filters.clone(),
concrete: self.concrete.clone(),
}
@ -51,13 +65,16 @@ where
F: Clone + Send + Sync + 'static,
// Assert that filters can be applied.
Self: filters::Apply,
ExtractMetrics: svc::ExtractParam<RequestCount, Self>,
{
/// Builds a stack that applies per-route-backend policy filters over an
/// inner [`Concrete`] stack.
///
/// This [`MatchedBackend`] must implement [`filters::Apply`] to apply these
/// filters.
pub(crate) fn layer<N, S>() -> impl svc::Layer<
pub(crate) fn layer<N, S>(
metrics: RouteBackendMetrics,
) -> impl svc::Layer<
N,
Service = svc::ArcNewService<
Self,
@ -90,6 +107,9 @@ where
}| concrete,
)
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.push(count_reqs::NewCountRequests::layer_via(ExtractMetrics {
metrics: metrics.clone(),
}))
.push(svc::ArcNewService::layer())
.into_inner()
})
@ -109,3 +129,23 @@ impl<T> filters::Apply for Grpc<T> {
filters::apply_grpc(&self.r#match, &self.params.filters, req)
}
}
impl<T> svc::ExtractParam<RequestCount, Http<T>> for ExtractMetrics {
fn extract_param(&self, params: &Http<T>) -> RequestCount {
RequestCount(self.metrics.http_requests_total(
params.params.concrete.parent_ref.clone(),
params.params.route_ref.clone(),
params.params.concrete.backend_ref.clone(),
))
}
}
impl<T> svc::ExtractParam<RequestCount, Grpc<T>> for ExtractMetrics {
fn extract_param(&self, params: &Grpc<T>) -> RequestCount {
RequestCount(self.metrics.grpc_requests_total(
params.params.concrete.parent_ref.clone(),
params.params.route_ref.clone(),
params.params.concrete.backend_ref.clone(),
))
}
}

View File

@ -0,0 +1,73 @@
use linkerd_app_core::{metrics::Counter, svc};
use std::{
sync::Arc,
task::{Context, Poll},
};
#[derive(Clone, Debug, Default)]
pub struct RequestCount(pub Arc<Counter>);
#[derive(Clone, Debug)]
pub struct NewCountRequests<X, N> {
inner: N,
extract: X,
}
#[derive(Clone, Debug)]
pub struct CountRequests<S> {
inner: S,
requests: Arc<Counter>,
}
// === impl NewCountRequests ===
impl<X: Clone, N> NewCountRequests<X, N> {
pub fn new(extract: X, inner: N) -> Self {
Self { extract, inner }
}
pub fn layer_via(extract: X) -> impl svc::Layer<N, Service = Self> + Clone {
svc::layer::mk(move |inner| Self::new(extract.clone(), inner))
}
}
impl<T, X, N> svc::NewService<T> for NewCountRequests<X, N>
where
X: svc::ExtractParam<RequestCount, T>,
N: svc::NewService<T>,
{
type Service = CountRequests<N::Service>;
fn new_service(&self, target: T) -> Self::Service {
let RequestCount(counter) = self.extract.extract_param(&target);
let inner = self.inner.new_service(target);
CountRequests::new(counter, inner)
}
}
// === impl CountRequests ===
impl<S> CountRequests<S> {
fn new(requests: Arc<Counter>, inner: S) -> Self {
Self { requests, inner }
}
}
impl<B, S> svc::Service<http::Request<B>> for CountRequests<S>
where
S: svc::Service<http::Request<B>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
self.requests.incr();
self.inner.call(req)
}
}

View File

@ -0,0 +1,94 @@
use ahash::AHashMap;
use linkerd_app_core::metrics::{metrics, Counter, FmtLabels, FmtMetrics};
use linkerd_proxy_client_policy as policy;
use parking_lot::Mutex;
use std::{fmt::Write, sync::Arc};
use crate::{BackendRef, ParentRef, RouteRef};
metrics! {
outbound_http_route_backend_requests_total: Counter {
"The total number of outbound requests dispatched to a HTTP route backend"
},
outbound_grpc_route_backend_requests_total: Counter {
"The total number of outbound requests dispatched to a gRPC route backend"
}
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct Labels(ParentRef, RouteRef, BackendRef);
#[derive(Clone, Debug, Default)]
pub struct RouteBackendMetrics {
http: Arc<Mutex<AHashMap<Labels, Arc<Counter>>>>,
grpc: Arc<Mutex<AHashMap<Labels, Arc<Counter>>>>,
}
// === impl RouteBackendMetrics ===
impl RouteBackendMetrics {
pub fn http_requests_total(&self, pr: ParentRef, rr: RouteRef, br: BackendRef) -> Arc<Counter> {
self.http
.lock()
.entry(Labels(pr, rr, br))
.or_default()
.clone()
}
pub fn grpc_requests_total(&self, pr: ParentRef, rr: RouteRef, br: BackendRef) -> Arc<Counter> {
self.grpc
.lock()
.entry(Labels(pr, rr, br))
.or_default()
.clone()
}
}
impl FmtMetrics for RouteBackendMetrics {
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let http = self.http.lock();
if !http.is_empty() {
outbound_http_route_backend_requests_total.fmt_help(f)?;
outbound_http_route_backend_requests_total.fmt_scopes(f, http.iter(), |c| c)?;
}
drop(http);
let grpc = self.grpc.lock();
if !grpc.is_empty() {
outbound_grpc_route_backend_requests_total.fmt_help(f)?;
outbound_grpc_route_backend_requests_total.fmt_scopes(f, grpc.iter(), |c| c)?;
}
drop(grpc);
Ok(())
}
}
impl FmtLabels for Labels {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self(parent, route, backend) = self;
Self::write_meta("parent", parent, f)?;
f.write_char(',')?;
Self::write_meta("route", route, f)?;
f.write_char(',')?;
Self::write_meta("backend", backend, f)?;
Ok(())
}
}
impl Labels {
fn write_meta(
scope: &str,
meta: &policy::Meta,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
write!(f, "{scope}_group=\"{}\"", meta.group())?;
write!(f, ",{scope}_kind=\"{}\"", meta.kind())?;
write!(f, ",{scope}_namespace=\"{}\"", meta.namespace())?;
write!(f, ",{scope}_name=\"{}\"", meta.name())?;
Ok(())
}
}

View File

@ -1,8 +1,8 @@
use super::{
super::{concrete, Concrete, LogicalAddr, NoRoute},
route::{self},
route, RouteBackendMetrics,
};
use crate::{BackendRef, EndpointRef, ParentRef};
use crate::{BackendRef, EndpointRef, ParentRef, RouteRef};
use linkerd_app_core::{
classify, proxy::http, svc, transport::addrs::*, Addr, Error, NameAddr, Result,
};
@ -64,10 +64,14 @@ where
>,
route::MatchedRoute<T, M::Summary, F, E>: route::filters::Apply + svc::Param<classify::Request>,
route::MatchedBackend<T, M::Summary, F>: route::filters::Apply,
route::backend::ExtractMetrics:
svc::ExtractParam<route::backend::RequestCount, route::MatchedBackend<T, M::Summary, F>>,
{
/// Builds a stack that applies routes to distribute requests over a cached
/// set of inner services so that.
pub(super) fn layer<N, S>() -> impl svc::Layer<
pub(super) fn layer<N, S>(
route_backend_metrics: RouteBackendMetrics,
) -> impl svc::Layer<
N,
Service = svc::ArcNewService<
Self,
@ -99,7 +103,7 @@ where
.push(NewBackendCache::layer())
// Lazily cache a service for each `RouteParams` returned from the
// `SelectRoute` impl.
.push_on_service(route::MatchedRoute::layer())
.push_on_service(route::MatchedRoute::layer(route_backend_metrics.clone()))
.push(svc::NewOneshotRoute::<Self, (), _>::layer_cached())
.push(svc::ArcNewService::layer())
.into_inner()
@ -168,27 +172,28 @@ where
),
};
let mk_route_backend = {
let mk_dispatch = mk_dispatch.clone();
move |rb: &policy::RouteBackend<F>| {
let filters = rb.filters.clone();
let concrete = mk_dispatch(&rb.backend);
route::Backend { filters, concrete }
let mk_route_backend = |route_ref: &RouteRef, rb: &policy::RouteBackend<F>| {
let filters = rb.filters.clone();
let concrete = mk_dispatch(&rb.backend);
route::Backend {
route_ref: route_ref.clone(),
filters,
concrete,
}
};
let mk_distribution = |d: &policy::RouteDistribution<F>| match d {
let mk_distribution = |route_ref: &RouteRef, d: &policy::RouteDistribution<F>| match d {
policy::RouteDistribution::Empty => route::BackendDistribution::Empty,
policy::RouteDistribution::FirstAvailable(backends) => {
route::BackendDistribution::first_available(
backends.iter().map(|b| mk_route_backend(b)),
backends.iter().map(|b| mk_route_backend(route_ref, b)),
)
}
policy::RouteDistribution::RandomAvailable(backends) => {
route::BackendDistribution::random_available(
backends
.iter()
.map(|(rb, weight)| (mk_route_backend(rb), *weight)),
.map(|(rb, weight)| (mk_route_backend(route_ref, rb), *weight)),
)
.expect("distribution must be valid")
}
@ -200,11 +205,12 @@ where
distribution,
failure_policy,
}| {
let distribution = mk_distribution(&distribution);
let route_ref = RouteRef(meta);
let distribution = mk_distribution(&route_ref, &distribution);
route::Route {
route_ref,
addr: addr.clone(),
parent: parent.clone(),
meta,
filters,
failure_policy,
distribution,
@ -248,7 +254,13 @@ where
fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Self::Error> {
tracing::trace!(uri = ?req.uri(), headers = ?req.headers(), "Selecting HTTP route");
let (r#match, params) = policy::http::find(&*self.routes, req).ok_or(NoRoute)?;
tracing::debug!(meta = ?params.meta, "Selected route");
tracing::debug!(
group = %params.route_ref.group(),
kind = %params.route_ref.kind(),
ns = %params.route_ref.namespace(),
name = %params.route_ref.name(),
"Selected route",
);
tracing::trace!(?r#match);
Ok(route::Matched {
r#match,
@ -267,7 +279,7 @@ where
fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Self::Error> {
tracing::trace!(uri = ?req.uri(), headers = ?req.headers(), "Selecting gRPC route");
let (r#match, params) = policy::grpc::find(&*self.routes, req).ok_or(NoRoute)?;
tracing::debug!(meta = ?params.meta, "Selected route");
tracing::debug!(meta = ?params.route_ref, "Selected route");
tracing::trace!(?r#match);
Ok(route::Matched {
r#match,

View File

@ -81,7 +81,8 @@ async fn header_based_route() {
}
});
let router = Policy::layer()
let metrics = RouteBackendMetrics::default();
let router = Policy::layer(metrics.clone())
.layer(inner)
.new_service(Policy::from((routes, ())));
@ -114,6 +115,20 @@ async fn header_based_route() {
// Hold the router to prevent inner services from being dropped.
drop(router);
let report = linkerd_app_core::metrics::FmtMetrics::as_display(&metrics).to_string();
let mut lines = report
.lines()
.filter(|l| !l.starts_with('#'))
.collect::<Vec<_>>();
lines.sort();
assert_eq!(
lines,
vec![
r#"outbound_http_route_backend_requests_total{parent_group="",parent_kind="default",parent_namespace="",parent_name="parent",route_group="",route_kind="default",route_namespace="",route_name="default",backend_group="",backend_kind="default",backend_namespace="",backend_name="default"} 1"#,
r#"outbound_http_route_backend_requests_total{parent_group="",parent_kind="default",parent_namespace="",parent_name="parent",route_group="",route_kind="default",route_namespace="",route_name="special",backend_group="",backend_kind="default",backend_namespace="",backend_name="special"} 1"#,
]
);
}
#[tokio::test(flavor = "current_thread")]
@ -175,7 +190,7 @@ async fn http_filter_request_headers() {
}
});
let router = Policy::layer()
let router = Policy::layer(Default::default())
.layer(inner)
.new_service(Policy::from((routes, ())));

View File

@ -8,6 +8,8 @@
//! to be updated frequently or in a performance-critical area. We should probably look to use
//! `DashMap` as we migrate other metrics registries.
use crate::http::policy::RouteBackendMetrics;
pub(crate) mod error;
pub use linkerd_app_core::metrics::*;
@ -18,6 +20,8 @@ pub struct OutboundMetrics {
pub(crate) http_errors: error::Http,
pub(crate) tcp_errors: error::Tcp,
pub(crate) http_route_backends: RouteBackendMetrics,
/// Holds metrics that are common to both inbound and outbound proxies. These metrics are
/// reported separately
pub(crate) proxy: Proxy,
@ -26,15 +30,17 @@ pub struct OutboundMetrics {
impl OutboundMetrics {
pub(crate) fn new(proxy: Proxy) -> Self {
Self {
proxy,
http_errors: error::Http::default(),
tcp_errors: error::Tcp::default(),
proxy,
http_route_backends: RouteBackendMetrics::default(),
}
}
}
impl FmtMetrics for OutboundMetrics {
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.http_route_backends.fmt_metrics(f)?;
self.http_errors.fmt_metrics(f)?;
self.tcp_errors.fmt_metrics(f)?;