From f2d5cf54f56b258b2582e0272851a470f734d81f Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 22 Oct 2024 04:17:19 +0000 Subject: [PATCH] wip --- linkerd/app/outbound/src/http.rs | 4 +- linkerd/app/outbound/src/opaq.rs | 14 ++---- linkerd/app/outbound/src/opaq/logical.rs | 14 +----- .../app/outbound/src/opaq/logical/tests.rs | 15 +++--- linkerd/app/outbound/src/sidecar.rs | 48 ++++--------------- 5 files changed, 26 insertions(+), 69 deletions(-) diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index c4bb17dc6..e5f18fa57 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -36,7 +36,7 @@ pub struct HttpMetrics { grpc_route: policy::GrpcRouteMetrics, } -pub(crate) fn spawn_routes( +pub fn spawn_routes( mut route_rx: watch::Receiver, init: Routes, mut mk: impl FnMut(&T) -> Option + Send + Sync + 'static, @@ -72,7 +72,7 @@ where rx } -pub(crate) fn spawn_routes_default(addr: Remote) -> watch::Receiver { +pub fn spawn_routes_default(addr: Remote) -> watch::Receiver { let (tx, rx) = watch::channel(Routes::Endpoint(addr, Default::default())); tokio::spawn(async move { tx.closed().await; diff --git a/linkerd/app/outbound/src/opaq.rs b/linkerd/app/outbound/src/opaq.rs index be5d0a080..e23c43ab6 100644 --- a/linkerd/app/outbound/src/opaq.rs +++ b/linkerd/app/outbound/src/opaq.rs @@ -17,10 +17,10 @@ use tokio::sync::watch; mod concrete; mod logical; -pub use self::logical::{Logical, PolicyRoutes, ProfileRoutes, Routes}; +pub use self::logical::{PolicyRoutes, ProfileRoutes, Routes}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct Opaq(Logical); +struct Opaq(watch::Receiver); #[derive(Clone, Debug, Default)] pub struct OpaqMetrics { @@ -63,14 +63,6 @@ where rx } -pub(crate) fn spawn_routes_default(addr: Remote) -> watch::Receiver { - let (tx, rx) = watch::channel(Routes::Endpoint(addr, Default::default())); - tokio::spawn(async move { - tx.closed().await; - }); - rx -} - // === impl Outbound === impl Outbound { @@ -81,7 +73,7 @@ impl Outbound { pub fn push_opaq_cached(self, resolve: R) -> Outbound> where // Opaque target - T: svc::Param, + T: svc::Param>, T: Clone + Send + Sync + 'static, // Server-side connection I: io::AsyncRead + io::AsyncWrite + io::PeerAddr, diff --git a/linkerd/app/outbound/src/opaq/logical.rs b/linkerd/app/outbound/src/opaq/logical.rs index 48c435887..b47ffcb1f 100644 --- a/linkerd/app/outbound/src/opaq/logical.rs +++ b/linkerd/app/outbound/src/opaq/logical.rs @@ -1,12 +1,6 @@ use super::concrete; use crate::{Outbound, ParentRef}; -use linkerd_app_core::{ - io, profiles, - proxy::{api_resolve::Metadata, tcp::balance}, - svc, - transport::addrs::*, - Addr, Error, Infallible, NameAddr, -}; +use linkerd_app_core::{io, profiles, proxy::tcp::balance, svc, Addr, Error, NameAddr}; use linkerd_distribute as distribute; use linkerd_proxy_client_policy as policy; use std::{fmt::Debug, hash::Hash, sync::Arc, time}; @@ -23,10 +17,6 @@ pub enum Routes { /// Service profile routes. Profile(ProfileRoutes), - - /// Fallback endpoint forwarding. - // TODO(ver) Remove this variant when policy routes are fully wired up. - Endpoint(Remote, Metadata), } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -93,7 +83,7 @@ impl Outbound { pub fn push_opaq_logical(self) -> Outbound> where // Opaque logical target. - T: svc::Param, + T: svc::Param>, T: Eq + Hash + Clone + Debug + Send + Sync + 'static, // Server-side socket. I: io::AsyncRead + io::AsyncWrite + Debug + Send + Unpin + 'static, diff --git a/linkerd/app/outbound/src/opaq/logical/tests.rs b/linkerd/app/outbound/src/opaq/logical/tests.rs index 3d4e061e7..b7f19ab88 100644 --- a/linkerd/app/outbound/src/opaq/logical/tests.rs +++ b/linkerd/app/outbound/src/opaq/logical/tests.rs @@ -5,6 +5,7 @@ use linkerd_app_core::{ errors::{self, FailFastError}, io::AsyncReadExt, svc::{NewService, ServiceExt}, + transport::addrs::*, }; use std::net::SocketAddr; use tokio::time; @@ -17,11 +18,11 @@ async fn forward() { // We create a logical target to be resolved to endpoints. let laddr = "xyz.example.com:4444".parse::().unwrap(); - let (_tx, rx) = tokio::sync::watch::channel(Profile { + let (_tx, rx) = tokio::sync::watch::channel(profiles::Profile { addr: Some(profiles::LogicalAddr(laddr.clone())), ..Default::default() }); - let logical = Logical::Profile(laddr.clone(), rx.into()); + let logical = Logical::Policy(laddr.clone(), rx.into()); // The resolution resolves a single endpoint. let ep_addr = SocketAddr::new([192, 0, 2, 30].into(), 3333); @@ -31,7 +32,7 @@ async fn forward() { // Build the TCP logical stack with a mocked connector. let (rt, _shutdown) = runtime(); let stack = Outbound::new(default_config(), rt, &mut Default::default()) - .with_stack(svc::mk(move |ep: concrete::Endpoint>| { + .with_stack(svc::mk(move |ep: concrete::Endpoint>| { let Remote(ServerAddr(ea)) = svc::Param::param(&ep); assert_eq!(ea, ep_addr); let mut io = support::io(); @@ -68,11 +69,13 @@ async fn balances() { // We create a logical target to be resolved to endpoints. let laddr = "xyz.example.com:4444".parse::().unwrap(); - let (_tx, rx) = tokio::sync::watch::channel(Profile { + let (_tx, rx) = tokio::sync::watch::channel(profiles::Profile { addr: Some(profiles::LogicalAddr(laddr.clone())), ..Default::default() }); - let logical = Logical::Profile(laddr.clone(), rx.into()); + let logical = Routes::Policy(ProfileRoutes { + addr: laddr.clone(), + }); // The resolution resolves a single endpoint. let ep0_addr = SocketAddr::new([192, 0, 2, 30].into(), 3333); @@ -86,7 +89,7 @@ async fn balances() { let (rt, _shutdown) = runtime(); let svc = Outbound::new(default_config(), rt, &mut Default::default()) .with_stack(svc::mk( - move |ep: concrete::Endpoint>| match svc::Param::param(&ep) { + move |ep: concrete::Endpoint>| match svc::Param::param(&ep) { Remote(ServerAddr(addr)) if addr == ep0_addr => { tracing::debug!(%addr, "writing ep0"); let mut io = support::io(); diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index f92929dee..704f6f724 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -133,18 +133,6 @@ impl svc::Param> for Sidecar { } } -impl svc::Param> for Sidecar { - fn param(&self) -> Option { - self.profile.clone()?.logical_addr() - } -} - -impl svc::Param> for Sidecar { - fn param(&self) -> Option { - self.profile.clone() - } -} - impl svc::Param for Sidecar { fn param(&self) -> Protocol { if let Some(rx) = svc::Param::>::param(self) { @@ -163,23 +151,6 @@ impl svc::Param for Sidecar { } } -impl svc::Param for Sidecar { - fn param(&self) -> opaq::Logical { - if let Some(profile) = self.profile.clone() { - if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() { - return opaq::Logical::Profile(addr, profile); - } - - if let Some((addr, metadata)) = profile.endpoint() { - return opaq::Logical::Forward(Remote(ServerAddr(addr)), metadata); - } - } - - let OrigDstAddr(addr) = self.orig_dst; - opaq::Logical::Forward(Remote(ServerAddr(addr)), Default::default()) - } -} - impl PartialEq for Sidecar { fn eq(&self, other: &Self) -> bool { self.orig_dst == other.orig_dst @@ -361,17 +332,18 @@ impl From for OpaqSidecar { let mut policy = parent.policy.clone(); if let Some(mut profile) = parent.profile.clone().map(watch::Receiver::from) { - if let Some(addr, meta) = profile.borrow().endpoint.is_empty() {} // Only use service profiles if there are novel target // overrides. - if !profile.borrow().targets.is_empty() { - tracing::debug!("Using ServiceProfile"); - let init = Self::mk_profile_routes(addr.clone(), &profile.borrow_and_update()); - let routes = - opaq::spawn_routes(profile, init, move |profile: &profiles::Profile| { - Some(Self::mk_profile_routes(addr.clone(), profile)) - }); - return OpaqSidecar { orig_dst, routes }; + if let Some(addr) = profile.borrow().addr.clone() { + if !profile.borrow().targets.is_empty() { + tracing::debug!(?addr, "Using ServiceProfile"); + let init = Self::mk_profile_routes(addr.clone(), &profile.borrow_and_update()); + let routes = + opaq::spawn_routes(profile, init, move |profile: &profiles::Profile| { + Some(Self::mk_profile_routes(addr.clone(), profile)) + }); + return OpaqSidecar { orig_dst, routes }; + } } }