policy: Cleanup outbound policy indexing (#10494)

Follow-up from #10485

Signed-off-by: Alex Leong <alex@buoyant.io>
Co-authored-by: Oliver Gould <ver@buoyant.io>
This commit is contained in:
Alex Leong 2023-03-10 16:47:40 -08:00 committed by GitHub
parent 6bcd2e73f4
commit 871e829fce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 119 additions and 182 deletions

View File

@ -3,19 +3,14 @@
mod http_route; mod http_route;
use api::{
destination,
net::{ip_address::Ip, IPv6, IpAddress},
};
use futures::prelude::*; use futures::prelude::*;
use linkerd2_proxy_api::{ use linkerd2_proxy_api::{
self as api, self as api, destination,
inbound::{ inbound::{
self as proto, self as proto,
inbound_server_policies_server::{InboundServerPolicies, InboundServerPoliciesServer}, inbound_server_policies_server::{InboundServerPolicies, InboundServerPoliciesServer},
}, },
meta::{metadata, Metadata}, meta::{metadata, Metadata},
net::TcpAddress,
outbound::{ outbound::{
self, self,
outbound_policies_server::{OutboundPolicies, OutboundPoliciesServer}, outbound_policies_server::{OutboundPolicies, OutboundPoliciesServer},
@ -31,7 +26,7 @@ use linkerd_policy_controller_core::{
ServerRef, ServerRef,
}; };
use maplit::*; use maplit::*;
use std::{net::IpAddr, num::NonZeroU16, sync::Arc, time}; use std::{net::SocketAddr, num::NonZeroU16, sync::Arc, time};
use tracing::trace; use tracing::trace;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -157,8 +152,20 @@ where
fn service_lookup( fn service_lookup(
&self, &self,
target: TcpAddress, traffic_spec: outbound::TrafficSpec,
) -> Result<(String, String, NonZeroU16), tonic::Status> { ) -> Result<(String, String, NonZeroU16), tonic::Status> {
let target = traffic_spec
.target
.ok_or_else(|| tonic::Status::invalid_argument("target is required"))?;
let target = match target {
outbound::traffic_spec::Target::Addr(target) => target,
outbound::traffic_spec::Target::Authority(_) => {
return Err(tonic::Status::unimplemented(
"getting policy by authority is not supported",
))
}
};
let port = target let port = target
.port .port
.try_into() .try_into()
@ -189,20 +196,7 @@ where
&self, &self,
req: tonic::Request<outbound::TrafficSpec>, req: tonic::Request<outbound::TrafficSpec>,
) -> Result<tonic::Response<outbound::OutboundPolicy>, tonic::Status> { ) -> Result<tonic::Response<outbound::OutboundPolicy>, tonic::Status> {
let traffic_spec = req.into_inner(); let service = self.service_lookup(req.into_inner())?;
let target = traffic_spec
.target
.ok_or_else(|| tonic::Status::invalid_argument("target is required"))?;
let target = match target {
outbound::traffic_spec::Target::Addr(target) => target,
outbound::traffic_spec::Target::Authority(_) => {
return Err(tonic::Status::unimplemented(
"getting policy by authority is not supported",
))
}
};
let service = self.service_lookup(target)?;
let policy = self let policy = self
.index .index
@ -225,20 +219,7 @@ where
&self, &self,
req: tonic::Request<outbound::TrafficSpec>, req: tonic::Request<outbound::TrafficSpec>,
) -> Result<tonic::Response<BoxWatchServiceStream>, tonic::Status> { ) -> Result<tonic::Response<BoxWatchServiceStream>, tonic::Status> {
let traffic_spec = req.into_inner(); let service = self.service_lookup(req.into_inner())?;
let target = traffic_spec
.target
.ok_or_else(|| tonic::Status::invalid_argument("target is required"))?;
let target = match target {
outbound::traffic_spec::Target::Addr(target) => target,
outbound::traffic_spec::Target::Authority(_) => {
return Err(tonic::Status::unimplemented(
"getting policy by authority is not supported",
))
}
};
let service = self.service_lookup(target)?;
let drain = self.drain.clone(); let drain = self.drain.clone();
let rx = self let rx = self
@ -676,7 +657,7 @@ fn convert_outbound_http_route(
rules, rules,
creation_timestamp: _, creation_timestamp: _,
}: OutboundHttpRoute, }: OutboundHttpRoute,
default_backend: outbound::http_route::WeightedRouteBackend, default_backend: outbound::http_route::RouteBackend,
) -> outbound::HttpRoute { ) -> outbound::HttpRoute {
let metadata = Some(Metadata { let metadata = Some(Metadata {
kind: Some(metadata::Kind::Resource(api::meta::Resource { kind: Some(metadata::Kind::Resource(api::meta::Resource {
@ -696,20 +677,24 @@ fn convert_outbound_http_route(
let rules = rules let rules = rules
.into_iter() .into_iter()
.map(|OutboundHttpRouteRule { matches, backends }| { .map(|OutboundHttpRouteRule { matches, backends }| {
let mut backends = backends let backends = backends
.into_iter() .into_iter()
.map(convert_http_backend) .map(convert_http_backend)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if backends.is_empty() { let dist = if backends.is_empty() {
backends = vec![default_backend.clone()]; outbound::http_route::distribution::Kind::FirstAvailable(
} outbound::http_route::distribution::FirstAvailable {
backends: vec![default_backend.clone()],
},
)
} else {
outbound::http_route::distribution::Kind::RandomAvailable(
outbound::http_route::distribution::RandomAvailable { backends },
)
};
outbound::http_route::Rule { outbound::http_route::Rule {
matches: matches.into_iter().map(http_route::convert_match).collect(), matches: matches.into_iter().map(http_route::convert_match).collect(),
backends: Some(outbound::http_route::Distribution { backends: Some(outbound::http_route::Distribution { kind: Some(dist) }),
kind: Some(outbound::http_route::distribution::Kind::RandomAvailable(
outbound::http_route::distribution::RandomAvailable { backends },
)),
}),
filters: Default::default(), filters: Default::default(),
} }
}) })
@ -724,7 +709,9 @@ fn convert_outbound_http_route(
fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend { fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend {
match backend { match backend {
Backend::Addr(addr) => outbound::http_route::WeightedRouteBackend { Backend::Addr(addr) => {
let socket_addr = SocketAddr::new(addr.addr, addr.port.get());
outbound::http_route::WeightedRouteBackend {
weight: addr.weight, weight: addr.weight,
backend: Some(outbound::http_route::RouteBackend { backend: Some(outbound::http_route::RouteBackend {
backend: Some(outbound::Backend { backend: Some(outbound::Backend {
@ -732,7 +719,7 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
queue: Some(default_queue_config()), queue: Some(default_queue_config()),
kind: Some(outbound::backend::Kind::Forward( kind: Some(outbound::backend::Kind::Forward(
destination::WeightedAddr { destination::WeightedAddr {
addr: Some(convert_tcp_address(addr.addr, addr.port)), addr: Some(socket_addr.into()),
weight: addr.weight, weight: addr.weight,
..Default::default() ..Default::default()
}, },
@ -740,7 +727,8 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
}), }),
filters: Default::default(), filters: Default::default(),
}), }),
}, }
}
Backend::Dst(dst) => outbound::http_route::WeightedRouteBackend { Backend::Dst(dst) => outbound::http_route::WeightedRouteBackend {
weight: dst.weight, weight: dst.weight,
backend: Some(outbound::http_route::RouteBackend { backend: Some(outbound::http_route::RouteBackend {
@ -781,10 +769,8 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
} }
} }
fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::WeightedRouteBackend { fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::RouteBackend {
outbound::http_route::WeightedRouteBackend { outbound::http_route::RouteBackend {
weight: 1,
backend: Some(outbound::http_route::RouteBackend {
backend: Some(outbound::Backend { backend: Some(outbound::Backend {
metadata: Some(Metadata { metadata: Some(Metadata {
kind: Some(metadata::Kind::Default("default".to_string())), kind: Some(metadata::Kind::Default("default".to_string())),
@ -804,13 +790,10 @@ fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::Weig
)), )),
}), }),
filters: Default::default(), filters: Default::default(),
}),
} }
} }
fn default_outbound_http_route( fn default_outbound_http_route(backend: outbound::http_route::RouteBackend) -> outbound::HttpRoute {
backend: outbound::http_route::WeightedRouteBackend,
) -> outbound::HttpRoute {
let metadata = Some(Metadata { let metadata = Some(Metadata {
kind: Some(metadata::Kind::Default("default".to_string())), kind: Some(metadata::Kind::Default("default".to_string())),
}); });
@ -823,8 +806,8 @@ fn default_outbound_http_route(
..Default::default() ..Default::default()
}], }],
backends: Some(outbound::http_route::Distribution { backends: Some(outbound::http_route::Distribution {
kind: Some(outbound::http_route::distribution::Kind::RandomAvailable( kind: Some(outbound::http_route::distribution::Kind::FirstAvailable(
outbound::http_route::distribution::RandomAvailable { outbound::http_route::distribution::FirstAvailable {
backends: vec![backend], backends: vec![backend],
}, },
)), )),
@ -863,40 +846,3 @@ fn default_queue_config() -> outbound::Queue {
), ),
} }
} }
// TODO(ver) this conversion should be made in the api crate.
fn convert_tcp_address(ip_addr: IpAddr, port: NonZeroU16) -> TcpAddress {
let ip = match ip_addr {
IpAddr::V4(ipv4) => Ip::Ipv4(ipv4.into()),
IpAddr::V6(ipv6) => {
let first = [
ipv6.octets()[0],
ipv6.octets()[1],
ipv6.octets()[2],
ipv6.octets()[3],
ipv6.octets()[4],
ipv6.octets()[5],
ipv6.octets()[6],
ipv6.octets()[7],
];
let last = [
ipv6.octets()[8],
ipv6.octets()[9],
ipv6.octets()[10],
ipv6.octets()[11],
ipv6.octets()[12],
ipv6.octets()[13],
ipv6.octets()[14],
ipv6.octets()[15],
];
Ip::Ipv6(IPv6 {
first: u64::from_be_bytes(first),
last: u64::from_be_bytes(last),
})
}
};
TcpAddress {
ip: Some(IpAddress { ip: Some(ip) }),
port: port.get().into(),
}
}

View File

@ -107,7 +107,7 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
self.services.insert(addr, service_ref); self.services.insert(addr, service_ref);
} }
Err(error) => { Err(error) => {
tracing::error!(%error, service=name, "invalid cluster ip"); tracing::error!(%error, service=name, cluster_ip, "invalid cluster ip");
} }
} }
} }
@ -150,23 +150,20 @@ impl Index {
pub fn outbound_policy_rx( pub fn outbound_policy_rx(
&mut self, &mut self,
namespace: &str, namespace: String,
service: &str, service: String,
port: NonZeroU16, port: NonZeroU16,
) -> Result<watch::Receiver<OutboundPolicy>> { ) -> Result<watch::Receiver<OutboundPolicy>> {
let ns = self let ns = self
.namespaces .namespaces
.by_ns .by_ns
.entry(namespace.to_string()) .entry(namespace.clone())
.or_insert_with(|| Namespace { .or_insert_with(|| Namespace {
service_routes: Default::default(), service_routes: Default::default(),
namespace: Arc::new(namespace.to_string()), namespace: Arc::new(namespace.to_string()),
services: Default::default(), services: Default::default(),
}); });
let key = ServicePort { let key = ServicePort { service, port };
service: service.to_string(),
port,
};
tracing::debug!(?key, "subscribing to service port"); tracing::debug!(?key, "subscribing to service port");
let routes = ns.service_routes_or_default(key, &self.namespaces.cluster_info); let routes = ns.service_routes_or_default(key, &self.namespaces.cluster_info);
Ok(routes.watch.subscribe()) Ok(routes.watch.subscribe())
@ -244,19 +241,13 @@ impl Namespace {
fn service_routes_or_default( fn service_routes_or_default(
&mut self, &mut self,
service_port: ServicePort, sp: ServicePort,
cluster: &ClusterInfo, cluster: &ClusterInfo,
) -> &mut ServiceRoutes { ) -> &mut ServiceRoutes {
let authority = cluster.service_dns_authority( self.service_routes.entry(sp.clone()).or_insert_with(|| {
&self.namespace, let authority = cluster.service_dns_authority(&self.namespace, &sp.service, sp.port);
&service_port.service, let opaque = match self.services.get(&sp.service) {
service_port.port, Some(svc) => svc.opaque_ports.contains(&sp.port),
);
self.service_routes
.entry(service_port.clone())
.or_insert_with(|| {
let opaque = match self.services.get(&service_port.service) {
Some(svc) => svc.opaque_ports.contains(&service_port.port),
None => false, None => false,
}; };
let (sender, _) = watch::channel(OutboundPolicy { let (sender, _) = watch::channel(OutboundPolicy {

View File

@ -71,11 +71,7 @@ impl DiscoverOutboundPolicy<(String, String, NonZeroU16)> for OutboundDiscover {
&self, &self,
(namespace, service, port): (String, String, NonZeroU16), (namespace, service, port): (String, String, NonZeroU16),
) -> Result<Option<OutboundPolicy>> { ) -> Result<Option<OutboundPolicy>> {
let rx = match self let rx = match self.0.write().outbound_policy_rx(namespace, service, port) {
.0
.write()
.outbound_policy_rx(&namespace, &service, port)
{
Ok(rx) => rx, Ok(rx) => rx,
Err(error) => { Err(error) => {
tracing::error!(%error, "failed to get outbound policy rx"); tracing::error!(%error, "failed to get outbound policy rx");
@ -90,11 +86,7 @@ impl DiscoverOutboundPolicy<(String, String, NonZeroU16)> for OutboundDiscover {
&self, &self,
(namespace, service, port): (String, String, NonZeroU16), (namespace, service, port): (String, String, NonZeroU16),
) -> Result<Option<OutboundPolicyStream>> { ) -> Result<Option<OutboundPolicyStream>> {
match self match self.0.write().outbound_policy_rx(namespace, service, port) {
.0
.write()
.outbound_policy_rx(&namespace, &service, port)
{
Ok(rx) => Ok(Some(Box::pin(tokio_stream::wrappers::WatchStream::new(rx)))), Ok(rx) => Ok(Some(Box::pin(tokio_stream::wrappers::WatchStream::new(rx)))),
Err(_) => Ok(None), Err(_) => Ok(None),
} }

View File

@ -119,7 +119,7 @@ async fn service_with_http_routes_without_backends() {
// There should be a route with the logical backend. // There should be a route with the logical backend.
detect_http_routes(&config, |routes| { detect_http_routes(&config, |routes| {
let route = assert_singleton(routes); let route = assert_singleton(routes);
let backends = route_backends_random_available(route); let backends = route_backends_first_available(route);
let backend = assert_singleton(backends); let backend = assert_singleton(backends);
assert_backend_matches_service(backend, &svc, 4191); assert_backend_matches_service(backend, &svc, 4191);
}); });
@ -452,6 +452,23 @@ where
} }
} }
#[track_caller]
fn route_backends_first_available(
route: &grpc::outbound::HttpRoute,
) -> &[grpc::outbound::http_route::RouteBackend] {
let kind = assert_singleton(&route.rules)
.backends
.as_ref()
.expect("Rule must have backends")
.kind
.as_ref()
.expect("Backend must have kind");
match kind {
grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
_ => panic!("Distribution must be FirstAvailable"),
}
}
#[track_caller] #[track_caller]
fn route_backends_random_available( fn route_backends_random_available(
route: &grpc::outbound::HttpRoute, route: &grpc::outbound::HttpRoute,
@ -494,7 +511,7 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh
#[track_caller] #[track_caller]
fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) { fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
let backends = route_backends_random_available(route); let backends = route_backends_first_available(route);
let backend = assert_singleton(backends); let backend = assert_singleton(backends);
assert_backend_matches_service(backend, svc, port); assert_backend_matches_service(backend, svc, port);
@ -509,20 +526,11 @@ fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service
#[track_caller] #[track_caller]
fn assert_backend_matches_service( fn assert_backend_matches_service(
backend: &grpc::outbound::http_route::WeightedRouteBackend, backend: &grpc::outbound::http_route::RouteBackend,
svc: &k8s::Service, svc: &k8s::Service,
port: u16, port: u16,
) { ) {
let kind = backend let kind = backend.backend.as_ref().unwrap().kind.as_ref().unwrap();
.backend
.as_ref()
.unwrap()
.backend
.as_ref()
.unwrap()
.kind
.as_ref()
.unwrap();
let dst = match kind { let dst = match kind {
grpc::outbound::backend::Kind::Balancer(balance) => { grpc::outbound::backend::Kind::Balancer(balance) => {
let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap(); let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();