From 871e829fce4c65130f4dcfbd87a0832972a09b62 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Fri, 10 Mar 2023 16:47:40 -0800 Subject: [PATCH] policy: Cleanup outbound policy indexing (#10494) Follow-up from #10485 Signed-off-by: Alex Leong Co-authored-by: Oliver Gould --- policy-controller/grpc/src/lib.rs | 198 +++++++----------- .../k8s/index/src/outbound_index.rs | 57 +++-- policy-controller/src/lib.rs | 12 +- policy-test/tests/outbound_api.rs | 34 +-- 4 files changed, 119 insertions(+), 182 deletions(-) diff --git a/policy-controller/grpc/src/lib.rs b/policy-controller/grpc/src/lib.rs index ae613ad71..82d568209 100644 --- a/policy-controller/grpc/src/lib.rs +++ b/policy-controller/grpc/src/lib.rs @@ -3,19 +3,14 @@ mod http_route; -use api::{ - destination, - net::{ip_address::Ip, IPv6, IpAddress}, -}; use futures::prelude::*; use linkerd2_proxy_api::{ - self as api, + self as api, destination, inbound::{ self as proto, inbound_server_policies_server::{InboundServerPolicies, InboundServerPoliciesServer}, }, meta::{metadata, Metadata}, - net::TcpAddress, outbound::{ self, outbound_policies_server::{OutboundPolicies, OutboundPoliciesServer}, @@ -31,7 +26,7 @@ use linkerd_policy_controller_core::{ ServerRef, }; use maplit::*; -use std::{net::IpAddr, num::NonZeroU16, sync::Arc, time}; +use std::{net::SocketAddr, num::NonZeroU16, sync::Arc, time}; use tracing::trace; #[derive(Clone, Debug)] @@ -157,8 +152,20 @@ where fn service_lookup( &self, - target: TcpAddress, + traffic_spec: outbound::TrafficSpec, ) -> Result<(String, String, NonZeroU16), tonic::Status> { + let target = traffic_spec + .target + .ok_or_else(|| tonic::Status::invalid_argument("target is required"))?; + let target = match target { + outbound::traffic_spec::Target::Addr(target) => target, + outbound::traffic_spec::Target::Authority(_) => { + return Err(tonic::Status::unimplemented( + "getting policy by authority is not supported", + )) + } + }; + let port = target .port .try_into() @@ -189,20 +196,7 @@ where &self, req: tonic::Request, ) -> Result, tonic::Status> { - let traffic_spec = req.into_inner(); - - let target = traffic_spec - .target - .ok_or_else(|| tonic::Status::invalid_argument("target is required"))?; - let target = match target { - outbound::traffic_spec::Target::Addr(target) => target, - outbound::traffic_spec::Target::Authority(_) => { - return Err(tonic::Status::unimplemented( - "getting policy by authority is not supported", - )) - } - }; - let service = self.service_lookup(target)?; + let service = self.service_lookup(req.into_inner())?; let policy = self .index @@ -225,20 +219,7 @@ where &self, req: tonic::Request, ) -> Result, tonic::Status> { - let traffic_spec = req.into_inner(); - - let target = traffic_spec - .target - .ok_or_else(|| tonic::Status::invalid_argument("target is required"))?; - let target = match target { - outbound::traffic_spec::Target::Addr(target) => target, - outbound::traffic_spec::Target::Authority(_) => { - return Err(tonic::Status::unimplemented( - "getting policy by authority is not supported", - )) - } - }; - let service = self.service_lookup(target)?; + let service = self.service_lookup(req.into_inner())?; let drain = self.drain.clone(); let rx = self @@ -676,7 +657,7 @@ fn convert_outbound_http_route( rules, creation_timestamp: _, }: OutboundHttpRoute, - default_backend: outbound::http_route::WeightedRouteBackend, + default_backend: outbound::http_route::RouteBackend, ) -> outbound::HttpRoute { let metadata = Some(Metadata { kind: Some(metadata::Kind::Resource(api::meta::Resource { @@ -696,20 +677,24 @@ fn convert_outbound_http_route( let rules = rules .into_iter() .map(|OutboundHttpRouteRule { matches, backends }| { - let mut backends = backends + let backends = backends .into_iter() .map(convert_http_backend) .collect::>(); - if backends.is_empty() { - backends = vec![default_backend.clone()]; - } + let dist = if backends.is_empty() { + outbound::http_route::distribution::Kind::FirstAvailable( + outbound::http_route::distribution::FirstAvailable { + backends: vec![default_backend.clone()], + }, + ) + } else { + outbound::http_route::distribution::Kind::RandomAvailable( + outbound::http_route::distribution::RandomAvailable { backends }, + ) + }; outbound::http_route::Rule { matches: matches.into_iter().map(http_route::convert_match).collect(), - backends: Some(outbound::http_route::Distribution { - kind: Some(outbound::http_route::distribution::Kind::RandomAvailable( - outbound::http_route::distribution::RandomAvailable { backends }, - )), - }), + backends: Some(outbound::http_route::Distribution { kind: Some(dist) }), filters: Default::default(), } }) @@ -724,23 +709,26 @@ fn convert_outbound_http_route( fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend { match backend { - Backend::Addr(addr) => outbound::http_route::WeightedRouteBackend { - weight: addr.weight, - backend: Some(outbound::http_route::RouteBackend { - backend: Some(outbound::Backend { - metadata: None, - queue: Some(default_queue_config()), - kind: Some(outbound::backend::Kind::Forward( - destination::WeightedAddr { - addr: Some(convert_tcp_address(addr.addr, addr.port)), - weight: addr.weight, - ..Default::default() - }, - )), + Backend::Addr(addr) => { + let socket_addr = SocketAddr::new(addr.addr, addr.port.get()); + outbound::http_route::WeightedRouteBackend { + weight: addr.weight, + backend: Some(outbound::http_route::RouteBackend { + backend: Some(outbound::Backend { + metadata: None, + queue: Some(default_queue_config()), + kind: Some(outbound::backend::Kind::Forward( + destination::WeightedAddr { + addr: Some(socket_addr.into()), + weight: addr.weight, + ..Default::default() + }, + )), + }), + filters: Default::default(), }), - filters: Default::default(), - }), - }, + } + } Backend::Dst(dst) => outbound::http_route::WeightedRouteBackend { weight: dst.weight, backend: Some(outbound::http_route::RouteBackend { @@ -781,36 +769,31 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute } } -fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::WeightedRouteBackend { - outbound::http_route::WeightedRouteBackend { - weight: 1, - backend: Some(outbound::http_route::RouteBackend { - backend: Some(outbound::Backend { - metadata: Some(Metadata { - kind: Some(metadata::Kind::Default("default".to_string())), - }), - queue: Some(default_queue_config()), - kind: Some(outbound::backend::Kind::Balancer( - outbound::backend::BalanceP2c { - discovery: Some(outbound::backend::EndpointDiscovery { - kind: Some(outbound::backend::endpoint_discovery::Kind::Dst( - outbound::backend::endpoint_discovery::DestinationGet { - path: outbound.authority.clone(), - }, - )), - }), - load: Some(default_balancer_config()), - }, - )), +fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::RouteBackend { + outbound::http_route::RouteBackend { + backend: Some(outbound::Backend { + metadata: Some(Metadata { + kind: Some(metadata::Kind::Default("default".to_string())), }), - filters: Default::default(), + queue: Some(default_queue_config()), + kind: Some(outbound::backend::Kind::Balancer( + outbound::backend::BalanceP2c { + discovery: Some(outbound::backend::EndpointDiscovery { + kind: Some(outbound::backend::endpoint_discovery::Kind::Dst( + outbound::backend::endpoint_discovery::DestinationGet { + path: outbound.authority.clone(), + }, + )), + }), + load: Some(default_balancer_config()), + }, + )), }), + filters: Default::default(), } } -fn default_outbound_http_route( - backend: outbound::http_route::WeightedRouteBackend, -) -> outbound::HttpRoute { +fn default_outbound_http_route(backend: outbound::http_route::RouteBackend) -> outbound::HttpRoute { let metadata = Some(Metadata { kind: Some(metadata::Kind::Default("default".to_string())), }); @@ -823,8 +806,8 @@ fn default_outbound_http_route( ..Default::default() }], backends: Some(outbound::http_route::Distribution { - kind: Some(outbound::http_route::distribution::Kind::RandomAvailable( - outbound::http_route::distribution::RandomAvailable { + kind: Some(outbound::http_route::distribution::Kind::FirstAvailable( + outbound::http_route::distribution::FirstAvailable { backends: vec![backend], }, )), @@ -863,40 +846,3 @@ fn default_queue_config() -> outbound::Queue { ), } } - -// TODO(ver) this conversion should be made in the api crate. -fn convert_tcp_address(ip_addr: IpAddr, port: NonZeroU16) -> TcpAddress { - let ip = match ip_addr { - IpAddr::V4(ipv4) => Ip::Ipv4(ipv4.into()), - IpAddr::V6(ipv6) => { - let first = [ - ipv6.octets()[0], - ipv6.octets()[1], - ipv6.octets()[2], - ipv6.octets()[3], - ipv6.octets()[4], - ipv6.octets()[5], - ipv6.octets()[6], - ipv6.octets()[7], - ]; - let last = [ - ipv6.octets()[8], - ipv6.octets()[9], - ipv6.octets()[10], - ipv6.octets()[11], - ipv6.octets()[12], - ipv6.octets()[13], - ipv6.octets()[14], - ipv6.octets()[15], - ]; - Ip::Ipv6(IPv6 { - first: u64::from_be_bytes(first), - last: u64::from_be_bytes(last), - }) - } - }; - TcpAddress { - ip: Some(IpAddress { ip: Some(ip) }), - port: port.get().into(), - } -} diff --git a/policy-controller/k8s/index/src/outbound_index.rs b/policy-controller/k8s/index/src/outbound_index.rs index 47aed2ed5..296f97bfc 100644 --- a/policy-controller/k8s/index/src/outbound_index.rs +++ b/policy-controller/k8s/index/src/outbound_index.rs @@ -107,7 +107,7 @@ impl kubert::index::IndexNamespacedResource for Index { self.services.insert(addr, service_ref); } Err(error) => { - tracing::error!(%error, service=name, "invalid cluster ip"); + tracing::error!(%error, service=name, cluster_ip, "invalid cluster ip"); } } } @@ -150,23 +150,20 @@ impl Index { pub fn outbound_policy_rx( &mut self, - namespace: &str, - service: &str, + namespace: String, + service: String, port: NonZeroU16, ) -> Result> { let ns = self .namespaces .by_ns - .entry(namespace.to_string()) + .entry(namespace.clone()) .or_insert_with(|| Namespace { service_routes: Default::default(), namespace: Arc::new(namespace.to_string()), services: Default::default(), }); - let key = ServicePort { - service: service.to_string(), - port, - }; + let key = ServicePort { service, port }; tracing::debug!(?key, "subscribing to service port"); let routes = ns.service_routes_or_default(key, &self.namespaces.cluster_info); Ok(routes.watch.subscribe()) @@ -244,33 +241,27 @@ impl Namespace { fn service_routes_or_default( &mut self, - service_port: ServicePort, + sp: ServicePort, cluster: &ClusterInfo, ) -> &mut ServiceRoutes { - let authority = cluster.service_dns_authority( - &self.namespace, - &service_port.service, - service_port.port, - ); - self.service_routes - .entry(service_port.clone()) - .or_insert_with(|| { - let opaque = match self.services.get(&service_port.service) { - Some(svc) => svc.opaque_ports.contains(&service_port.port), - None => false, - }; - let (sender, _) = watch::channel(OutboundPolicy { - http_routes: Default::default(), - authority, - namespace: self.namespace.to_string(), - opaque, - }); - ServiceRoutes { - routes: Default::default(), - watch: sender, - opaque, - } - }) + self.service_routes.entry(sp.clone()).or_insert_with(|| { + let authority = cluster.service_dns_authority(&self.namespace, &sp.service, sp.port); + let opaque = match self.services.get(&sp.service) { + Some(svc) => svc.opaque_ports.contains(&sp.port), + None => false, + }; + let (sender, _) = watch::channel(OutboundPolicy { + http_routes: Default::default(), + authority, + namespace: self.namespace.to_string(), + opaque, + }); + ServiceRoutes { + routes: Default::default(), + watch: sender, + opaque, + } + }) } fn convert_route(&self, route: HttpRoute, cluster: &ClusterInfo) -> Result { diff --git a/policy-controller/src/lib.rs b/policy-controller/src/lib.rs index 28c0a989f..d0764eac2 100644 --- a/policy-controller/src/lib.rs +++ b/policy-controller/src/lib.rs @@ -71,11 +71,7 @@ impl DiscoverOutboundPolicy<(String, String, NonZeroU16)> for OutboundDiscover { &self, (namespace, service, port): (String, String, NonZeroU16), ) -> Result> { - let rx = match self - .0 - .write() - .outbound_policy_rx(&namespace, &service, port) - { + let rx = match self.0.write().outbound_policy_rx(namespace, service, port) { Ok(rx) => rx, Err(error) => { tracing::error!(%error, "failed to get outbound policy rx"); @@ -90,11 +86,7 @@ impl DiscoverOutboundPolicy<(String, String, NonZeroU16)> for OutboundDiscover { &self, (namespace, service, port): (String, String, NonZeroU16), ) -> Result> { - match self - .0 - .write() - .outbound_policy_rx(&namespace, &service, port) - { + match self.0.write().outbound_policy_rx(namespace, service, port) { Ok(rx) => Ok(Some(Box::pin(tokio_stream::wrappers::WatchStream::new(rx)))), Err(_) => Ok(None), } diff --git a/policy-test/tests/outbound_api.rs b/policy-test/tests/outbound_api.rs index adee36759..6a518c0be 100644 --- a/policy-test/tests/outbound_api.rs +++ b/policy-test/tests/outbound_api.rs @@ -119,7 +119,7 @@ async fn service_with_http_routes_without_backends() { // There should be a route with the logical backend. detect_http_routes(&config, |routes| { let route = assert_singleton(routes); - let backends = route_backends_random_available(route); + let backends = route_backends_first_available(route); let backend = assert_singleton(backends); assert_backend_matches_service(backend, &svc, 4191); }); @@ -452,6 +452,23 @@ where } } +#[track_caller] +fn route_backends_first_available( + route: &grpc::outbound::HttpRoute, +) -> &[grpc::outbound::http_route::RouteBackend] { + let kind = assert_singleton(&route.rules) + .backends + .as_ref() + .expect("Rule must have backends") + .kind + .as_ref() + .expect("Backend must have kind"); + match kind { + grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends, + _ => panic!("Distribution must be FirstAvailable"), + } +} + #[track_caller] fn route_backends_random_available( route: &grpc::outbound::HttpRoute, @@ -494,7 +511,7 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh #[track_caller] fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) { - let backends = route_backends_random_available(route); + let backends = route_backends_first_available(route); let backend = assert_singleton(backends); assert_backend_matches_service(backend, svc, port); @@ -509,20 +526,11 @@ fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service #[track_caller] fn assert_backend_matches_service( - backend: &grpc::outbound::http_route::WeightedRouteBackend, + backend: &grpc::outbound::http_route::RouteBackend, svc: &k8s::Service, port: u16, ) { - let kind = backend - .backend - .as_ref() - .unwrap() - .backend - .as_ref() - .unwrap() - .kind - .as_ref() - .unwrap(); + let kind = backend.backend.as_ref().unwrap().kind.as_ref().unwrap(); let dst = match kind { grpc::outbound::backend::Kind::Balancer(balance) => { let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();