Compare commits

...

4 Commits

Author SHA1 Message Date
Alex Leong 7d8a46b56e
v2.210.3 (#2522)
* Include server address in server error logs (#2500)
* inbound: Fix gRPC response classification (#2496)
* Bump ahash to v0.8.5 (#2498)
* Allow BSD-2-Clause

---------

Co-authored-by: Oliver Gould <ver@buoyant.io>
2023-11-16 12:36:40 -08:00
Oliver Gould d478c2e253 Log a warning when the controller clients receive an error (#2499)
The controller client includes a recovery/backoff module that causes
resolutions to be retried when an unexpected error is encountered.
These events are only logged at debugging and trace log levels.

This change updates the destination and policy controller recovery
modules to log unexpected errors as warnings.
2023-11-06 14:27:25 -08:00
Alex Leong dd47e38a2c Render grpc_status metric label as number (#2480)
Fixes https://github.com/linkerd/linkerd2/issues/11449

The `grpc_status` metric label is rendered as a long form, human readable string value in the proxy metrics.  For example:

```
response_total{direction="outbound", [...], classification="failure",grpc_status="Unknown error",error=""} 1
```

This is because of the Display impl for Code.  We explicitly convert to an i32 so this renders as a number instead:

```
response_total{direction="outbound", [...] ,classification="failure",grpc_status="2",error=""} 1
```

Signed-off-by: Alex Leong <alex@buoyant.io>
2023-10-26 11:11:05 -07:00
Oliver Gould 8b650a9e2f balance: Log and fail stuck discovery streams. (#2484)
In 6d2abbc, we changed how outbound proxies process discovery updates.
The prior implementation used a watchdog timeout to bound the amount of
time an update stream could be full. With that change, when an update
channel fills, the backpressure can extend to the destination
controller's gRPC response stream.

To detect and avoid this harmful (and useless) backpressure, this change
modifies the balancer's discovery processing stream to exit when the
balancer has 1000 unprocessed discovery updates. A sufficiently scary
warning is logged.
2023-10-26 11:11:05 -07:00
28 changed files with 394 additions and 74 deletions

View File

@ -16,14 +16,15 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
[[package]]
name = "ahash"
version = "0.8.3"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
@ -1010,6 +1011,7 @@ dependencies = [
"linkerd-app-core",
"linkerd-app-test",
"linkerd-http-access-log",
"linkerd-http-metrics",
"linkerd-idle-cache",
"linkerd-io",
"linkerd-meshtls",
@ -3341,3 +3343,23 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [
"winapi",
]
[[package]]
name = "zerocopy"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a7af71d8643341260a65f89fa60c0eeaa907f34544d8f6d9b0df72f069b5e74"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9731702e2f0617ad526794ae28fbc6f6ca8849b5ba729666c2a5bc4b6ddee2cd"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.12",
]

View File

@ -17,6 +17,7 @@ ignore = []
unlicensed = "deny"
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"ISC",
"MIT",
@ -27,13 +28,21 @@ allow-osi-fsf-free = "neither"
default = "deny"
confidence-threshold = 0.8
exceptions = [
{ allow = ["Zlib"], name = "adler32", version = "*" },
{ allow = ["ISC", "MIT", "OpenSSL"], name = "ring", version = "*" },
{ allow = [
"Zlib",
], name = "adler32", version = "*" },
{ allow = [
"ISC",
"MIT",
"OpenSSL",
], name = "ring", version = "*" },
# The Unicode-DFS-2016 license is necessary for unicode-ident because they
# use data from the unicode tables to generate the tables which are
# included in the application. We do not distribute those data files so
# this is not a problem for us. See https://github.com/dtolnay/unicode-ident/pull/9/files
{ allow = ["Unicode-DFS-2016"], name = "unicode-ident", version = "*"},
{ allow = [
"Unicode-DFS-2016",
], name = "unicode-ident", version = "*" },
]
[[licenses.clarify]]

View File

@ -7,7 +7,9 @@ use linkerd_app_core::{
serve,
svc::{self, ExtractParam, InsertParam, Param},
tls, trace,
transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
transport::{
self, addrs::AddrPair, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr,
},
Error, Result,
};
use linkerd_app_inbound as inbound;
@ -84,7 +86,9 @@ impl Config {
where
R: FmtMetrics + Clone + Send + Sync + Unpin + 'static,
B: Bind<ServerConfig>,
B::Addrs: svc::Param<Remote<ClientAddr>> + svc::Param<Local<ServerAddr>>,
B::Addrs: svc::Param<Remote<ClientAddr>>,
B::Addrs: svc::Param<Local<ServerAddr>>,
B::Addrs: svc::Param<AddrPair>,
{
let (listen_addr, listen) = bind.bind(&self.server)?;
@ -95,6 +99,7 @@ impl Config {
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
let admin = svc::stack(move |_| admin.clone())
.push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, Permitted>())
.push(classify::NewClassify::layer_default())
.push_map_target(|(permit, http)| Permitted { permit, http })
.push(inbound::policy::NewHttpPolicy::layer(metrics.http_authz.clone()))
.push(Rescue::layer())
@ -201,6 +206,14 @@ impl Param<Remote<ClientAddr>> for Http {
}
}
impl Param<AddrPair> for Http {
fn param(&self) -> AddrPair {
let Remote(client) = self.tcp.client;
let Local(server) = self.tcp.addr;
AddrPair(client, server)
}
}
impl Param<tls::ConditionalServerTls> for Http {
fn param(&self) -> tls::ConditionalServerTls {
self.tcp.tls.clone()

View File

@ -124,6 +124,7 @@ impl Config {
.lift_new()
.push(self::balance::layer(dns, resolve_backoff))
.push(metrics.to_layer::<classify::Response, _, _>())
.push(classify::NewClassify::layer_default())
// This buffer allows a resolver client to be shared across stacks.
// No load shed is applied here, however, so backpressure may leak
// into the caller task.

View File

@ -422,7 +422,7 @@ impl FmtLabels for Class {
"classification=\"{}\",grpc_status=\"{}\",error=\"\"",
class(res.is_ok()),
match res {
Ok(code) | Err(code) => code,
Ok(code) | Err(code) => <i32>::from(*code),
}
),

View File

@ -1,11 +1,11 @@
use crate::{
io, is_caused_by,
svc::{self, Param},
transport::{ClientAddr, Remote},
Result,
};
use futures::prelude::*;
use linkerd_error::Error;
use linkerd_proxy_transport::AddrPair;
use tower::util::ServiceExt;
use tracing::{debug, debug_span, info, instrument::Instrument, warn};
@ -18,7 +18,7 @@ pub async fn serve<M, S, I, A>(
shutdown: impl Future,
) where
I: Send + 'static,
A: Param<Remote<ClientAddr>>,
A: Param<AddrPair>,
M: svc::NewService<A, Service = S>,
S: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static,
S::Error: Into<Error>,
@ -40,8 +40,8 @@ pub async fn serve<M, S, I, A>(
};
// The local addr should be instrumented from the listener's context.
let Remote(ClientAddr(client_addr)) = addrs.param();
let span = debug_span!("accept", client.addr = %client_addr).entered();
let AddrPair(client_addr, server_addr) = addrs.param();
let span = debug_span!("accept", client.addr = %client_addr, server.addr = %server_addr).entered();
let accept = new_accept.new_service(addrs);
// Dispatch all of the work for a given connection onto a
@ -57,10 +57,20 @@ pub async fn serve<M, S, I, A>(
{
Ok(()) => debug!("Connection closed"),
Err(reason) if is_caused_by::<std::io::Error>(&*reason) => {
debug!(%reason, "Connection closed")
debug!(
reason,
client.addr = %client_addr,
server.addr = %server_addr,
"Connection closed"
);
}
Err(error) => {
info!(error, client.addr = %client_addr, "Connection closed")
info!(
error,
client.addr = %client_addr,
server.addr = %server_addr,
"Connection closed"
);
}
}
// Hold the service until the connection is complete. This

View File

@ -52,6 +52,7 @@ libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] }
[dev-dependencies]
hyper = { version = "0.14", features = ["http1", "http2"] }
linkerd-app-test = { path = "../test" }
linkerd-http-metrics = { path = "../../http-metrics", features = ["test-util"] }
linkerd-idle-cache = { path = "../../idle-cache", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }

View File

@ -148,7 +148,8 @@ impl<C> Inbound<C> {
// Attempts to discover a service profile for each logical target (as
// informed by the request's headers). The stack is cached until a
// request has not been received for `cache_max_idle_age`.
let router = http.clone()
let router = http
.clone()
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.push_map_target(|p: Profile| p.logical)
.push(profiles::http::NewProxyRouter::layer(
@ -164,6 +165,7 @@ impl<C> Inbound<C> {
.to_layer::<classify::Response, _, _>(),
)
.push_on_service(http::BoxResponse::layer())
// Configure a per-route response classifier based on the profile.
.push(classify::NewClassify::layer())
.push_http_insert_target::<profiles::http::Route>()
.push_map_target(|(route, profile)| ProfileRoute { route, profile })
@ -186,10 +188,7 @@ impl<C> Inbound<C> {
}
Ok(svc::Either::B(logical))
},
http.clone()
.push_on_service(svc::MapErr::layer(Error::from))
.check_new_service::<Logical, http::Request<_>>()
.into_inner(),
http.clone().into_inner(),
)
.check_new_service::<(Option<profiles::Receiver>, Logical), http::Request<_>>();
@ -229,8 +228,7 @@ impl<C> Inbound<C> {
// Skip the profile stack if it takes too long to become ready.
.push_when_unready(config.profile_skip_timeout, http.into_inner())
.push_on_service(
svc::layers()
.push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical")))
rt.metrics.proxy.stack.layer(stack_labels("http", "logical")),
)
.push(svc::NewQueue::layer_via(config.http_request_queue))
.push_new_idle_cached(config.discovery_idle_timeout)
@ -239,6 +237,9 @@ impl<C> Inbound<C> {
.push(http::Retain::layer())
.push(http::BoxResponse::layer()),
)
// Configure default response classification early. It may be
// overridden by profile routes above.
.push(classify::NewClassify::layer_default())
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.instrument(|t: &Logical| {
let name = t.logical.as_ref().map(tracing::field::display);
@ -414,12 +415,6 @@ impl Param<metrics::EndpointLabels> for Logical {
}
}
impl Param<classify::Request> for Logical {
fn param(&self) -> classify::Request {
classify::Request::default()
}
}
impl tap::Inspect for Logical {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
req.extensions().get::<Remote<ClientAddr>>().map(|a| **a)

View File

@ -6,10 +6,11 @@ use crate::{
},
Config, Inbound,
};
use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response};
use hyper::{body::HttpBody, client::conn::Builder as ClientBuilder, Body, Request, Response};
use linkerd_app_core::{
classify,
errors::respond::L5D_PROXY_ERROR,
identity, io,
identity, io, metrics,
proxy::http,
svc::{self, NewService, Param},
tls,
@ -19,6 +20,7 @@ use linkerd_app_core::{
use linkerd_app_test::connect::ConnectFuture;
use linkerd_tracing::test::trace_init;
use std::{net::SocketAddr, sync::Arc};
use tokio::time;
use tracing::Instrument;
fn build_server<I>(
@ -469,6 +471,84 @@ async fn grpc_unmeshed_response_error_header() {
let _ = bg.await;
}
#[tokio::test(flavor = "current_thread")]
async fn grpc_response_class() {
let _trace = trace_init();
// Build a mock connector that serves a gRPC server that returns errors.
let connect = {
let mut server = hyper::server::conn::Http::new();
server.http2_only(true);
support::connect().endpoint_fn_boxed(
Target::addr(),
grpc_status_server(server, tonic::Code::Unknown),
)
};
// Build a client using the connect that always errors.
let mut client = ClientBuilder::new();
client.http2_only(true);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let metrics = rt
.metrics
.clone()
.http_endpoint
.into_report(time::Duration::from_secs(3600));
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;
// Send a request and assert that it is OK with the expected header
// message.
let req = Request::builder()
.method(http::Method::POST)
.uri("http://foo.svc.cluster.local:5550")
.header(http::header::CONTENT_TYPE, "application/grpc")
.body(Body::default())
.unwrap();
let mut response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::OK);
response.body_mut().data().await;
let trls = response.body_mut().trailers().await.unwrap().unwrap();
assert_eq!(trls.get("grpc-status").unwrap().to_str().unwrap(), "2");
let response_total = metrics
.get_response_total(
&metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels {
tls: Target::meshed_h2().1,
authority: Some("foo.svc.cluster.local:5550".parse().unwrap()),
target_addr: "127.0.0.1:80".parse().unwrap(),
policy: metrics::RouteAuthzLabels {
route: metrics::RouteLabels {
server: metrics::ServerLabel(Arc::new(policy::Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
name: "testsrv".into(),
})),
route: policy::Meta::new_default("default"),
},
authz: Arc::new(policy::Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "serverauthorization".into(),
name: "testsaz".into(),
}),
},
}),
Some(http::StatusCode::OK),
&classify::Class::Grpc(Err(tonic::Code::Unknown)),
)
.expect("response_total not found");
assert_eq!(response_total, 1.0);
drop((client, bg));
}
#[tracing::instrument]
fn hello_server(
http: hyper::server::conn::Http,
@ -490,6 +570,42 @@ fn hello_server(
}
}
#[tracing::instrument]
fn grpc_status_server(
http: hyper::server::conn::Http,
status: tonic::Code,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
let span = tracing::info_span!("grpc_status_server", ?endpoint);
let _e = span.enter();
tracing::info!("mock connecting");
let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(
http.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert("grpc-status", (status as u32).to_string().parse().unwrap());
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
}
}
#[tracing::instrument]
fn connect_error() -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |_| {
@ -523,7 +639,7 @@ fn connect_timeout(
struct Target(http::Version, tls::ConditionalServerTls);
#[track_caller]
fn check_error_header(hdrs: &hyper::header::HeaderMap, expected: &str) {
fn check_error_header(hdrs: &::http::HeaderMap, expected: &str) {
let message = hdrs
.get(L5D_PROXY_ERROR)
.expect("response did not contain l5d-proxy-error header")

View File

@ -109,7 +109,11 @@ impl Recover<tonic::Status> for GrpcRecover {
return Err(status);
}
tracing::trace!(%status, "Recovering");
tracing::warn!(
grpc.status = %status.code(),
grpc.message = status.message(),
"Unexpected policy controller response; retrying with a backoff",
);
Ok(self.0.stream())
}
}

View File

@ -5,7 +5,7 @@ use linkerd_app_core::{
io, profiles,
proxy::http,
serve, svc,
transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
transport::{self, addrs::*},
Error, Result,
};
use std::{fmt::Debug, sync::Arc};
@ -43,7 +43,10 @@ impl Inbound<()> {
profiles: P,
gateway: G,
) where
A: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr> + Clone + Send + Sync + 'static,
A: svc::Param<Remote<ClientAddr>>,
A: svc::Param<OrigDstAddr>,
A: svc::Param<AddrPair>,
A: Clone + Send + Sync + 'static,
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
I: Debug + Unpin + Send + Sync + 'static,
G: svc::NewService<direct::GatewayTransportHeader, Service = GSvc>,

View File

@ -27,6 +27,8 @@ async fn http11_forward() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();
let svc = stack.new_service(Endpoint {
@ -61,6 +63,8 @@ async fn http2_forward() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();
let svc = stack.new_service(Endpoint {
@ -97,6 +101,8 @@ async fn orig_proto_upgrade() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();
let svc = stack.new_service(Endpoint {
@ -146,6 +152,7 @@ async fn orig_proto_skipped_on_http_upgrade() {
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.push_on_service(http::BoxRequest::layer())
// We need the server-side upgrade layer to annotate the request so that the client
// knows that an HTTP upgrade is in progress.
@ -192,6 +199,8 @@ async fn orig_proto_http2_noop() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();
let svc = stack.new_service(Endpoint {

View File

@ -131,6 +131,10 @@ where
clone.extensions_mut().insert(client_handle);
}
if let Some(classify) = req.extensions().get::<classify::Response>().cloned() {
clone.extensions_mut().insert(classify);
}
Some(clone)
}
}

View File

@ -230,7 +230,7 @@ impl Outbound<()> {
resolve: R,
) where
// Target describing a server-side connection.
T: svc::Param<Remote<ClientAddr>>,
T: svc::Param<AddrPair>,
T: svc::Param<OrigDstAddr>,
T: Clone + Send + Sync + 'static,
// Server-side socket.

View File

@ -113,8 +113,12 @@ impl Recover<tonic::Status> for GrpcRecover {
tonic::Code::InvalidArgument | tonic::Code::FailedPrecondition => Err(status),
// Indicates no policy for this target
tonic::Code::NotFound | tonic::Code::Unimplemented => Err(status),
_ => {
tracing::debug!(%status, "Recovering");
code => {
tracing::warn!(
grpc.status = %code,
grpc.message = status.message(),
"Unexpected policy controller response; retrying with a backoff",
);
Ok(self.0.stream())
}
}

View File

@ -94,7 +94,11 @@ impl Recover<tonic::Status> for BackoffUnlessInvalidArgument {
return Err(status);
}
tracing::trace!(%status, "Recovering");
tracing::warn!(
grpc.status = %status.code(),
grpc.message = status.message(),
"Unexpected destination controller response; retrying with a backoff",
);
Ok(self.0.stream())
}
}

View File

@ -22,7 +22,7 @@ use linkerd_app_core::{
metrics::FmtMetrics,
svc::Param,
telemetry,
transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
transport::{addrs::*, listen::Bind},
Error, ProxyRuntime,
};
use linkerd_app_gateway as gateway;
@ -102,11 +102,17 @@ impl Config {
) -> Result<App, Error>
where
BIn: Bind<ServerConfig> + 'static,
BIn::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>,
BIn::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BOut: Bind<ServerConfig> + 'static,
BOut::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>,
BOut::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BAdmin: Bind<ServerConfig> + Clone + 'static,
BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>,
BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<AddrPair>,
{
let Config {
admin,

View File

@ -6,7 +6,7 @@ use linkerd_app_core::{
serve,
svc::{self, ExtractParam, InsertParam, Param},
tls,
transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr},
transport::{addrs::AddrPair, listen::Bind, ClientAddr, Local, Remote, ServerAddr},
Error,
};
use std::{collections::HashSet, pin::Pin};
@ -47,6 +47,7 @@ impl Config {
where
B: Bind<ServerConfig>,
B::Addrs: Param<Remote<ClientAddr>>,
B::Addrs: Param<AddrPair>,
{
let (registry, server) = tap::new();
match self {

View File

@ -6,6 +6,9 @@ license = "Apache-2.0"
edition = "2021"
publish = false
[features]
test-util = []
[dependencies]
bytes = "1"
futures = { version = "0.3", default-features = false }

View File

@ -25,6 +25,22 @@ where
include_latencies: bool,
}
#[cfg(feature = "test-util")]
impl<T: Hash + Eq, C: Hash + Eq> Report<T, requests::Metrics<C>> {
pub fn get_response_total(
&self,
labels: &T,
status: Option<http::StatusCode>,
class: &C,
) -> Option<f64> {
let registry = self.registry.lock();
let requests = registry.get(labels)?.lock();
let status = requests.by_status().get(&status)?;
let class = status.by_class().get(class)?;
Some(class.total())
}
}
impl<T: Hash + Eq, M> Clone for Report<T, M> {
fn clone(&self) -> Self {
Self {

View File

@ -28,7 +28,7 @@ where
}
#[derive(Debug)]
struct StatusMetrics<C>
pub struct StatusMetrics<C>
where
C: Hash + Eq,
{
@ -87,6 +87,17 @@ impl<C: Hash + Eq> Default for Metrics<C> {
}
}
#[cfg(feature = "test-util")]
impl<C: Hash + Eq> Metrics<C> {
pub fn total(&self) -> &Counter {
&self.total
}
pub fn by_status(&self) -> &HashMap<Option<http::StatusCode>, StatusMetrics<C>> {
&self.by_status
}
}
impl<C: Hash + Eq> LastUpdate for Metrics<C> {
fn last_update(&self) -> Instant {
self.last_update
@ -105,6 +116,20 @@ where
}
}
#[cfg(feature = "test-util")]
impl<C: Hash + Eq> StatusMetrics<C> {
pub fn by_class(&self) -> &HashMap<C, ClassMetrics> {
&self.by_class
}
}
#[cfg(feature = "test-util")]
impl ClassMetrics {
pub fn total(&self) -> f64 {
self.total.value()
}
}
#[cfg(test)]
mod tests {
#[test]

View File

@ -96,7 +96,7 @@ where
impl<S, C> Clone for HttpMetrics<S, C>
where
S: Clone,
C: ClassifyResponse + Clone + Default + Send + Sync + 'static,
C: ClassifyResponse + Clone + Send + Sync + 'static,
C::Class: Hash + Eq,
{
fn clone(&self) -> Self {
@ -108,6 +108,16 @@ where
}
}
#[inline]
fn classify_unwrap_if_debug_else_default<C, B>(req: &http::Request<B>) -> C
where
C: Clone + Default + Send + Sync + 'static,
{
let c = req.extensions().get::<C>().cloned();
debug_assert!(c.is_some(), "request must have response classifier");
c.unwrap_or_default()
}
impl<C, P, S, A, B> Proxy<http::Request<A>, S> for HttpMetrics<P, C>
where
P: Proxy<http::Request<RequestBody<A, C::Class>>, S, Response = http::Response<B>>,
@ -143,10 +153,8 @@ where
http::Request::from_parts(head, body)
};
let classify = req.extensions().get::<C>().cloned().unwrap_or_default();
ResponseFuture {
classify: Some(classify),
classify: Some(classify_unwrap_if_debug_else_default(&req)),
metrics: self.metrics.clone(),
stream_open_at: Instant::now(),
inner: self.inner.proxy(svc, req),
@ -167,6 +175,7 @@ where
type Error = Error;
type Future = ResponseFuture<S::Future, C>;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}
@ -192,10 +201,8 @@ where
http::Request::from_parts(head, body)
};
let classify = req.extensions().get::<C>().cloned().unwrap_or_default();
ResponseFuture {
classify: Some(classify),
classify: Some(classify_unwrap_if_debug_else_default(&req)),
metrics: self.metrics.clone(),
stream_open_at: Instant::now(),
inner: self.inner.call(req),

View File

@ -2,7 +2,7 @@ use futures_util::future::poll_fn;
use linkerd_error::Error;
use tokio::sync::mpsc;
use tower::discover;
use tracing::{debug, instrument::Instrument, trace};
use tracing::{debug, debug_span, instrument::Instrument, trace, warn};
pub type Result<K, S> = std::result::Result<discover::Change<K, S>, Error>;
pub type Buffer<K, S> = tokio_stream::wrappers::ReceiverStream<Result<K, S>>;
@ -16,6 +16,29 @@ where
{
let (tx, rx) = mpsc::channel(capacity);
// Attempts to send an update to the balancer, returning `true` if sending
// was successful and `false` otherwise.
let send = |tx: &mpsc::Sender<_>, up| {
match tx.try_send(up) {
Ok(()) => true,
// The balancer has been dropped (and will never be used again).
Err(mpsc::error::TrySendError::Closed(_)) => {
debug!("Discovery receiver dropped");
false
}
// The balancer is stalled and we can't continue to buffer
// updates for it.
Err(mpsc::error::TrySendError::Full(_)) => {
warn!(
"The balancer is not processing discovery updates; aborting discovery stream"
);
false
}
}
};
debug!(%capacity, "Spawning discovery buffer");
tokio::spawn(
async move {
@ -23,41 +46,51 @@ where
loop {
let res = tokio::select! {
_ = tx.closed() => break,
biased;
_ = tx.closed() => {
debug!("Discovery receiver dropped");
return;
}
res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res,
};
let change = match res {
match res {
Some(Ok(change)) => {
trace!("Changed");
change
if !send(&tx, Ok(change)) {
// XXX(ver) We don't actually have a way to "blow
// up" the balancer in this situation. My
// understanding is that this will cause the
// balancer to get cut off from further updates,
// should it ever become available again. That needs
// to be fixed.
//
// One option would be to drop the discovery stream
// and rebuild it if the balancer ever becomes
// unblocked.
//
// Ultimately we need to track down how we're
// getting into this blocked/idle state.
return;
}
}
Some(Err(e)) => {
let error = e.into();
debug!(%error);
let _ = tx.send(Err(error)).await;
send(&tx, Err(error));
return;
}
None => {
debug!("Discovery stream closed");
return;
}
};
tokio::select! {
_ = tx.closed() => break,
res = tx.send(Ok(change)) => {
if res.is_err() {
break;
}
trace!("Change sent");
}
}
}
debug!("Discovery receiver dropped");
}
.in_current_span(),
.in_current_span()
.instrument(debug_span!("discover")),
);
Buffer::new(rx)

View File

@ -1,4 +1,4 @@
use linkerd_stack::{layer, ExtractParam, NewService, Proxy, Service};
use linkerd_stack::{layer, CloneParam, ExtractParam, NewService, Proxy, Service};
use std::{
marker::PhantomData,
task::{Context, Poll},
@ -33,6 +33,12 @@ impl<C, N> NewInsertClassifyResponse<C, (), N> {
}
}
impl<C: Clone + Default, N> NewInsertClassifyResponse<C, CloneParam<C>, N> {
pub fn layer_default() -> impl layer::Layer<N, Service = Self> + Clone {
Self::layer_via(CloneParam::from(C::default()))
}
}
impl<T, C, X, N> NewService<T> for NewInsertClassifyResponse<C, X, N>
where
C: super::Classify,
@ -61,8 +67,9 @@ where
fn proxy(&self, svc: &mut S, mut req: http::Request<B>) -> Self::Future {
let classify_rsp = self.classify.classify(&req);
let prior = req.extensions_mut().insert(classify_rsp);
debug_assert!(prior.is_none(), "classification extension already existed");
if req.extensions_mut().insert(classify_rsp).is_some() {
tracing::debug!("Overrode response classifier");
}
self.inner.proxy(svc, req)
}
}
@ -83,8 +90,9 @@ where
fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
let classify_rsp = self.classify.classify(&req);
let prior = req.extensions_mut().insert(classify_rsp);
debug_assert!(prior.is_none(), "classification extension already existed");
if req.extensions_mut().insert(classify_rsp).is_some() {
tracing::debug!("Overrode response classifier");
}
self.inner.call(req)
}
}

View File

@ -28,6 +28,10 @@ pub struct Local<T>(pub T);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Remote<T>(pub T);
/// Describes a connection from a client to a server.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct AddrPair(pub ClientAddr, pub ServerAddr);
// === impl ClientAddr ===
impl std::ops::Deref for ClientAddr {

View File

@ -15,7 +15,7 @@ pub mod listen;
pub mod orig_dst;
pub use self::{
addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr},
addrs::{AddrPair, ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr},
connect::ConnectTcp,
listen::{Bind, BindTcp},
orig_dst::BindWithOrigDst,

View File

@ -104,3 +104,12 @@ impl Param<Local<ServerAddr>> for Addrs {
self.server
}
}
impl Param<AddrPair> for Addrs {
#[inline]
fn param(&self) -> AddrPair {
let Remote(client) = self.client;
let Local(server) = self.server;
AddrPair(client, server)
}
}

View File

@ -23,6 +23,7 @@ pub struct Addrs<A = listen::Addrs> {
// === impl Addrs ===
impl<A> Param<OrigDstAddr> for Addrs<A> {
#[inline]
fn param(&self) -> OrigDstAddr {
self.orig_dst
}
@ -32,11 +33,23 @@ impl<A> Param<Remote<ClientAddr>> for Addrs<A>
where
A: Param<Remote<ClientAddr>>,
{
#[inline]
fn param(&self) -> Remote<ClientAddr> {
self.inner.param()
}
}
impl<A> Param<AddrPair> for Addrs<A>
where
A: Param<Remote<ClientAddr>>,
{
#[inline]
fn param(&self) -> AddrPair {
let Remote(client) = self.inner.param();
AddrPair(client, ServerAddr(self.orig_dst.into()))
}
}
impl<A> Param<Local<ServerAddr>> for Addrs<A>
where
A: Param<Local<ServerAddr>>,