From 25969cfbf918b09576d6464908e3b9ae2551facf Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Mon, 21 Apr 2025 11:33:48 -0700 Subject: [PATCH] fix(policy): include TCPRoute policy on opaque appProtocol services (#13948) When a Service has a port marked with `appProtocol: linkerd.io/opaque`, the policy controller returns only the default opaque route and any TCPRoutes attached to that service and port are ignored. We include any TCPRoute in the outbound policy when the `appProtocol` is marked as opaque. Furthermore, we update the logic to treat any unknown value for `appProtocol` as opaque (when previously, an unknown value was ignored, allowing any route type to be attached). The consequence of this is TCPRoute is the only route type that can be attached to a pork with an unknown value for `appProtocol`. Yet furthermore, we updated the logic to allow GRPCRoutes to be attached to ports marked with `appProtocol: kubernetes.io/h2c`. This means that either GRPCRoutes or HTTPRoutes may be attached to such ports and the more specific type (GRPCRoute) will take higher precedence. --------- Signed-off-by: Alex Leong --- policy-controller/grpc/src/outbound.rs | 62 +++-- .../tests/outbound_api_app_protocol.rs | 232 +++++++++++++++++- 2 files changed, 271 insertions(+), 23 deletions(-) diff --git a/policy-controller/grpc/src/outbound.rs b/policy-controller/grpc/src/outbound.rs index d37e6eb02..8b375d2b8 100644 --- a/policy-controller/grpc/src/outbound.rs +++ b/policy-controller/grpc/src/outbound.rs @@ -393,11 +393,6 @@ fn to_proto( let mut http_routes = policy.http_routes.clone().into_iter().collect::>(); let kind = match &policy.app_protocol { - Some(AppProtocol::Opaque) => { - outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque { - routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)], - }) - } Some(AppProtocol::Http1) => { http_routes.sort_by(timestamp_then_name); http::http1_only_protocol( @@ -412,23 +407,56 @@ fn to_proto( ) } Some(AppProtocol::Http2) => { - http_routes.sort_by(timestamp_then_name); - http::http2_only_protocol( - backend, - http_routes.into_iter(), - accrual, - policy.http_retry.clone(), - policy.timeouts.clone(), - allow_l5d_request_headers, - &policy.parent_info, - original_dst, - ) + let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::>(); + + if !grpc_routes.is_empty() { + grpc_routes.sort_by(timestamp_then_name); + grpc::protocol( + backend, + grpc_routes.into_iter(), + accrual, + policy.grpc_retry.clone(), + policy.timeouts.clone(), + allow_l5d_request_headers, + &policy.parent_info, + original_dst, + ) + } else { + http_routes.sort_by(timestamp_then_name); + http::http2_only_protocol( + backend, + http_routes.into_iter(), + accrual, + policy.http_retry.clone(), + policy.timeouts.clone(), + allow_l5d_request_headers, + &policy.parent_info, + original_dst, + ) + } } - None | Some(AppProtocol::Unknown(_)) => { + Some(AppProtocol::Opaque) | Some(AppProtocol::Unknown(_)) => { if let Some(AppProtocol::Unknown(protocol)) = &policy.app_protocol { tracing::debug!(resource = ?policy.parent_info, port = policy.port.get(), "Unknown appProtocol \"{protocol}\""); } + let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::>(); + + if !tcp_routes.is_empty() { + tcp_routes.sort_by(timestamp_then_name); + tcp::protocol( + backend, + tcp_routes.into_iter(), + &policy.parent_info, + original_dst, + ) + } else { + outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque { + routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)], + }) + } + } + None => { let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::>(); let mut tls_routes = policy.tls_routes.clone().into_iter().collect::>(); let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::>(); diff --git a/policy-test/tests/outbound_api_app_protocol.rs b/policy-test/tests/outbound_api_app_protocol.rs index ebec36e87..4af74a3f9 100644 --- a/policy-test/tests/outbound_api_app_protocol.rs +++ b/policy-test/tests/outbound_api_app_protocol.rs @@ -2,12 +2,12 @@ use futures::StreamExt; use linkerd_policy_controller_k8s_api as k8s; use linkerd_policy_controller_k8s_api::gateway; use linkerd_policy_test::{ - assert_resource_meta, create, + assert_resource_meta, await_route_accepted, create, outbound_api::{ - assert_route_is_default, assert_singleton, http1_routes, http2_routes, + assert_route_is_default, assert_singleton, grpc_routes, http1_routes, http2_routes, retry_watch_outbound_policy, }, - test_route::TestParent, + test_route::{TestParent, TestRoute}, with_temp_ns, }; @@ -21,7 +21,6 @@ async fn opaque_parent() { with_temp_ns(|client, ns| async move { let port = 4191; // Create a parent with no routes. - // let parent = P::create_parent(&client.clone(), &ns).await; let parent = create( &client, P::make_parent_with_protocol(&ns, Some("linkerd.io/opaque".to_string())), @@ -48,6 +47,91 @@ async fn opaque_parent() { test::().await; } +#[cfg(feature = "gateway-api-experimental")] +#[tokio::test(flavor = "current_thread")] +async fn unknown_app_protocol_parent() { + async fn test() { + tracing::debug!( + parent = %P::kind(&P::DynamicType::default()), + ); + with_temp_ns(|client, ns| async move { + let port = 4191; + // Create a parent with no routes. + let parent = create( + &client, + P::make_parent_with_protocol(&ns, Some("XMPP".to_string())), + ) + .await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + assert_resource_meta(&config.metadata, parent.obj_ref(), port); + + let routes = linkerd_policy_test::outbound_api::tcp_routes(&config); + let route = assert_singleton(routes); + assert_route_is_default::(route, &parent.obj_ref(), port); + }) + .await; + } + + test::().await; +} + +#[cfg(feature = "gateway-api-experimental")] +#[tokio::test(flavor = "current_thread")] +async fn opaque_parent_with_tcp_route() { + async fn test() { + tracing::debug!( + parent = %P::kind(&P::DynamicType::default()), + ); + with_temp_ns(|client, ns| async move { + let port = 4191; + // Create a parent with TCPRoute. + let parent = create( + &client, + P::make_parent_with_protocol(&ns, Some("linkerd.io/opaque".to_string())), + ) + .await; + + let route = create( + &client, + gateway::TCPRoute::make_route( + ns.clone(), + vec![parent.obj_ref()], + vec![vec![parent.backend_ref(port)]], + ), + ) + .await; + await_route_accepted(&client, &route).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + assert_resource_meta(&config.metadata, parent.obj_ref(), port); + + gateway::TCPRoute::routes(&config, |routes| { + // Only the first TCPRoute should be returned in the config. + assert!(route.meta_eq(gateway::TCPRoute::extract_meta(&routes[0]))); + assert_eq!(routes.len(), 1); + }); + }) + .await; + } + + test::().await; +} + #[tokio::test(flavor = "current_thread")] async fn http1_parent() { async fn test() { @@ -57,7 +141,6 @@ async fn http1_parent() { with_temp_ns(|client, ns| async move { let port = 4191; // Create a parent with no routes. - // let parent = P::create_parent(&client.clone(), &ns).await; let parent = create( &client, P::make_parent_with_protocol(&ns, Some("http".to_string())), @@ -84,6 +167,52 @@ async fn http1_parent() { test::().await; } +#[tokio::test(flavor = "current_thread")] +async fn http1_parent_with_http_route() { + async fn test() { + tracing::debug!( + parent = %P::kind(&P::DynamicType::default()), + ); + with_temp_ns(|client, ns| async move { + let port = 4191; + // Create a parent with HTTPRoute. + let parent = create( + &client, + P::make_parent_with_protocol(&ns, Some("http".to_string())), + ) + .await; + + let route = create( + &client, + gateway::HTTPRoute::make_route( + ns.clone(), + vec![parent.obj_ref()], + vec![vec![parent.backend_ref(port)]], + ), + ) + .await; + await_route_accepted(&client, &route).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + assert_resource_meta(&config.metadata, parent.obj_ref(), port); + + let routes = http1_routes(&config); + let outbound_route = assert_singleton(routes); + assert!(route.meta_eq(gateway::HTTPRoute::extract_meta(outbound_route))); + }) + .await; + } + + test::().await; +} + #[tokio::test(flavor = "current_thread")] async fn http2_parent() { async fn test() { @@ -93,7 +222,6 @@ async fn http2_parent() { with_temp_ns(|client, ns| async move { let port = 4191; // Create a parent with no routes. - // let parent = P::create_parent(&client.clone(), &ns).await; let parent = create( &client, P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())), @@ -119,3 +247,95 @@ async fn http2_parent() { test::().await; } + +#[tokio::test(flavor = "current_thread")] +async fn http2_parent_with_http_route() { + async fn test() { + tracing::debug!( + parent = %P::kind(&P::DynamicType::default()), + ); + with_temp_ns(|client, ns| async move { + let port = 4191; + // Create a parent with HTTPRoute. + let parent = create( + &client, + P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())), + ) + .await; + + let route = create( + &client, + gateway::HTTPRoute::make_route( + ns.clone(), + vec![parent.obj_ref()], + vec![vec![parent.backend_ref(port)]], + ), + ) + .await; + await_route_accepted(&client, &route).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + assert_resource_meta(&config.metadata, parent.obj_ref(), port); + + let routes = http2_routes(&config); + let outbound_route = assert_singleton(routes); + assert!(route.meta_eq(gateway::HTTPRoute::extract_meta(outbound_route))); + }) + .await; + } + + test::().await; +} + +#[tokio::test(flavor = "current_thread")] +async fn http2_parent_with_grpc_route() { + async fn test() { + tracing::debug!( + parent = %P::kind(&P::DynamicType::default()), + ); + with_temp_ns(|client, ns| async move { + let port = 4191; + // Create a parent with GRPCRoute. + let parent = create( + &client, + P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())), + ) + .await; + + let route = create( + &client, + gateway::GRPCRoute::make_route( + ns.clone(), + vec![parent.obj_ref()], + vec![vec![parent.backend_ref(port)]], + ), + ) + .await; + await_route_accepted(&client, &route).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + assert_resource_meta(&config.metadata, parent.obj_ref(), port); + + let routes = grpc_routes(&config); + let outbound_route = assert_singleton(routes); + assert!(route.meta_eq(gateway::GRPCRoute::extract_meta(outbound_route))); + }) + .await; + } + + test::().await; +}