Compare commits: main...release/v2 (4 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 7d8a46b56e |  |
|  | d478c2e253 |  |
|  | dd47e38a2c |  |
|  | 8b650a9e2f |  |
Cargo.lock (26 lines changed)

@@ -16,14 +16,15 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
 
 [[package]]
 name = "ahash"
-version = "0.8.3"
+version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
+checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d"
 dependencies = [
  "cfg-if",
  "getrandom",
  "once_cell",
  "version_check",
+ "zerocopy",
 ]
 
 [[package]]

@@ -1010,6 +1011,7 @@ dependencies = [
  "linkerd-app-core",
  "linkerd-app-test",
  "linkerd-http-access-log",
+ "linkerd-http-metrics",
  "linkerd-idle-cache",
  "linkerd-io",
  "linkerd-meshtls",

@@ -3341,3 +3343,23 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
 dependencies = [
  "winapi",
 ]
+
+[[package]]
+name = "zerocopy"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a7af71d8643341260a65f89fa60c0eeaa907f34544d8f6d9b0df72f069b5e74"
+dependencies = [
+ "zerocopy-derive",
+]
+
+[[package]]
+name = "zerocopy-derive"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9731702e2f0617ad526794ae28fbc6f6ca8849b5ba729666c2a5bc4b6ddee2cd"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.12",
+]

deny.toml (15 lines changed)

@@ -17,6 +17,7 @@ ignore = []
 unlicensed = "deny"
 allow = [
     "Apache-2.0",
+    "BSD-2-Clause",
     "BSD-3-Clause",
     "ISC",
     "MIT",

@@ -27,13 +28,21 @@ allow-osi-fsf-free = "neither"
 default = "deny"
 confidence-threshold = 0.8
 exceptions = [
-    { allow = ["Zlib"], name = "adler32", version = "*" },
-    { allow = ["ISC", "MIT", "OpenSSL"], name = "ring", version = "*" },
+    { allow = [
+        "Zlib",
+    ], name = "adler32", version = "*" },
+    { allow = [
+        "ISC",
+        "MIT",
+        "OpenSSL",
+    ], name = "ring", version = "*" },
     # The Unicode-DFS-2016 license is necessary for unicode-ident because they
     # use data from the unicode tables to generate the tables which are
     # included in the application. We do not distribute those data files so
     # this is not a problem for us. See https://github.com/dtolnay/unicode-ident/pull/9/files
-    { allow = ["Unicode-DFS-2016"], name = "unicode-ident", version = "*"},
+    { allow = [
+        "Unicode-DFS-2016",
+    ], name = "unicode-ident", version = "*" },
 ]
 
 [[licenses.clarify]]

@@ -7,7 +7,9 @@ use linkerd_app_core::{
     serve,
     svc::{self, ExtractParam, InsertParam, Param},
     tls, trace,
-    transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
+    transport::{
+        self, addrs::AddrPair, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr,
+    },
     Error, Result,
 };
 use linkerd_app_inbound as inbound;

@@ -84,7 +86,9 @@ impl Config {
     where
         R: FmtMetrics + Clone + Send + Sync + Unpin + 'static,
         B: Bind<ServerConfig>,
-        B::Addrs: svc::Param<Remote<ClientAddr>> + svc::Param<Local<ServerAddr>>,
+        B::Addrs: svc::Param<Remote<ClientAddr>>,
+        B::Addrs: svc::Param<Local<ServerAddr>>,
+        B::Addrs: svc::Param<AddrPair>,
     {
         let (listen_addr, listen) = bind.bind(&self.server)?;
 

@@ -95,6 +99,7 @@ impl Config {
         let admin = crate::server::Admin::new(report, ready, shutdown, trace);
         let admin = svc::stack(move |_| admin.clone())
             .push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, Permitted>())
+            .push(classify::NewClassify::layer_default())
             .push_map_target(|(permit, http)| Permitted { permit, http })
             .push(inbound::policy::NewHttpPolicy::layer(metrics.http_authz.clone()))
             .push(Rescue::layer())

@@ -201,6 +206,14 @@ impl Param<Remote<ClientAddr>> for Http {
     }
 }
 
+impl Param<AddrPair> for Http {
+    fn param(&self) -> AddrPair {
+        let Remote(client) = self.tcp.client;
+        let Local(server) = self.tcp.addr;
+        AddrPair(client, server)
+    }
+}
+
 impl Param<tls::ConditionalServerTls> for Http {
     fn param(&self) -> tls::ConditionalServerTls {
         self.tcp.tls.clone()

@@ -124,6 +124,7 @@ impl Config {
             .lift_new()
             .push(self::balance::layer(dns, resolve_backoff))
             .push(metrics.to_layer::<classify::Response, _, _>())
+            .push(classify::NewClassify::layer_default())
             // This buffer allows a resolver client to be shared across stacks.
             // No load shed is applied here, however, so backpressure may leak
             // into the caller task.

@@ -422,7 +422,7 @@ impl FmtLabels for Class {
                 "classification=\"{}\",grpc_status=\"{}\",error=\"\"",
                 class(res.is_ok()),
                 match res {
-                    Ok(code) | Err(code) => code,
+                    Ok(code) | Err(code) => <i32>::from(*code),
                 }
             ),

@@ -1,11 +1,11 @@
 use crate::{
     io, is_caused_by,
     svc::{self, Param},
-    transport::{ClientAddr, Remote},
     Result,
 };
 use futures::prelude::*;
 use linkerd_error::Error;
+use linkerd_proxy_transport::AddrPair;
 use tower::util::ServiceExt;
 use tracing::{debug, debug_span, info, instrument::Instrument, warn};
 

@@ -18,7 +18,7 @@ pub async fn serve<M, S, I, A>(
     shutdown: impl Future,
 ) where
     I: Send + 'static,
-    A: Param<Remote<ClientAddr>>,
+    A: Param<AddrPair>,
     M: svc::NewService<A, Service = S>,
     S: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static,
     S::Error: Into<Error>,

@@ -40,8 +40,8 @@ pub async fn serve<M, S, I, A>(
         };
 
         // The local addr should be instrumented from the listener's context.
-        let Remote(ClientAddr(client_addr)) = addrs.param();
-        let span = debug_span!("accept", client.addr = %client_addr).entered();
+        let AddrPair(client_addr, server_addr) = addrs.param();
+        let span = debug_span!("accept", client.addr = %client_addr, server.addr = %server_addr).entered();
         let accept = new_accept.new_service(addrs);
 
         // Dispatch all of the work for a given connection onto a

@@ -57,10 +57,20 @@ pub async fn serve<M, S, I, A>(
             {
                 Ok(()) => debug!("Connection closed"),
                 Err(reason) if is_caused_by::<std::io::Error>(&*reason) => {
-                    debug!(%reason, "Connection closed")
+                    debug!(
+                        reason,
+                        client.addr = %client_addr,
+                        server.addr = %server_addr,
+                        "Connection closed"
+                    );
                 }
                 Err(error) => {
-                    info!(error, client.addr = %client_addr, "Connection closed")
+                    info!(
+                        error,
+                        client.addr = %client_addr,
+                        server.addr = %server_addr,
+                        "Connection closed"
+                    );
                 }
             }
             // Hold the service until the connection is complete. This

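For readers unfamiliar with the `tracing` field syntax used in the accept span above, here is a short standalone sketch (not linkerd code; the addresses and subscriber setup are invented for illustration) of how dotted field names and the `%` (Display) sigil attach structured values to a span so events inside it inherit them:

```rust
// Illustrative only: assumes the `tracing` and `tracing-subscriber` crates.
use tracing::{debug, debug_span};

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Hypothetical peer addresses standing in for an accepted connection's
    // client/server pair; `%` records them via their Display impls.
    let client_addr: std::net::SocketAddr = "192.0.2.10:56830".parse().unwrap();
    let server_addr: std::net::SocketAddr = "192.0.2.1:4143".parse().unwrap();

    let _span =
        debug_span!("accept", client.addr = %client_addr, server.addr = %server_addr).entered();

    // This event inherits both address fields from the enclosing span.
    debug!("Connection closed");
}
```
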
@@ -52,6 +52,7 @@ libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] }
 [dev-dependencies]
 hyper = { version = "0.14", features = ["http1", "http2"] }
 linkerd-app-test = { path = "../test" }
+linkerd-http-metrics = { path = "../../http-metrics", features = ["test-util"] }
 linkerd-idle-cache = { path = "../../idle-cache", features = ["test-util"] }
 linkerd-io = { path = "../../io", features = ["tokio-test"] }
 linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }

@@ -148,7 +148,8 @@ impl<C> Inbound<C> {
             // Attempts to discover a service profile for each logical target (as
             // informed by the request's headers). The stack is cached until a
             // request has not been received for `cache_max_idle_age`.
-            let router = http.clone()
+            let router = http
+                .clone()
                 .check_new_service::<Logical, http::Request<http::BoxBody>>()
                 .push_map_target(|p: Profile| p.logical)
                 .push(profiles::http::NewProxyRouter::layer(

@@ -164,6 +165,7 @@ impl<C> Inbound<C> {
                         .to_layer::<classify::Response, _, _>(),
                 )
                 .push_on_service(http::BoxResponse::layer())
+                // Configure a per-route response classifier based on the profile.
                 .push(classify::NewClassify::layer())
                 .push_http_insert_target::<profiles::http::Route>()
                 .push_map_target(|(route, profile)| ProfileRoute { route, profile })

@@ -186,10 +188,7 @@ impl<C> Inbound<C> {
                     }
                     Ok(svc::Either::B(logical))
                 },
-                http.clone()
-                    .push_on_service(svc::MapErr::layer(Error::from))
-                    .check_new_service::<Logical, http::Request<_>>()
-                    .into_inner(),
+                http.clone().into_inner(),
             )
             .check_new_service::<(Option<profiles::Receiver>, Logical), http::Request<_>>();
 

@@ -229,8 +228,7 @@ impl<C> Inbound<C> {
             // Skip the profile stack if it takes too long to become ready.
             .push_when_unready(config.profile_skip_timeout, http.into_inner())
             .push_on_service(
-                svc::layers()
-                    .push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical")))
+                rt.metrics.proxy.stack.layer(stack_labels("http", "logical")),
             )
             .push(svc::NewQueue::layer_via(config.http_request_queue))
             .push_new_idle_cached(config.discovery_idle_timeout)

@@ -239,6 +237,9 @@ impl<C> Inbound<C> {
                     .push(http::Retain::layer())
                     .push(http::BoxResponse::layer()),
             )
+            // Configure default response classification early. It may be
+            // overridden by profile routes above.
+            .push(classify::NewClassify::layer_default())
             .check_new_service::<Logical, http::Request<http::BoxBody>>()
             .instrument(|t: &Logical| {
                 let name = t.logical.as_ref().map(tracing::field::display);

@@ -414,12 +415,6 @@ impl Param<metrics::EndpointLabels> for Logical {
     }
 }
 
-impl Param<classify::Request> for Logical {
-    fn param(&self) -> classify::Request {
-        classify::Request::default()
-    }
-}
-
 impl tap::Inspect for Logical {
     fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
         req.extensions().get::<Remote<ClientAddr>>().map(|a| **a)

@@ -6,10 +6,11 @@ use crate::{
     },
     Config, Inbound,
 };
-use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response};
+use hyper::{body::HttpBody, client::conn::Builder as ClientBuilder, Body, Request, Response};
 use linkerd_app_core::{
+    classify,
     errors::respond::L5D_PROXY_ERROR,
-    identity, io,
+    identity, io, metrics,
     proxy::http,
     svc::{self, NewService, Param},
     tls,

@@ -19,6 +20,7 @@ use linkerd_app_core::{
 use linkerd_app_test::connect::ConnectFuture;
 use linkerd_tracing::test::trace_init;
 use std::{net::SocketAddr, sync::Arc};
+use tokio::time;
 use tracing::Instrument;
 
 fn build_server<I>(

@@ -469,6 +471,84 @@ async fn grpc_unmeshed_response_error_header() {
     let _ = bg.await;
 }
 
+#[tokio::test(flavor = "current_thread")]
+async fn grpc_response_class() {
+    let _trace = trace_init();
+
+    // Build a mock connector serves a gRPC server that returns errors.
+    let connect = {
+        let mut server = hyper::server::conn::Http::new();
+        server.http2_only(true);
+        support::connect().endpoint_fn_boxed(
+            Target::addr(),
+            grpc_status_server(server, tonic::Code::Unknown),
+        )
+    };
+
+    // Build a client using the connect that always errors.
+    let mut client = ClientBuilder::new();
+    client.http2_only(true);
+    let profiles = profile::resolver();
+    let profile_tx =
+        profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
+    profile_tx.send(profile::Profile::default()).unwrap();
+    let cfg = default_config();
+    let (rt, _shutdown) = runtime();
+    let metrics = rt
+        .metrics
+        .clone()
+        .http_endpoint
+        .into_report(time::Duration::from_secs(3600));
+    let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
+    let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;
+
+    // Send a request and assert that it is OK with the expected header
+    // message.
+    let req = Request::builder()
+        .method(http::Method::POST)
+        .uri("http://foo.svc.cluster.local:5550")
+        .header(http::header::CONTENT_TYPE, "application/grpc")
+        .body(Body::default())
+        .unwrap();
+
+    let mut response = http_util::http_request(&mut client, req).await.unwrap();
+    assert_eq!(response.status(), http::StatusCode::OK);
+
+    response.body_mut().data().await;
+    let trls = response.body_mut().trailers().await.unwrap().unwrap();
+    assert_eq!(trls.get("grpc-status").unwrap().to_str().unwrap(), "2");
+
+    let response_total = metrics
+        .get_response_total(
+            &metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels {
+                tls: Target::meshed_h2().1,
+                authority: Some("foo.svc.cluster.local:5550".parse().unwrap()),
+                target_addr: "127.0.0.1:80".parse().unwrap(),
+                policy: metrics::RouteAuthzLabels {
+                    route: metrics::RouteLabels {
+                        server: metrics::ServerLabel(Arc::new(policy::Meta::Resource {
+                            group: "policy.linkerd.io".into(),
+                            kind: "server".into(),
+                            name: "testsrv".into(),
+                        })),
+                        route: policy::Meta::new_default("default"),
+                    },
+                    authz: Arc::new(policy::Meta::Resource {
+                        group: "policy.linkerd.io".into(),
+                        kind: "serverauthorization".into(),
+                        name: "testsaz".into(),
+                    }),
+                },
+            }),
+            Some(http::StatusCode::OK),
+            &classify::Class::Grpc(Err(tonic::Code::Unknown)),
+        )
+        .expect("response_total not found");
+    assert_eq!(response_total, 1.0);
+
+    drop((client, bg));
+}
+
 #[tracing::instrument]
 fn hello_server(
     http: hyper::server::conn::Http,

@@ -490,6 +570,42 @@ fn hello_server(
     }
 }
 
+#[tracing::instrument]
+fn grpc_status_server(
+    http: hyper::server::conn::Http,
+    status: tonic::Code,
+) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
+    move |endpoint| {
+        let span = tracing::info_span!("grpc_status_server", ?endpoint);
+        let _e = span.enter();
+        tracing::info!("mock connecting");
+        let (client_io, server_io) = support::io::duplex(4096);
+        tokio::spawn(
+            http.serve_connection(
+                server_io,
+                hyper::service::service_fn(move |request: Request<Body>| async move {
+                    tracing::info!(?request);
+                    let (mut tx, rx) = Body::channel();
+                    tokio::spawn(async move {
+                        let mut trls = ::http::HeaderMap::new();
+                        trls.insert("grpc-status", (status as u32).to_string().parse().unwrap());
+                        tx.send_trailers(trls).await
+                    });
+                    Ok::<_, io::Error>(
+                        http::Response::builder()
+                            .version(::http::Version::HTTP_2)
+                            .header("content-type", "application/grpc")
+                            .body(rx)
+                            .unwrap(),
+                    )
+                }),
+            )
+            .in_current_span(),
+        );
+        Ok(io::BoxedIo::new(client_io))
+    }
+}
+
 #[tracing::instrument]
 fn connect_error() -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
     move |_| {

@@ -523,7 +639,7 @@ fn connect_timeout(
 struct Target(http::Version, tls::ConditionalServerTls);
 
 #[track_caller]
-fn check_error_header(hdrs: &hyper::header::HeaderMap, expected: &str) {
+fn check_error_header(hdrs: &::http::HeaderMap, expected: &str) {
     let message = hdrs
         .get(L5D_PROXY_ERROR)
         .expect("response did not contain l5d-proxy-error header")

@@ -109,7 +109,11 @@ impl Recover<tonic::Status> for GrpcRecover {
             return Err(status);
         }
 
-        tracing::trace!(%status, "Recovering");
+        tracing::warn!(
+            grpc.status = %status.code(),
+            grpc.message = status.message(),
+            "Unexpected policy controller response; retrying with a backoff",
+        );
         Ok(self.0.stream())
     }
 }

@@ -5,7 +5,7 @@ use linkerd_app_core::{
     io, profiles,
     proxy::http,
     serve, svc,
-    transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
+    transport::{self, addrs::*},
     Error, Result,
 };
 use std::{fmt::Debug, sync::Arc};

@@ -43,7 +43,10 @@ impl Inbound<()> {
         profiles: P,
         gateway: G,
     ) where
-        A: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr> + Clone + Send + Sync + 'static,
+        A: svc::Param<Remote<ClientAddr>>,
+        A: svc::Param<OrigDstAddr>,
+        A: svc::Param<AddrPair>,
+        A: Clone + Send + Sync + 'static,
         I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
         I: Debug + Unpin + Send + Sync + 'static,
         G: svc::NewService<direct::GatewayTransportHeader, Service = GSvc>,

@@ -27,6 +27,8 @@ async fn http11_forward() {
         .with_stack(connect)
         .push_http_tcp_client()
        .push_http_endpoint::<_, http::BoxBody, _>()
+        .into_stack()
+        .push(classify::NewClassify::layer_default())
         .into_inner();
 
     let svc = stack.new_service(Endpoint {

@@ -61,6 +63,8 @@ async fn http2_forward() {
         .with_stack(connect)
         .push_http_tcp_client()
         .push_http_endpoint::<_, http::BoxBody, _>()
+        .into_stack()
+        .push(classify::NewClassify::layer_default())
         .into_inner();
 
     let svc = stack.new_service(Endpoint {

@@ -97,6 +101,8 @@ async fn orig_proto_upgrade() {
         .with_stack(connect)
         .push_http_tcp_client()
         .push_http_endpoint::<_, http::BoxBody, _>()
+        .into_stack()
+        .push(classify::NewClassify::layer_default())
         .into_inner();
 
     let svc = stack.new_service(Endpoint {

@@ -146,6 +152,7 @@ async fn orig_proto_skipped_on_http_upgrade() {
         .push_http_tcp_client()
         .push_http_endpoint::<_, http::BoxBody, _>()
         .into_stack()
+        .push(classify::NewClassify::layer_default())
         .push_on_service(http::BoxRequest::layer())
         // We need the server-side upgrade layer to annotate the request so that the client
         // knows that an HTTP upgrade is in progress.

@@ -192,6 +199,8 @@ async fn orig_proto_http2_noop() {
         .with_stack(connect)
         .push_http_tcp_client()
         .push_http_endpoint::<_, http::BoxBody, _>()
+        .into_stack()
+        .push(classify::NewClassify::layer_default())
         .into_inner();
 
     let svc = stack.new_service(Endpoint {

@@ -131,6 +131,10 @@ where
             clone.extensions_mut().insert(client_handle);
         }
 
+        if let Some(classify) = req.extensions().get::<classify::Response>().cloned() {
+            clone.extensions_mut().insert(classify);
+        }
+
         Some(clone)
     }
 }

@@ -230,7 +230,7 @@ impl Outbound<()> {
         resolve: R,
     ) where
         // Target describing a server-side connection.
-        T: svc::Param<Remote<ClientAddr>>,
+        T: svc::Param<AddrPair>,
         T: svc::Param<OrigDstAddr>,
         T: Clone + Send + Sync + 'static,
         // Server-side socket.

@@ -113,8 +113,12 @@ impl Recover<tonic::Status> for GrpcRecover {
             tonic::Code::InvalidArgument | tonic::Code::FailedPrecondition => Err(status),
             // Indicates no policy for this target
             tonic::Code::NotFound | tonic::Code::Unimplemented => Err(status),
-            _ => {
-                tracing::debug!(%status, "Recovering");
+            code => {
+                tracing::warn!(
+                    grpc.status = %code,
+                    grpc.message = status.message(),
+                    "Unexpected policy controller response; retrying with a backoff",
+                );
                 Ok(self.0.stream())
             }
         }

@@ -94,7 +94,11 @@ impl Recover<tonic::Status> for BackoffUnlessInvalidArgument {
             return Err(status);
         }
 
-        tracing::trace!(%status, "Recovering");
+        tracing::warn!(
+            grpc.status = %status.code(),
+            grpc.message = status.message(),
+            "Unexpected destination controller response; retrying with a backoff",
+        );
         Ok(self.0.stream())
     }
 }

@@ -22,7 +22,7 @@ use linkerd_app_core::{
     metrics::FmtMetrics,
     svc::Param,
     telemetry,
-    transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
+    transport::{addrs::*, listen::Bind},
     Error, ProxyRuntime,
 };
 use linkerd_app_gateway as gateway;

@@ -102,11 +102,17 @@ impl Config {
     ) -> Result<App, Error>
     where
         BIn: Bind<ServerConfig> + 'static,
-        BIn::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>,
+        BIn::Addrs: Param<Remote<ClientAddr>>
+            + Param<Local<ServerAddr>>
+            + Param<OrigDstAddr>
+            + Param<AddrPair>,
         BOut: Bind<ServerConfig> + 'static,
-        BOut::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>,
+        BOut::Addrs: Param<Remote<ClientAddr>>
+            + Param<Local<ServerAddr>>
+            + Param<OrigDstAddr>
+            + Param<AddrPair>,
         BAdmin: Bind<ServerConfig> + Clone + 'static,
-        BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>,
+        BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<AddrPair>,
     {
         let Config {
             admin,

@@ -6,7 +6,7 @@ use linkerd_app_core::{
     serve,
     svc::{self, ExtractParam, InsertParam, Param},
     tls,
-    transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr},
+    transport::{addrs::AddrPair, listen::Bind, ClientAddr, Local, Remote, ServerAddr},
     Error,
 };
 use std::{collections::HashSet, pin::Pin};

@@ -47,6 +47,7 @@ impl Config {
     where
         B: Bind<ServerConfig>,
         B::Addrs: Param<Remote<ClientAddr>>,
+        B::Addrs: Param<AddrPair>,
     {
         let (registry, server) = tap::new();
         match self {

@@ -6,6 +6,9 @@ license = "Apache-2.0"
 edition = "2021"
 publish = false
 
+[features]
+test-util = []
+
 [dependencies]
 bytes = "1"
 futures = { version = "0.3", default-features = false }

@@ -25,6 +25,22 @@ where
     include_latencies: bool,
 }
 
+#[cfg(feature = "test-util")]
+impl<T: Hash + Eq, C: Hash + Eq> Report<T, requests::Metrics<C>> {
+    pub fn get_response_total(
+        &self,
+        labels: &T,
+        status: Option<http::StatusCode>,
+        class: &C,
+    ) -> Option<f64> {
+        let registry = self.registry.lock();
+        let requests = registry.get(labels)?.lock();
+        let status = requests.by_status().get(&status)?;
+        let class = status.by_class().get(class)?;
+        Some(class.total())
+    }
+}
+
 impl<T: Hash + Eq, M> Clone for Report<T, M> {
     fn clone(&self) -> Self {
         Self {

@@ -28,7 +28,7 @@ where
 }
 
 #[derive(Debug)]
-struct StatusMetrics<C>
+pub struct StatusMetrics<C>
 where
     C: Hash + Eq,
 {

@@ -87,6 +87,17 @@ impl<C: Hash + Eq> Default for Metrics<C> {
     }
 }
 
+#[cfg(feature = "test-util")]
+impl<C: Hash + Eq> Metrics<C> {
+    pub fn total(&self) -> &Counter {
+        &self.total
+    }
+
+    pub fn by_status(&self) -> &HashMap<Option<http::StatusCode>, StatusMetrics<C>> {
+        &self.by_status
+    }
+}
+
 impl<C: Hash + Eq> LastUpdate for Metrics<C> {
     fn last_update(&self) -> Instant {
         self.last_update

@@ -105,6 +116,20 @@ where
     }
 }
 
+#[cfg(feature = "test-util")]
+impl<C: Hash + Eq> StatusMetrics<C> {
+    pub fn by_class(&self) -> &HashMap<C, ClassMetrics> {
+        &self.by_class
+    }
+}
+
+#[cfg(feature = "test-util")]
+impl ClassMetrics {
+    pub fn total(&self) -> f64 {
+        self.total.value()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     #[test]

@@ -96,7 +96,7 @@ where
 impl<S, C> Clone for HttpMetrics<S, C>
 where
     S: Clone,
-    C: ClassifyResponse + Clone + Default + Send + Sync + 'static,
+    C: ClassifyResponse + Clone + Send + Sync + 'static,
     C::Class: Hash + Eq,
 {
     fn clone(&self) -> Self {

@@ -108,6 +108,16 @@ where
     }
 }
 
+#[inline]
+fn classify_unwrap_if_debug_else_default<C, B>(req: &http::Request<B>) -> C
+where
+    C: Clone + Default + Send + Sync + 'static,
+{
+    let c = req.extensions().get::<C>().cloned();
+    debug_assert!(c.is_some(), "request must have response classifier");
+    c.unwrap_or_default()
+}
+
 impl<C, P, S, A, B> Proxy<http::Request<A>, S> for HttpMetrics<P, C>
 where
     P: Proxy<http::Request<RequestBody<A, C::Class>>, S, Response = http::Response<B>>,

@@ -143,10 +153,8 @@ where
             http::Request::from_parts(head, body)
         };
 
-        let classify = req.extensions().get::<C>().cloned().unwrap_or_default();
-
         ResponseFuture {
-            classify: Some(classify),
+            classify: Some(classify_unwrap_if_debug_else_default(&req)),
             metrics: self.metrics.clone(),
             stream_open_at: Instant::now(),
             inner: self.inner.proxy(svc, req),

@@ -167,6 +175,7 @@ where
     type Error = Error;
     type Future = ResponseFuture<S::Future, C>;
 
+    #[inline]
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.inner.poll_ready(cx).map_err(Into::into)
     }

@@ -192,10 +201,8 @@ where
             http::Request::from_parts(head, body)
         };
 
-        let classify = req.extensions().get::<C>().cloned().unwrap_or_default();
-
         ResponseFuture {
-            classify: Some(classify),
+            classify: Some(classify_unwrap_if_debug_else_default(&req)),
             metrics: self.metrics.clone(),
             stream_open_at: Instant::now(),
             inner: self.inner.call(req),

@@ -2,7 +2,7 @@ use futures_util::future::poll_fn;
 use linkerd_error::Error;
 use tokio::sync::mpsc;
 use tower::discover;
-use tracing::{debug, instrument::Instrument, trace};
+use tracing::{debug, debug_span, instrument::Instrument, trace, warn};
 
 pub type Result<K, S> = std::result::Result<discover::Change<K, S>, Error>;
 pub type Buffer<K, S> = tokio_stream::wrappers::ReceiverStream<Result<K, S>>;

@@ -16,6 +16,29 @@ where
 {
     let (tx, rx) = mpsc::channel(capacity);
 
+    // Attempts to send an update to the balancer, returning `true` if sending
+    // was successful and `false` otherwise.
+    let send = |tx: &mpsc::Sender<_>, up| {
+        match tx.try_send(up) {
+            Ok(()) => true,
+
+            // The balancer has been dropped (and will never be used again).
+            Err(mpsc::error::TrySendError::Closed(_)) => {
+                debug!("Discovery receiver dropped");
+                false
+            }
+
+            // The balancer is stalled and we can't continue to buffer
+            // updates for it.
+            Err(mpsc::error::TrySendError::Full(_)) => {
+                warn!(
+                    "The balancer is not processing discovery updates; aborting discovery stream"
+                );
+                false
+            }
+        }
+    };
+
     debug!(%capacity, "Spawning discovery buffer");
     tokio::spawn(
         async move {

@@ -23,41 +46,51 @@ where
 
             loop {
                 let res = tokio::select! {
-                    _ = tx.closed() => break,
+                    biased;
+
+                    _ = tx.closed() => {
+                        debug!("Discovery receiver dropped");
+                        return;
+                    }
+
                     res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res,
                 };
 
-                let change = match res {
+                match res {
                     Some(Ok(change)) => {
                         trace!("Changed");
-                        change
+                        if !send(&tx, Ok(change)) {
+                            // XXX(ver) We don't actually have a way to "blow
+                            // up" the balancer in this situation. My
+                            // understanding is that this will cause the
+                            // balancer to get cut off from further updates,
+                            // should it ever become available again. That needs
+                            // to be fixed.
+                            //
+                            // One option would be to drop the discovery stream
+                            // and rebuild it if the balancer ever becomes
+                            // unblocked.
+                            //
+                            // Ultimately we need to track down how we're
+                            // getting into this blocked/idle state
+                            return;
+                        }
                     }
                     Some(Err(e)) => {
                         let error = e.into();
                         debug!(%error);
-                        let _ = tx.send(Err(error)).await;
+                        send(&tx, Err(error));
                         return;
                     }
                     None => {
                         debug!("Discovery stream closed");
                         return;
                     }
                 };
-
-                tokio::select! {
-                    _ = tx.closed() => break,
-                    res = tx.send(Ok(change)) => {
-                        if res.is_err() {
-                            break;
-                        }
-                        trace!("Change sent");
-                    }
-                }
             }
-
-            debug!("Discovery receiver dropped");
         }
-        .in_current_span(),
+        .in_current_span()
+        .instrument(debug_span!("discover")),
     );
 
     Buffer::new(rx)

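As a standalone illustration of the non-blocking send pattern adopted above (a generic tokio sketch, not linkerd code), `try_send` lets a producer distinguish a stalled receiver (`Full`) from a dropped one (`Closed`) without ever awaiting:

```rust
use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // A tiny bounded channel standing in for the discovery buffer.
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    // The first update fits in the buffer.
    assert!(tx.try_send(1).is_ok());

    // The second fails immediately because the receiver has not drained it.
    match tx.try_send(2) {
        Err(mpsc::error::TrySendError::Full(_)) => println!("receiver is not keeping up"),
        Err(mpsc::error::TrySendError::Closed(_)) => println!("receiver was dropped"),
        Ok(()) => unreachable!("buffer capacity is 1"),
    }

    // Draining the channel makes room again.
    assert_eq!(rx.recv().await, Some(1));
    assert!(tx.try_send(3).is_ok());
}
```
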
@@ -1,4 +1,4 @@
-use linkerd_stack::{layer, ExtractParam, NewService, Proxy, Service};
+use linkerd_stack::{layer, CloneParam, ExtractParam, NewService, Proxy, Service};
 use std::{
     marker::PhantomData,
     task::{Context, Poll},

@@ -33,6 +33,12 @@ impl<C, N> NewInsertClassifyResponse<C, (), N> {
     }
 }
 
+impl<C: Clone + Default, N> NewInsertClassifyResponse<C, CloneParam<C>, N> {
+    pub fn layer_default() -> impl layer::Layer<N, Service = Self> + Clone {
+        Self::layer_via(CloneParam::from(C::default()))
+    }
+}
+
 impl<T, C, X, N> NewService<T> for NewInsertClassifyResponse<C, X, N>
 where
     C: super::Classify,

@@ -61,8 +67,9 @@ where
 
     fn proxy(&self, svc: &mut S, mut req: http::Request<B>) -> Self::Future {
         let classify_rsp = self.classify.classify(&req);
-        let prior = req.extensions_mut().insert(classify_rsp);
-        debug_assert!(prior.is_none(), "classification extension already existed");
+        if req.extensions_mut().insert(classify_rsp).is_some() {
+            tracing::debug!("Overrode response classifier");
+        }
         self.inner.proxy(svc, req)
     }
 }

@@ -83,8 +90,9 @@ where
 
     fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
         let classify_rsp = self.classify.classify(&req);
-        let prior = req.extensions_mut().insert(classify_rsp);
-        debug_assert!(prior.is_none(), "classification extension already existed");
+        if req.extensions_mut().insert(classify_rsp).is_some() {
+            tracing::debug!("Overrode response classifier");
+        }
         self.inner.call(req)
     }
 }

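The override behavior introduced above relies on `http::Extensions` semantics: a request carries at most one value per type, and inserting a second value of the same type returns the value it replaced. A minimal standalone sketch (the `Classifier` type here is hypothetical, standing in for the real `classify::Response`):

```rust
use http::Request;

#[derive(Clone, Debug, PartialEq)]
struct Classifier(&'static str);

fn main() {
    let mut req = Request::builder().uri("/").body(()).unwrap();

    // The first insert returns None; a second insert of the same type returns
    // the value it replaced (what the hunk above logs as an override).
    assert!(req.extensions_mut().insert(Classifier("default")).is_none());
    let prior = req.extensions_mut().insert(Classifier("per-route"));
    assert_eq!(prior, Some(Classifier("default")));

    // Downstream layers read the classifier back by type.
    assert_eq!(
        req.extensions().get::<Classifier>(),
        Some(&Classifier("per-route"))
    );
}
```
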
@@ -28,6 +28,10 @@ pub struct Local<T>(pub T);
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
 pub struct Remote<T>(pub T);
 
+/// Describes a connection from a client to a server.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
+pub struct AddrPair(pub ClientAddr, pub ServerAddr);
+
 // === impl ClientAddr ===
 
 impl std::ops::Deref for ClientAddr {

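For orientation, `AddrPair` is just the connection's client and server addresses bundled together. A minimal standalone sketch of producing and destructuring such a pair (simplified copies of the newtypes above; the `Target` type and addresses are hypothetical, not part of this diff):

```rust
use std::net::SocketAddr;

// Simplified stand-ins for the address newtypes defined above.
#[derive(Copy, Clone, Debug)]
pub struct ClientAddr(pub SocketAddr);

#[derive(Copy, Clone, Debug)]
pub struct ServerAddr(pub SocketAddr);

/// Describes a connection from a client to a server.
#[derive(Copy, Clone, Debug)]
pub struct AddrPair(pub ClientAddr, pub ServerAddr);

// A hypothetical accept target that exposes the pair, analogous to the
// `Param<AddrPair>` impls added in this diff.
struct Target {
    client: ClientAddr,
    server: ServerAddr,
}

impl Target {
    fn addr_pair(&self) -> AddrPair {
        AddrPair(self.client, self.server)
    }
}

fn main() {
    let target = Target {
        client: ClientAddr("192.0.2.10:56830".parse().unwrap()),
        server: ServerAddr("192.0.2.1:4143".parse().unwrap()),
    };

    // Destructure the pair much like `serve` does to label its accept span.
    let AddrPair(ClientAddr(client), ServerAddr(server)) = target.addr_pair();
    println!("accepted connection: client={client} server={server}");
}
```
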
@@ -15,7 +15,7 @@ pub mod listen;
 pub mod orig_dst;
 
 pub use self::{
-    addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr},
+    addrs::{AddrPair, ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr},
     connect::ConnectTcp,
     listen::{Bind, BindTcp},
     orig_dst::BindWithOrigDst,

@@ -104,3 +104,12 @@ impl Param<Local<ServerAddr>> for Addrs {
         self.server
     }
 }
+
+impl Param<AddrPair> for Addrs {
+    #[inline]
+    fn param(&self) -> AddrPair {
+        let Remote(client) = self.client;
+        let Local(server) = self.server;
+        AddrPair(client, server)
+    }
+}

@@ -23,6 +23,7 @@ pub struct Addrs<A = listen::Addrs> {
 // === impl Addrs ===
 
 impl<A> Param<OrigDstAddr> for Addrs<A> {
+    #[inline]
     fn param(&self) -> OrigDstAddr {
         self.orig_dst
     }

@@ -32,11 +33,23 @@ impl<A> Param<Remote<ClientAddr>> for Addrs<A>
 where
     A: Param<Remote<ClientAddr>>,
 {
+    #[inline]
     fn param(&self) -> Remote<ClientAddr> {
         self.inner.param()
     }
 }
 
+impl<A> Param<AddrPair> for Addrs<A>
+where
+    A: Param<Remote<ClientAddr>>,
+{
+    #[inline]
+    fn param(&self) -> AddrPair {
+        let Remote(client) = self.inner.param();
+        AddrPair(client, ServerAddr(self.orig_dst.into()))
+    }
+}
+
 impl<A> Param<Local<ServerAddr>> for Addrs<A>
 where
     A: Param<Local<ServerAddr>>,