mirror of https://github.com/linkerd/linkerd2.git
fix(policy): include TCPRoute policy on opaque appProtocol services (#13948)
When a Service has a port marked with `appProtocol: linkerd.io/opaque`, the policy controller returns only the default opaque route and any TCPRoutes attached to that service and port are ignored. We include any TCPRoute in the outbound policy when the `appProtocol` is marked as opaque. Furthermore, we update the logic to treat any unknown value for `appProtocol` as opaque (when previously, an unknown value was ignored, allowing any route type to be attached). The consequence of this is TCPRoute is the only route type that can be attached to a pork with an unknown value for `appProtocol`. Yet furthermore, we updated the logic to allow GRPCRoutes to be attached to ports marked with `appProtocol: kubernetes.io/h2c`. This means that either GRPCRoutes or HTTPRoutes may be attached to such ports and the more specific type (GRPCRoute) will take higher precedence. --------- Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
parent
8bfda07565
commit
25969cfbf9
|
@ -393,11 +393,6 @@ fn to_proto(
|
|||
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();
|
||||
|
||||
let kind = match &policy.app_protocol {
|
||||
Some(AppProtocol::Opaque) => {
|
||||
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
|
||||
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
|
||||
})
|
||||
}
|
||||
Some(AppProtocol::Http1) => {
|
||||
http_routes.sort_by(timestamp_then_name);
|
||||
http::http1_only_protocol(
|
||||
|
@ -412,23 +407,56 @@ fn to_proto(
|
|||
)
|
||||
}
|
||||
Some(AppProtocol::Http2) => {
|
||||
http_routes.sort_by(timestamp_then_name);
|
||||
http::http2_only_protocol(
|
||||
backend,
|
||||
http_routes.into_iter(),
|
||||
accrual,
|
||||
policy.http_retry.clone(),
|
||||
policy.timeouts.clone(),
|
||||
allow_l5d_request_headers,
|
||||
&policy.parent_info,
|
||||
original_dst,
|
||||
)
|
||||
let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
|
||||
|
||||
if !grpc_routes.is_empty() {
|
||||
grpc_routes.sort_by(timestamp_then_name);
|
||||
grpc::protocol(
|
||||
backend,
|
||||
grpc_routes.into_iter(),
|
||||
accrual,
|
||||
policy.grpc_retry.clone(),
|
||||
policy.timeouts.clone(),
|
||||
allow_l5d_request_headers,
|
||||
&policy.parent_info,
|
||||
original_dst,
|
||||
)
|
||||
} else {
|
||||
http_routes.sort_by(timestamp_then_name);
|
||||
http::http2_only_protocol(
|
||||
backend,
|
||||
http_routes.into_iter(),
|
||||
accrual,
|
||||
policy.http_retry.clone(),
|
||||
policy.timeouts.clone(),
|
||||
allow_l5d_request_headers,
|
||||
&policy.parent_info,
|
||||
original_dst,
|
||||
)
|
||||
}
|
||||
}
|
||||
None | Some(AppProtocol::Unknown(_)) => {
|
||||
Some(AppProtocol::Opaque) | Some(AppProtocol::Unknown(_)) => {
|
||||
if let Some(AppProtocol::Unknown(protocol)) = &policy.app_protocol {
|
||||
tracing::debug!(resource = ?policy.parent_info, port = policy.port.get(), "Unknown appProtocol \"{protocol}\"");
|
||||
}
|
||||
|
||||
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();
|
||||
|
||||
if !tcp_routes.is_empty() {
|
||||
tcp_routes.sort_by(timestamp_then_name);
|
||||
tcp::protocol(
|
||||
backend,
|
||||
tcp_routes.into_iter(),
|
||||
&policy.parent_info,
|
||||
original_dst,
|
||||
)
|
||||
} else {
|
||||
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
|
||||
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
|
||||
})
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
|
||||
let mut tls_routes = policy.tls_routes.clone().into_iter().collect::<Vec<_>>();
|
||||
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();
|
||||
|
|
|
@ -2,12 +2,12 @@ use futures::StreamExt;
|
|||
use linkerd_policy_controller_k8s_api as k8s;
|
||||
use linkerd_policy_controller_k8s_api::gateway;
|
||||
use linkerd_policy_test::{
|
||||
assert_resource_meta, create,
|
||||
assert_resource_meta, await_route_accepted, create,
|
||||
outbound_api::{
|
||||
assert_route_is_default, assert_singleton, http1_routes, http2_routes,
|
||||
assert_route_is_default, assert_singleton, grpc_routes, http1_routes, http2_routes,
|
||||
retry_watch_outbound_policy,
|
||||
},
|
||||
test_route::TestParent,
|
||||
test_route::{TestParent, TestRoute},
|
||||
with_temp_ns,
|
||||
};
|
||||
|
||||
|
@ -21,7 +21,6 @@ async fn opaque_parent() {
|
|||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with no routes.
|
||||
// let parent = P::create_parent(&client.clone(), &ns).await;
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("linkerd.io/opaque".to_string())),
|
||||
|
@ -48,6 +47,91 @@ async fn opaque_parent() {
|
|||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
||||
#[cfg(feature = "gateway-api-experimental")]
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn unknown_app_protocol_parent() {
|
||||
async fn test<P: TestParent>() {
|
||||
tracing::debug!(
|
||||
parent = %P::kind(&P::DynamicType::default()),
|
||||
);
|
||||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with no routes.
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("XMPP".to_string())),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
assert_resource_meta(&config.metadata, parent.obj_ref(), port);
|
||||
|
||||
let routes = linkerd_policy_test::outbound_api::tcp_routes(&config);
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default::<gateway::TCPRoute>(route, &parent.obj_ref(), port);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
||||
#[cfg(feature = "gateway-api-experimental")]
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn opaque_parent_with_tcp_route() {
|
||||
async fn test<P: TestParent>() {
|
||||
tracing::debug!(
|
||||
parent = %P::kind(&P::DynamicType::default()),
|
||||
);
|
||||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with TCPRoute.
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("linkerd.io/opaque".to_string())),
|
||||
)
|
||||
.await;
|
||||
|
||||
let route = create(
|
||||
&client,
|
||||
gateway::TCPRoute::make_route(
|
||||
ns.clone(),
|
||||
vec![parent.obj_ref()],
|
||||
vec![vec![parent.backend_ref(port)]],
|
||||
),
|
||||
)
|
||||
.await;
|
||||
await_route_accepted(&client, &route).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
assert_resource_meta(&config.metadata, parent.obj_ref(), port);
|
||||
|
||||
gateway::TCPRoute::routes(&config, |routes| {
|
||||
// Only the first TCPRoute should be returned in the config.
|
||||
assert!(route.meta_eq(gateway::TCPRoute::extract_meta(&routes[0])));
|
||||
assert_eq!(routes.len(), 1);
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn http1_parent() {
|
||||
async fn test<P: TestParent>() {
|
||||
|
@ -57,7 +141,6 @@ async fn http1_parent() {
|
|||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with no routes.
|
||||
// let parent = P::create_parent(&client.clone(), &ns).await;
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("http".to_string())),
|
||||
|
@ -84,6 +167,52 @@ async fn http1_parent() {
|
|||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn http1_parent_with_http_route() {
|
||||
async fn test<P: TestParent>() {
|
||||
tracing::debug!(
|
||||
parent = %P::kind(&P::DynamicType::default()),
|
||||
);
|
||||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with HTTPRoute.
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("http".to_string())),
|
||||
)
|
||||
.await;
|
||||
|
||||
let route = create(
|
||||
&client,
|
||||
gateway::HTTPRoute::make_route(
|
||||
ns.clone(),
|
||||
vec![parent.obj_ref()],
|
||||
vec![vec![parent.backend_ref(port)]],
|
||||
),
|
||||
)
|
||||
.await;
|
||||
await_route_accepted(&client, &route).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
assert_resource_meta(&config.metadata, parent.obj_ref(), port);
|
||||
|
||||
let routes = http1_routes(&config);
|
||||
let outbound_route = assert_singleton(routes);
|
||||
assert!(route.meta_eq(gateway::HTTPRoute::extract_meta(outbound_route)));
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn http2_parent() {
|
||||
async fn test<P: TestParent>() {
|
||||
|
@ -93,7 +222,6 @@ async fn http2_parent() {
|
|||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with no routes.
|
||||
// let parent = P::create_parent(&client.clone(), &ns).await;
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
|
||||
|
@ -119,3 +247,95 @@ async fn http2_parent() {
|
|||
|
||||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn http2_parent_with_http_route() {
|
||||
async fn test<P: TestParent>() {
|
||||
tracing::debug!(
|
||||
parent = %P::kind(&P::DynamicType::default()),
|
||||
);
|
||||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with HTTPRoute.
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
|
||||
)
|
||||
.await;
|
||||
|
||||
let route = create(
|
||||
&client,
|
||||
gateway::HTTPRoute::make_route(
|
||||
ns.clone(),
|
||||
vec![parent.obj_ref()],
|
||||
vec![vec![parent.backend_ref(port)]],
|
||||
),
|
||||
)
|
||||
.await;
|
||||
await_route_accepted(&client, &route).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
assert_resource_meta(&config.metadata, parent.obj_ref(), port);
|
||||
|
||||
let routes = http2_routes(&config);
|
||||
let outbound_route = assert_singleton(routes);
|
||||
assert!(route.meta_eq(gateway::HTTPRoute::extract_meta(outbound_route)));
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn http2_parent_with_grpc_route() {
|
||||
async fn test<P: TestParent>() {
|
||||
tracing::debug!(
|
||||
parent = %P::kind(&P::DynamicType::default()),
|
||||
);
|
||||
with_temp_ns(|client, ns| async move {
|
||||
let port = 4191;
|
||||
// Create a parent with GRPCRoute.
|
||||
let parent = create(
|
||||
&client,
|
||||
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
|
||||
)
|
||||
.await;
|
||||
|
||||
let route = create(
|
||||
&client,
|
||||
gateway::GRPCRoute::make_route(
|
||||
ns.clone(),
|
||||
vec![parent.obj_ref()],
|
||||
vec![vec![parent.backend_ref(port)]],
|
||||
),
|
||||
)
|
||||
.await;
|
||||
await_route_accepted(&client, &route).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
assert_resource_meta(&config.metadata, parent.obj_ref(), port);
|
||||
|
||||
let routes = grpc_routes(&config);
|
||||
let outbound_route = assert_singleton(routes);
|
||||
assert!(route.meta_eq(gateway::GRPCRoute::extract_meta(outbound_route)));
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
test::<k8s::Service>().await;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue