This commit is contained in:
Oliver Gould 2024-10-22 04:17:19 +00:00
parent a03622eea2
commit f2d5cf54f5
No known key found for this signature in database
5 changed files with 26 additions and 69 deletions

View File

@ -36,7 +36,7 @@ pub struct HttpMetrics {
grpc_route: policy::GrpcRouteMetrics,
}
pub(crate) fn spawn_routes<T>(
pub fn spawn_routes<T>(
mut route_rx: watch::Receiver<T>,
init: Routes,
mut mk: impl FnMut(&T) -> Option<Routes> + Send + Sync + 'static,
@ -72,7 +72,7 @@ where
rx
}
pub(crate) fn spawn_routes_default(addr: Remote<ServerAddr>) -> watch::Receiver<Routes> {
pub fn spawn_routes_default(addr: Remote<ServerAddr>) -> watch::Receiver<Routes> {
let (tx, rx) = watch::channel(Routes::Endpoint(addr, Default::default()));
tokio::spawn(async move {
tx.closed().await;

View File

@ -17,10 +17,10 @@ use tokio::sync::watch;
mod concrete;
mod logical;
pub use self::logical::{Logical, PolicyRoutes, ProfileRoutes, Routes};
pub use self::logical::{PolicyRoutes, ProfileRoutes, Routes};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Opaq(Logical);
struct Opaq(watch::Receiver<Routes>);
#[derive(Clone, Debug, Default)]
pub struct OpaqMetrics {
@ -63,14 +63,6 @@ where
rx
}
pub(crate) fn spawn_routes_default(addr: Remote<ServerAddr>) -> watch::Receiver<Routes> {
let (tx, rx) = watch::channel(Routes::Endpoint(addr, Default::default()));
tokio::spawn(async move {
tx.closed().await;
});
rx
}
// === impl Outbound ===
impl<C> Outbound<C> {
@ -81,7 +73,7 @@ impl<C> Outbound<C> {
pub fn push_opaq_cached<T, I, R>(self, resolve: R) -> Outbound<svc::ArcNewCloneTcp<T, I>>
where
// Opaque target
T: svc::Param<Logical>,
T: svc::Param<watch::Receiver<Routes>>,
T: Clone + Send + Sync + 'static,
// Server-side connection
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,

View File

@ -1,12 +1,6 @@
use super::concrete;
use crate::{Outbound, ParentRef};
use linkerd_app_core::{
io, profiles,
proxy::{api_resolve::Metadata, tcp::balance},
svc,
transport::addrs::*,
Addr, Error, Infallible, NameAddr,
};
use linkerd_app_core::{io, profiles, proxy::tcp::balance, svc, Addr, Error, NameAddr};
use linkerd_distribute as distribute;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc, time};
@ -23,10 +17,6 @@ pub enum Routes {
/// Service profile routes.
Profile(ProfileRoutes),
/// Fallback endpoint forwarding.
// TODO(ver) Remove this variant when policy routes are fully wired up.
Endpoint(Remote<ServerAddr>, Metadata),
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -93,7 +83,7 @@ impl<N> Outbound<N> {
pub fn push_opaq_logical<T, I, NSvc>(self) -> Outbound<svc::ArcNewCloneTcp<T, I>>
where
// Opaque logical target.
T: svc::Param<Routes>,
T: svc::Param<watch::Receiver<Routes>>,
T: Eq + Hash + Clone + Debug + Send + Sync + 'static,
// Server-side socket.
I: io::AsyncRead + io::AsyncWrite + Debug + Send + Unpin + 'static,

View File

@ -5,6 +5,7 @@ use linkerd_app_core::{
errors::{self, FailFastError},
io::AsyncReadExt,
svc::{NewService, ServiceExt},
transport::addrs::*,
};
use std::net::SocketAddr;
use tokio::time;
@ -17,11 +18,11 @@ async fn forward() {
// We create a logical target to be resolved to endpoints.
let laddr = "xyz.example.com:4444".parse::<NameAddr>().unwrap();
let (_tx, rx) = tokio::sync::watch::channel(Profile {
let (_tx, rx) = tokio::sync::watch::channel(profiles::Profile {
addr: Some(profiles::LogicalAddr(laddr.clone())),
..Default::default()
});
let logical = Logical::Profile(laddr.clone(), rx.into());
let logical = Logical::Policy(laddr.clone(), rx.into());
// The resolution resolves a single endpoint.
let ep_addr = SocketAddr::new([192, 0, 2, 30].into(), 3333);
@ -31,7 +32,7 @@ async fn forward() {
// Build the TCP logical stack with a mocked connector.
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(svc::mk(move |ep: concrete::Endpoint<Concrete<Logical>>| {
.with_stack(svc::mk(move |ep: concrete::Endpoint<Concrete<Routes>>| {
let Remote(ServerAddr(ea)) = svc::Param::param(&ep);
assert_eq!(ea, ep_addr);
let mut io = support::io();
@ -68,11 +69,13 @@ async fn balances() {
// We create a logical target to be resolved to endpoints.
let laddr = "xyz.example.com:4444".parse::<NameAddr>().unwrap();
let (_tx, rx) = tokio::sync::watch::channel(Profile {
let (_tx, rx) = tokio::sync::watch::channel(profiles::Profile {
addr: Some(profiles::LogicalAddr(laddr.clone())),
..Default::default()
});
let logical = Logical::Profile(laddr.clone(), rx.into());
let logical = Routes::Policy(ProfileRoutes {
addr: laddr.clone(),
});
// The resolution resolves a single endpoint.
let ep0_addr = SocketAddr::new([192, 0, 2, 30].into(), 3333);
@ -86,7 +89,7 @@ async fn balances() {
let (rt, _shutdown) = runtime();
let svc = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(svc::mk(
move |ep: concrete::Endpoint<Concrete<Logical>>| match svc::Param::param(&ep) {
move |ep: concrete::Endpoint<Concrete<Routes>>| match svc::Param::param(&ep) {
Remote(ServerAddr(addr)) if addr == ep0_addr => {
tracing::debug!(%addr, "writing ep0");
let mut io = support::io();

View File

@ -133,18 +133,6 @@ impl svc::Param<Remote<ServerAddr>> for Sidecar {
}
}
impl svc::Param<Option<profiles::LogicalAddr>> for Sidecar {
fn param(&self) -> Option<profiles::LogicalAddr> {
self.profile.clone()?.logical_addr()
}
}
impl svc::Param<Option<profiles::Receiver>> for Sidecar {
fn param(&self) -> Option<profiles::Receiver> {
self.profile.clone()
}
}
impl svc::Param<Protocol> for Sidecar {
fn param(&self) -> Protocol {
if let Some(rx) = svc::Param::<Option<profiles::Receiver>>::param(self) {
@ -163,23 +151,6 @@ impl svc::Param<Protocol> for Sidecar {
}
}
impl svc::Param<opaq::Logical> for Sidecar {
fn param(&self) -> opaq::Logical {
if let Some(profile) = self.profile.clone() {
if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() {
return opaq::Logical::Profile(addr, profile);
}
if let Some((addr, metadata)) = profile.endpoint() {
return opaq::Logical::Forward(Remote(ServerAddr(addr)), metadata);
}
}
let OrigDstAddr(addr) = self.orig_dst;
opaq::Logical::Forward(Remote(ServerAddr(addr)), Default::default())
}
}
impl PartialEq for Sidecar {
fn eq(&self, other: &Self) -> bool {
self.orig_dst == other.orig_dst
@ -361,17 +332,18 @@ impl From<Sidecar> for OpaqSidecar {
let mut policy = parent.policy.clone();
if let Some(mut profile) = parent.profile.clone().map(watch::Receiver::from) {
if let Some(addr, meta) = profile.borrow().endpoint.is_empty() {}
// Only use service profiles if there are novel target
// overrides.
if !profile.borrow().targets.is_empty() {
tracing::debug!("Using ServiceProfile");
let init = Self::mk_profile_routes(addr.clone(), &profile.borrow_and_update());
let routes =
opaq::spawn_routes(profile, init, move |profile: &profiles::Profile| {
Some(Self::mk_profile_routes(addr.clone(), profile))
});
return OpaqSidecar { orig_dst, routes };
if let Some(addr) = profile.borrow().addr.clone() {
if !profile.borrow().targets.is_empty() {
tracing::debug!(?addr, "Using ServiceProfile");
let init = Self::mk_profile_routes(addr.clone(), &profile.borrow_and_update());
let routes =
opaq::spawn_routes(profile, init, move |profile: &profiles::Profile| {
Some(Self::mk_profile_routes(addr.clone(), profile))
});
return OpaqSidecar { orig_dst, routes };
}
}
}