diff --git a/Dockerfile b/Dockerfile index 236940473..bfb4ee0ab 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,9 +31,7 @@ FROM $RUST_IMAGE as build # When set, causes the proxy to be compiled in development mode. ARG PROXY_UNOPTIMIZED -# Controls what features are enabled in the proxy. This is typically empty but -# may be set to `mock-orig-dst` for profiling builds, or to `multithreaded` to -# enable the multithreaded Tokio runtime. +# Controls what features are enabled in the proxy. ARG PROXY_FEATURES RUN --mount=type=cache,target=/var/lib/apt/lists \ diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml index c8871063a..d2ca68f37 100644 --- a/linkerd/app/Cargo.toml +++ b/linkerd/app/Cargo.toml @@ -13,7 +13,6 @@ This is used by tests and the executable. [features] allow-loopback = ["linkerd-app-outbound/allow-loopback"] -mock-orig-dst = ["linkerd-app-core/mock-orig-dst"] [dependencies] futures = "0.3.9" diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index af2669719..bb0bd6b10 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -12,9 +12,6 @@ This crate conglomerates proxy configuration, runtime administration, etc, independently of the inbound and outbound proxy logic. """ -[features] -mock-orig-dst = ["linkerd-proxy-transport/mock-orig-dst"] - [dependencies] bytes = "1" http = "0.2" diff --git a/linkerd/app/core/src/config.rs b/linkerd/app/core/src/config.rs index 3df0e7524..b8f3f381c 100644 --- a/linkerd/app/core/src/config.rs +++ b/linkerd/app/core/src/config.rs @@ -1,11 +1,15 @@ pub use crate::exp_backoff::ExponentialBackoff; -pub use crate::proxy::http::{h1, h2}; -pub use crate::transport::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, Keepalive, NoOrigDstAddr}; +use crate::{ + proxy::http::{h1, h2}, + svc::Param, + transport::{Keepalive, ListenAddr}, +}; use std::time::Duration; #[derive(Clone, Debug)] -pub struct ServerConfig { - pub bind: BindTcp, +pub struct ServerConfig { + pub addr: ListenAddr, + pub keepalive: Keepalive, pub h2_settings: h2::Settings, } @@ -20,7 +24,7 @@ pub struct ConnectConfig { #[derive(Clone, Debug)] pub struct ProxyConfig { - pub server: ServerConfig, + pub server: ServerConfig, pub connect: ConnectConfig, pub buffer_capacity: usize, pub cache_max_idle_age: Duration, @@ -31,11 +35,14 @@ pub struct ProxyConfig { // === impl ServerConfig === -impl ServerConfig { - pub fn with_orig_dst_addr(self, orig_dst_addrs: B) -> ServerConfig { - ServerConfig { - bind: self.bind.with_orig_dst_addr(orig_dst_addrs), - h2_settings: self.h2_settings, - } +impl Param for ServerConfig { + fn param(&self) -> ListenAddr { + self.addr + } +} + +impl Param for ServerConfig { + fn param(&self) -> Keepalive { + self.keepalive } } diff --git a/linkerd/app/core/src/serve.rs b/linkerd/app/core/src/serve.rs index a76e9cce5..6eddcb7a0 100644 --- a/linkerd/app/core/src/serve.rs +++ b/linkerd/app/core/src/serve.rs @@ -1,26 +1,28 @@ -use crate::io; -use crate::svc; +use crate::{ + io, + svc::{self, Param}, + transport::{ClientAddr, Remote}, +}; use futures::prelude::*; use linkerd_error::Error; -use linkerd_proxy_transport::listen::Addrs; use tower::util::ServiceExt; -use tracing::instrument::Instrument; -use tracing::{debug, debug_span, info, warn}; +use tracing::{debug, debug_span, info, instrument::Instrument, warn}; /// Spawns a task that binds an `L`-typed listener with an `A`-typed /// connection-accepting service. /// /// The task is driven until shutdown is signaled. -pub async fn serve( - listen: impl Stream>, +pub async fn serve( + listen: impl Stream>, mut new_accept: M, shutdown: impl Future, ) where I: Send + 'static, - M: svc::NewService, - A: tower::Service, Response = ()> + Send + 'static, - A::Error: Into, - A::Future: Send + 'static, + A: Param>, + M: svc::NewService, + S: tower::Service, Response = ()> + Send + 'static, + S::Error: Into, + S::Future: Send + 'static, { let accept = async move { futures::pin_mut!(listen); @@ -38,7 +40,7 @@ pub async fn serve( }; // The local addr should be instrumented from the listener's context. - let span = debug_span!("accept", client.addr = %addrs.client()); + let span = debug_span!("accept", client.addr = %addrs.param()); let accept = span.in_scope(|| new_accept.new_service(addrs)); diff --git a/linkerd/app/inbound/src/direct.rs b/linkerd/app/inbound/src/direct.rs index f597e799f..90c72171e 100644 --- a/linkerd/app/inbound/src/direct.rs +++ b/linkerd/app/inbound/src/direct.rs @@ -71,7 +71,7 @@ impl Inbound { > + Clone, > where - T: Param> + Param> + Clone + Send + 'static, + T: Param> + Param + Clone + Send + 'static, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Send + Sync + Unpin + 'static, N: svc::NewService + Clone + Send + Sync + Unpin + 'static, @@ -173,7 +173,7 @@ impl Inbound { impl TryFrom<(tls::ConditionalServerTls, T)> for ClientInfo where - T: Param>, + T: Param, T: Param>, { type Error = Error; @@ -184,14 +184,7 @@ where client_id: Some(client_id), negotiated_protocol, }) => { - let local: Option = addrs.param(); - let OrigDstAddr(local_addr) = local.ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - ) - })?; + let OrigDstAddr(local_addr) = addrs.param(); Ok(Self { client_id, alpn: negotiated_protocol, diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 5b4a269af..bf08e46af 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -71,7 +71,7 @@ async fn unmeshed_http1_hello_world() { profile_tx.send(profile::Profile::default()).unwrap(); // Build the outbound server - let cfg = default_config(accept.tcp.target_addr); + let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(accept); let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; @@ -118,7 +118,7 @@ async fn downgrade_origin_form() { profile_tx.send(profile::Profile::default()).unwrap(); // Build the outbound server - let cfg = default_config(accept.tcp.target_addr); + let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(accept); let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; @@ -166,7 +166,7 @@ async fn downgrade_absolute_form() { profile_tx.send(profile::Profile::default()).unwrap(); // Build the outbound server - let cfg = default_config(accept.tcp.target_addr); + let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(accept); let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 11df30f63..a0f087e72 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -21,16 +21,14 @@ use self::{ target::{HttpAccept, TcpAccept}, }; use linkerd_app_core::{ - config::{ConnectConfig, ProxyConfig}, + config::{ConnectConfig, ProxyConfig, ServerConfig}, detect, drain, io, metrics, profiles, proxy::tcp, - serve, - svc::{self, Param}, - tls, - transport::{self, ClientAddr, OrigDstAddr, Remote, ServerAddr}, + serve, svc, tls, + transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, Error, NameMatch, ProxyRuntime, }; -use std::{convert::TryFrom, fmt::Debug, future::Future, net::SocketAddr, time::Duration}; +use std::{convert::TryFrom, fmt::Debug, future::Future, time::Duration}; use tracing::{debug_span, info_span}; #[derive(Clone, Debug)] @@ -52,7 +50,7 @@ pub struct Inbound { stack: svc::Stack, } -// === impl Config === +// === impl Inbound === impl Inbound { pub fn config(&self) -> &Config { @@ -89,7 +87,7 @@ impl Inbound<()> { } } - pub fn to_tcp_connect>( + pub fn to_tcp_connect>( &self, ) -> Inbound< impl svc::Service< @@ -122,35 +120,34 @@ impl Inbound<()> { } } - pub fn serve( + pub fn serve( self, + bind: B, profiles: P, gateway: G, - ) -> (SocketAddr, impl Future + Send) + ) -> (Local, impl Future + Send) where + B: Bind, + B::Addrs: svc::Param> + + svc::Param> + + svc::Param, G: svc::NewService, G: Clone + Send + Sync + Unpin + 'static, - GSvc: svc::Service>, Response = ()> - + Send - + 'static, + GSvc: svc::Service>, Response = ()> + Send + 'static, GSvc::Error: Into, GSvc::Future: Send, P: profiles::GetProfile + Clone + Send + Sync + 'static, P::Error: Send, P::Future: Send, { - let (listen_addr, listen) = self - .config - .proxy - .server - .bind - .bind() + let (listen_addr, listen) = bind + .bind(&self.config.proxy.server) .expect("Failed to bind inbound listener"); let serve = async move { - let stack = self - .to_tcp_connect() - .into_server(listen_addr.port(), profiles, gateway); + let stack = + self.to_tcp_connect() + .into_server(listen_addr.as_ref().port(), profiles, gateway); let shutdown = self.runtime.drain.signaled(); serve::serve(listen, stack, shutdown).await }; @@ -218,7 +215,7 @@ where Service = impl svc::Service, > + Clone where - T: Param> + Param> + Clone + Send + 'static, + T: svc::Param> + svc::Param + Clone + Send + 'static, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Send + Sync + Unpin + 'static, G: svc::NewService, @@ -267,7 +264,7 @@ where .stack .push_map_target(TcpEndpoint::from) .push(self.runtime.metrics.transport.layer_accept()) - .push_request_filter(TcpAccept::port_skipped) + .push_map_target(TcpAccept::port_skipped) .check_new_service::() .instrument(|_: &T| debug_span!("forward")) .into_inner(), @@ -282,13 +279,9 @@ where .into_inner(), ) .instrument(|a: &T| { - if let Some(OrigDstAddr(target_addr)) = a.param() { - info_span!("server", port = target_addr.port()) - } else { - info_span!("server", port = %"") - } + let OrigDstAddr(target_addr) = a.param(); + info_span!("server", port = target_addr.port()) }) - .check_new_service::() .into_inner() } } @@ -303,18 +296,12 @@ impl From> for SkipByPort { impl svc::Predicate for SkipByPort where - T: Param>, + T: svc::Param, { type Request = svc::Either; fn check(&mut self, t: T) -> Result { - let OrigDstAddr(addr) = t.param().ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - ) - })?; + let OrigDstAddr(addr) = t.param(); if !self.0.contains(&addr.port()) { Ok(svc::Either::A(t)) } else { diff --git a/linkerd/app/inbound/src/prevent_loop.rs b/linkerd/app/inbound/src/prevent_loop.rs index 85c7e1c8a..ce8108441 100644 --- a/linkerd/app/inbound/src/prevent_loop.rs +++ b/linkerd/app/inbound/src/prevent_loop.rs @@ -44,17 +44,11 @@ impl PreventLoop { } } -impl>> Predicate for SwitchLoop { +impl> Predicate for SwitchLoop { type Request = Either; fn check(&mut self, addrs: T) -> Result, Error> { - let OrigDstAddr(addr) = addrs.param().ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - ) - })?; + let OrigDstAddr(addr) = addrs.param(); tracing::debug!(%addr, self.port); if addr.port() != self.port { Ok(Either::A(addrs)) diff --git a/linkerd/app/inbound/src/target.rs b/linkerd/app/inbound/src/target.rs index 0553270dd..b39021b02 100644 --- a/linkerd/app/inbound/src/target.rs +++ b/linkerd/app/inbound/src/target.rs @@ -10,13 +10,7 @@ use linkerd_app_core::{ transport_header::TransportHeader, Addr, Conditional, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, }; -use std::{ - convert::{TryFrom, TryInto}, - io, - net::SocketAddr, - str::FromStr, - sync::Arc, -}; +use std::{convert::TryInto, net::SocketAddr, str::FromStr, sync::Arc}; use tracing::debug; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -66,30 +60,25 @@ pub struct RequestTarget { // === impl TcpAccept === impl TcpAccept { - pub fn port_skipped(tcp: T) -> Result + pub fn port_skipped(tcp: T) -> Self where - T: Param> + Param>, + T: Param> + Param, { - let orig_dst: Option = tcp.param(); - let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found") - })?; - Ok(Self { + let OrigDstAddr(target_addr) = tcp.param(); + Self { target_addr, client_addr: tcp.param(), tls: Conditional::None(tls::NoServerTls::PortSkipped), - }) + } } +} - /// Returns a `TcpAccept` for the provided TLS metadata and addresses, - /// determining the target address from the server's local listener address - /// rather than a `SO_ORIGINAL_DST` address. - pub fn from_local_addr((tls, addrs): tls::server::Meta) -> Self - where - T: Param> + Param>, - { - let Local(ServerAddr(target_addr)) = addrs.param(); +impl From> for TcpAccept +where + T: Param> + Param, +{ + fn from((tls, addrs): tls::server::Meta) -> Self { + let OrigDstAddr(target_addr) = addrs.param(); Self { target_addr, client_addr: addrs.param(), @@ -98,25 +87,6 @@ impl TcpAccept { } } -impl TryFrom> for TcpAccept -where - T: Param> + Param>, -{ - type Error = io::Error; - fn try_from((tls, addrs): tls::server::Meta) -> Result { - let orig_dst: Option = addrs.param(); - let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found") - })?; - Ok(Self { - target_addr, - client_addr: addrs.param(), - tls, - }) - } -} - impl Param for TcpAccept { fn param(&self) -> SocketAddr { self.target_addr diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index 1d5bce408..f6c766fc8 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -8,15 +8,13 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{Keepalive, ListenAddr}, NameMatch, ProxyRuntime, }; pub use linkerd_app_test as support; -use std::{net::SocketAddr, time::Duration}; +use std::time::Duration; -const LOCALHOST: [u8; 4] = [127, 0, 0, 1]; - -pub fn default_config(orig_dst: SocketAddr) -> Config { +pub fn default_config() -> Config { let cluster_local = "svc.cluster.local." .parse::() .expect("`svc.cluster.local.` suffix is definitely valid"); @@ -24,11 +22,8 @@ pub fn default_config(orig_dst: SocketAddr) -> Config { allow_discovery: NameMatch::new(Some(cluster_local)), proxy: config::ProxyConfig { server: config::ServerConfig { - bind: BindTcp::new( - ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)), - Keepalive(None), - ) - .with_orig_dst_addr(orig_dst.into()), + addr: ListenAddr(([0, 0, 0, 0], 0).into()), + keepalive: Keepalive(None), h2_settings: h2::Settings::default(), }, connect: config::ConnectConfig { diff --git a/linkerd/app/integration/Cargo.toml b/linkerd/app/integration/Cargo.toml index ed3abbfc2..417cf7abe 100644 --- a/linkerd/app/integration/Cargo.toml +++ b/linkerd/app/integration/Cargo.toml @@ -24,8 +24,8 @@ http = "0.2" http-body = "0.4" hyper = { version = "0.14.2", features = ["http1", "http2", "stream", "client", "server"] } linkerd-channel = { path = "../../channel" } -linkerd-app = { path = "..", features = ["allow-loopback", "mock-orig-dst"] } -linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] } +linkerd-app = { path = "..", features = ["allow-loopback"] } +linkerd-app-core = { path = "../core" } linkerd-metrics = { path = "../../metrics", features = ["test_util"] } linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.18", features = ["arbitrary"] } linkerd-app-test = { path = "../test" } diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs index 14910b2c1..78f7ecf3f 100644 --- a/linkerd/app/integration/src/proxy.rs +++ b/linkerd/app/integration/src/proxy.rs @@ -1,5 +1,11 @@ use super::*; -use std::{future::Future, pin::Pin, task::Poll, thread}; +use app_core::transport::OrigDstAddr; +use linkerd_app_core::{ + svc::Param, + transport::{listen, orig_dst, Keepalive, ListenAddr}, +}; +use std::{future::Future, net::SocketAddr, pin::Pin, task::Poll, thread}; +use tokio::net::TcpStream; use tracing::instrument::Instrument; pub fn new() -> Proxy { @@ -26,6 +32,9 @@ pub struct Proxy { shutdown_signal: Option + Send>>>, } +#[derive(Copy, Clone, Debug)] +struct MockOrigDst(Option); + pub struct Listening { pub tap: Option, pub inbound: SocketAddr, @@ -44,6 +53,35 @@ pub struct Listening { thread: thread::JoinHandle<()>, } +// === impl MockOrigDst === + +impl listen::Bind for MockOrigDst +where + T: Param + Param, +{ + type Addrs = orig_dst::Addrs; + type Io = tokio::net::TcpStream; + type Incoming = Pin< + Box> + Send + Sync + 'static>, + >; + + fn bind(self, params: &T) -> io::Result> { + let (bound, incoming) = listen::BindTcp::default().bind(params)?; + let addr = self.0; + let incoming = Box::pin(incoming.map(move |res| { + let (inner, tcp) = res?; + let orig_dst = addr + .map(OrigDstAddr) + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "No mocked SO_ORIG_DST"))?; + let addrs = orig_dst::Addrs { inner, orig_dst }; + Ok((addrs, tcp)) + })); + Ok((bound, incoming)) + } +} + +// === impl Proxy === + impl Proxy { /// Pass a customized support `Controller` for this proxy to use. /// @@ -200,16 +238,6 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { env.put(app::env::ENV_OUTBOUND_LISTEN_ADDR, "127.0.0.1:0".to_owned()); } - // If a given server is missing, use the admin server as a substitute. - env.put( - app::env::ENV_INBOUND_ORIG_DST_ADDR, - inbound.unwrap_or(controller.addr).to_string(), - ); - env.put( - app::env::ENV_OUTBOUND_ORIG_DST_ADDR, - outbound.unwrap_or(controller.addr).to_string(), - ); - if random_ports { env.put(app::env::ENV_INBOUND_LISTEN_ADDR, "127.0.0.1:0".to_owned()); env.put(app::env::ENV_CONTROL_LISTEN_ADDR, "127.0.0.1:0".to_owned()); @@ -264,6 +292,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { } let config = app::env::parse_config(&env).unwrap(); + let dispatch = tracing::Dispatch::default(); let (trace, trace_handle) = if dispatch .downcast_ref::() @@ -301,9 +330,12 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { .build() .expect("proxy") .block_on(async move { + let bind_in = MockOrigDst(inbound); + let bind_out = MockOrigDst(outbound); + let bind_adm = listen::BindTcp::default(); let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel(); let main = config - .build(shutdown_tx, trace_handle) + .build(bind_in, bind_out, bind_adm, shutdown_tx, trace_handle) .await .expect("config"); @@ -358,14 +390,13 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { outbound.addr = ?outbound_addr, outbound.orig_dst = ?outbound.as_ref(), metrics.addr = ?metrics_addr, - "proxy running", ); Listening { - tap: tap_addr, - inbound: inbound_addr, - outbound: outbound_addr, - metrics: metrics_addr, + tap: tap_addr.map(Into::into), + inbound: inbound_addr.into(), + outbound: outbound_addr.into(), + metrics: metrics_addr.into(), outbound_server: proxy.outbound_server, inbound_server: proxy.inbound_server, diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index a69a5fec2..7ce444205 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -22,7 +22,7 @@ impl Outbound { >, > where - T: Param>, + T: Param, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static, N: svc::NewService + Clone + Send + 'static, NSvc: svc::Service, Response = (), Error = Error> + Send + 'static, diff --git a/linkerd/app/outbound/src/http/tests.rs b/linkerd/app/outbound/src/http/tests.rs index ef47aec55..a77d1b424 100644 --- a/linkerd/app/outbound/src/http/tests.rs +++ b/linkerd/app/outbound/src/http/tests.rs @@ -13,7 +13,7 @@ use linkerd_app_core::{ io, svc::{self, NewService}, tls, - transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::orig_dst, Error, NameAddr, ProxyRuntime, }; use std::{ @@ -34,7 +34,7 @@ fn build_server( resolver: resolver::Dst, connect: Connect, ) -> impl svc::NewService< - listen::Addrs, + orig_dst::Addrs, Service = impl tower::Service< I, Response = (), @@ -97,6 +97,7 @@ impl svc::Service for NoTcpBalancer { type Response = (); type Error = Error; type Future = Pin> + Send + 'static>>; + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { unreachable!("no TCP load balancer should be created in this test!"); } @@ -114,7 +115,7 @@ async fn profile_endpoint_propagates_conn_errors() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let meta = support::resolver::Metadata::new( @@ -217,7 +218,7 @@ async fn meshed_hello_world() { let _trace = support::trace_init(); let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); @@ -267,7 +268,7 @@ async fn stacks_idle_out() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let idle_timeout = Duration::from_millis(500); - let mut cfg = default_config(ep1); + let mut cfg = default_config(); cfg.proxy.cache_max_idle_age = idle_timeout; let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -332,7 +333,7 @@ async fn active_stacks_dont_idle_out() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let idle_timeout = Duration::from_millis(500); - let mut cfg = default_config(ep1); + let mut cfg = default_config(); cfg.proxy.cache_max_idle_age = idle_timeout; let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -422,14 +423,6 @@ async fn active_stacks_dont_idle_out() { proxy_bg.await.unwrap(); } -pub fn addrs(od: SocketAddr) -> listen::Addrs { - listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(od)), - ) -} - async fn unmeshed_hello_world( server_settings: hyper::server::conn::Http, mut client_settings: ClientBuilder, @@ -437,7 +430,7 @@ async fn unmeshed_hello_world( let _trace = support::trace_init(); let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); // Build a mock "connector" that returns the upstream "server" IO. let connect = support::connect().endpoint_fn_boxed(ep1, hello_server(server_settings)); diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 73ca34791..7fc0424c0 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -5,10 +5,9 @@ use linkerd_app_core::{ io, profiles, svc::{self, stack::Param}, tls, - transport::{self, OrigDstAddr}, + transport::{self, ClientAddr, OrigDstAddr, Remote}, Addr, AddrMatch, Error, }; -use std::convert::TryFrom; use tracing::{debug_span, info_span}; impl Outbound<()> { @@ -28,7 +27,7 @@ impl Outbound<()> { Service = impl svc::Service, > where - T: Param>, + T: Param + Param> + Clone + Send + Sync + 'static, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static, N: svc::NewService + Clone + Send + Sync + 'static, NSvc: svc::Service>, Response = ()> @@ -136,7 +135,10 @@ impl Outbound<()> { )) .push(self.runtime.metrics.transport.layer_accept()) .instrument(|a: &tcp::Accept| info_span!("ingress", orig_dst = %a.orig_dst)) - .push_request_filter(|a: T| tcp::Accept::try_from(a.param())) + .push_map_target(|a: T| { + let orig_dst = Param::::param(&a); + tcp::Accept::from(orig_dst) + }) // Boxing is necessary purely to limit the link-time overhead of // having enormous types. .push(svc::BoxNewService::layer()) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index a419c6e5b..ec4bab0bf 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -15,7 +15,7 @@ pub mod tcp; pub(crate) mod test_util; use linkerd_app_core::{ - config::ProxyConfig, + config::{ProxyConfig, ServerConfig}, io, metrics, profiles, proxy::{ api_resolve::{ConcreteAddr, Metadata}, @@ -24,10 +24,10 @@ use linkerd_app_core::{ serve, svc::{self, stack::Param}, tls, - transport::OrigDstAddr, + transport::{addrs::*, listen::Bind}, AddrMatch, Error, ProxyRuntime, }; -use std::{collections::HashMap, future::Future, net::SocketAddr, time::Duration}; +use std::{collections::HashMap, future::Future, time::Duration}; use tracing::info; const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30); @@ -103,7 +103,8 @@ impl Outbound { Service = impl svc::Service, > where - T: Param>, + Self: Clone + 'static, + T: Param + Param> + Clone + Send + Sync + 'static, S: Clone + Send + Sync + Unpin + 'static, S: svc::Service, S::Response: tls::HasNegotiatedProtocol, @@ -135,8 +136,15 @@ impl Outbound { } impl Outbound<()> { - pub fn serve(self, profiles: P, resolve: R) -> (SocketAddr, impl Future) + pub fn serve( + self, + bind: B, + profiles: P, + resolve: R, + ) -> (Local, impl Future) where + B: Bind, + B::Addrs: Param> + Param, R: Clone + Send + Sync + Unpin + 'static, R: Resolve, R::Resolution: Send, @@ -145,12 +153,8 @@ impl Outbound<()> { P::Future: Send, P::Error: Send, { - let (listen_addr, listen) = self - .config - .proxy - .server - .bind - .bind() + let (listen_addr, listen) = bind + .bind(&self.config.proxy.server) .expect("Failed to bind outbound listener"); let serve = async move { diff --git a/linkerd/app/outbound/src/tcp/mod.rs b/linkerd/app/outbound/src/tcp/mod.rs index 684b093ee..19da0d33e 100644 --- a/linkerd/app/outbound/src/tcp/mod.rs +++ b/linkerd/app/outbound/src/tcp/mod.rs @@ -23,23 +23,6 @@ impl From for Accept { } } -impl std::convert::TryFrom> for Accept { - type Error = std::io::Error; - - fn try_from(orig_dst: Option) -> Result { - match orig_dst { - Some(addr) => Ok(Self::from(addr)), - None => { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - )) - } - } - } -} - impl From<(P, Accept)> for target::Accept { fn from((protocol, Accept { orig_dst, .. }): (P, Accept)) -> Self { Self { orig_dst, protocol } diff --git a/linkerd/app/outbound/src/tcp/tests.rs b/linkerd/app/outbound/src/tcp/tests.rs index 892de2d11..1b7e992ec 100644 --- a/linkerd/app/outbound/src/tcp/tests.rs +++ b/linkerd/app/outbound/src/tcp/tests.rs @@ -13,7 +13,7 @@ use linkerd_app_core::{ io, svc, svc::NewService, tls, - transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{addrs::*, listen, orig_dst}, Addr, Conditional, Error, IpMatch, NameAddr, }; use std::{ @@ -62,7 +62,7 @@ async fn plaintext_tcp() { support::resolver().endpoint_exists(target_addr, target_addr, Default::default()); // Build the outbound TCP balancer stack. - let cfg = default_config(target_addr); + let cfg = default_config(); let (rt, _) = runtime(); Outbound::new(cfg, rt) .with_stack(connect) @@ -141,7 +141,7 @@ async fn tls_when_hinted() { // Build the outbound TCP balancer stack. let (rt, _) = runtime(); - let mut stack = Outbound::new(default_config(([0, 0, 0, 0], 0).into()), rt) + let mut stack = Outbound::new(default_config(), rt) .with_stack(connect) .push_tcp_logical(resolver) .into_inner(); @@ -164,7 +164,7 @@ async fn resolutions_are_reused() { let _trace = support::trace_init(); let addr = SocketAddr::new([0, 0, 0, 0].into(), 5550); - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); // Build a mock "connector" that returns the upstream "server" IO. @@ -242,7 +242,7 @@ async fn load_balances() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -336,7 +336,7 @@ async fn load_balancer_add_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -449,7 +449,7 @@ async fn load_balancer_remove_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -542,7 +542,7 @@ async fn no_profiles_when_outside_search_nets() { let no_profile_addr = SocketAddr::new([126, 32, 5, 18].into(), 5550); let cfg = Config { allow_discovery: IpMatch::new(Some(IpNet::from_str("10.0.0.0/8").unwrap())).into(), - ..default_config(profile_addr) + ..default_config() }; let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -616,7 +616,7 @@ async fn no_discovery_when_profile_has_an_endpoint() { let _trace = support::trace_init(); let ep = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep); + let cfg = default_config(); let meta = support::resolver::Metadata::new( Default::default(), support::resolver::ProtocolHint::Unknown, @@ -661,7 +661,7 @@ async fn profile_endpoint_propagates_conn_errors() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let ep2 = SocketAddr::new([10, 0, 0, 42].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let meta = support::resolver::Metadata::new( @@ -697,12 +697,7 @@ async fn profile_endpoint_propagates_conn_errors() { // Build the outbound server let mut server = build_server(cfg, profiles, resolver, connect); - let svc = server.new_service(listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(ep1)), - )); - + let svc = server.new_service(addrs(ep1)); let res = svc .oneshot( support::io() @@ -765,7 +760,7 @@ fn build_server( resolver: resolver::Dst, connect: Connect, ) -> impl svc::NewService< - listen::Addrs, + orig_dst::Addrs, Service = impl tower::Service< I, Response = (), @@ -792,7 +787,7 @@ fn hello_world_client( new_svc: &mut N, ) -> impl Future + Send where - N: svc::NewService + Send + 'static, + N: svc::NewService + Send + 'static, S: svc::Service + Send + 'static, S::Error: Into, S::Future: Send + 'static, @@ -800,11 +795,13 @@ where let span = tracing::info_span!("hello_world_client", %orig_dst); let svc = { let _e = span.enter(); - let addrs = listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(orig_dst)), - ); + let addrs = orig_dst::Addrs { + orig_dst: OrigDstAddr(orig_dst), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + }; let svc = new_svc.new_service(addrs); tracing::trace!("new service"); svc diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index 5b36533e6..7c38becef 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -7,25 +7,20 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{self, orig_dst, Keepalive, ListenAddr}, IpMatch, ProxyRuntime, }; pub use linkerd_app_test as support; use std::{net::SocketAddr, str::FromStr, time::Duration}; -const LOCALHOST: [u8; 4] = [127, 0, 0, 1]; - -pub fn default_config(orig_dst: SocketAddr) -> Config { +pub fn default_config() -> Config { Config { ingress_mode: false, allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(), proxy: config::ProxyConfig { server: config::ServerConfig { - bind: BindTcp::new( - ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)), - Keepalive(None), - ) - .with_orig_dst_addr(orig_dst.into()), + addr: ListenAddr(([0, 0, 0, 0], 0).into()), + keepalive: Keepalive(None), h2_settings: h2::Settings::default(), }, connect: config::ConnectConfig { @@ -65,3 +60,14 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) { }; (runtime, drain_tx) } + +pub fn addrs(od: SocketAddr) -> orig_dst::Addrs { + use transport::{addrs::*, listen}; + orig_dst::Addrs { + orig_dst: OrigDstAddr(od), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + } +} diff --git a/linkerd/app/src/admin.rs b/linkerd/app/src/admin.rs index 91d362d9a..7f9c88f04 100644 --- a/linkerd/app/src/admin.rs +++ b/linkerd/app/src/admin.rs @@ -3,17 +3,18 @@ use crate::core::{ config::ServerConfig, detect, drain, errors, metrics::{self, FmtMetrics}, - serve, tls, trace, - transport::listen, + serve, + svc::{self, Param}, + tls, trace, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, Error, }; use crate::{ http, identity::LocalCrtKey, inbound::target::{HttpAccept, Target, TcpAccept}, - svc, }; -use std::{net::SocketAddr, pin::Pin, time::Duration}; +use std::{pin::Pin, time::Duration}; use tokio::sync::mpsc; use tracing::debug; @@ -24,7 +25,7 @@ pub struct Config { } pub struct Admin { - pub listen_addr: SocketAddr, + pub listen_addr: Local, pub latch: admin::Latch, pub serve: Pin + Send + 'static>>, } @@ -32,8 +33,10 @@ pub struct Admin { // === impl Config === impl Config { - pub fn build( + #[allow(clippy::clippy::too_many_arguments)] + pub fn build( self, + bind: B, identity: Option, report: R, metrics: metrics::Proxy, @@ -43,10 +46,12 @@ impl Config { ) -> Result where R: FmtMetrics + Clone + Send + 'static + Unpin, + B: Bind, + B::Addrs: svc::Param> + svc::Param>, { const DETECT_TIMEOUT: Duration = Duration::from_secs(1); - let (listen_addr, listen) = self.server.bind.bind()?; + let (listen_addr, listen) = bind.bind(&self.server)?; let (ready, latch) = admin::Readiness::new(); let admin = admin::Admin::new(report, ready, shutdown, trace); @@ -76,8 +81,15 @@ impl Config { http::DetectHttp::default(), )) .push(metrics.transport.layer_accept()) - .push_map_target(TcpAccept::from_local_addr) - .check_new_clone::>() + .push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| { + // TODO this should use an admin-specific target type. + let Local(ServerAddr(target_addr)) = addrs.param(); + TcpAccept { + tls, + client_addr: addrs.param(), + target_addr, + } + }) .push(tls::NewDetectTls::layer(identity, DETECT_TIMEOUT)) .into_inner(); diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 24e2464fd..5d7ffb21b 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -4,7 +4,7 @@ use crate::core::{ control::{Config as ControlConfig, ControlAddr}, proxy::http::{h1, h2}, tls, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{Keepalive, ListenAddr}, Addr, AddrMatch, Conditional, NameMatch, }; use crate::{dns, gateway, identity, inbound, oc_collector, outbound}; @@ -52,15 +52,6 @@ pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"; pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR"; pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR"; -// When compiled with the `mock-orig-dst` flag, these environment variables are required to -// configure the proxy's behavior. - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_INBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_INBOUND_ORIG_DST_ADDR"; - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_OUTBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_ORIG_DST_ADDR"; - pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE"; const ENV_INGRESS_MODE: &str = "LINKERD2_PROXY_INGRESS_MODE"; @@ -262,12 +253,6 @@ pub fn parse_config(strings: &S) -> Result let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration); let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration); - #[cfg(feature = "mock-orig-dst")] - let (inbound_mock_orig_dst, outbound_mock_orig_dst) = ( - parse(strings, ENV_INBOUND_ORIG_DST_ADDR, parse_socket_addr), - parse(strings, ENV_OUTBOUND_ORIG_DST_ADDR, parse_socket_addr), - ); - let inbound_disable_ports = parse( strings, ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, @@ -360,26 +345,6 @@ pub fn parse_config(strings: &S) -> Result let buffer_capacity = buffer_capacity?.unwrap_or(DEFAULT_BUFFER_CAPACITY); - #[cfg(feature = "mock-orig-dst")] - let (inbound_orig_dst, outbound_orig_dst) = ( - inbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_INBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - outbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_OUTBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - ); - - #[cfg(not(feature = "mock-orig-dst"))] - let (inbound_orig_dst, outbound_orig_dst): (DefaultOrigDstAddr, DefaultOrigDstAddr) = - Default::default(); - let dst_profile_suffixes = dst_profile_suffixes? .unwrap_or_else(|| parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()); let dst_profile_networks = dst_profile_networks?.unwrap_or_default(); @@ -387,16 +352,14 @@ pub fn parse_config(strings: &S) -> Result let outbound = { let ingress_mode = parse(strings, ENV_INGRESS_MODE, parse_bool)?.unwrap_or(false); - let keepalive = Keepalive(outbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - outbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + outbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(outbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(outbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -451,16 +414,14 @@ pub fn parse_config(strings: &S) -> Result }; let inbound = { - let keepalive = Keepalive(inbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - inbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + inbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(inbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(inbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -507,7 +468,7 @@ pub fn parse_config(strings: &S) -> Result // Ensure that connections thaat directly target the inbound port are // secured (unless identity is disabled). - let inbound_port = server.bind.addr().as_ref().port(); + let inbound_port = server.addr.as_ref().port(); if !id_disabled && !require_identity_for_inbound_ports.contains(&inbound_port) { debug!( "Adding {} to {}", @@ -566,13 +527,11 @@ pub fn parse_config(strings: &S) -> Result let admin = super::admin::Config { metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE), server: ServerConfig { - bind: BindTcp::new( - ListenAddr( - admin_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), - ), - inbound.proxy.server.bind.keepalive(), + addr: ListenAddr( + admin_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), ), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }; @@ -617,7 +576,8 @@ pub fn parse_config(strings: &S) -> Result .map(|(addr, ids)| super::tap::Config::Enabled { permitted_client_ids: ids, config: ServerConfig { - bind: BindTcp::new(ListenAddr(addr), inbound.proxy.server.bind.keepalive()), + addr: ListenAddr(addr), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }) diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 829a3c9b3..e9f21f138 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -12,12 +12,20 @@ pub mod tap; pub use self::metrics::Metrics; use futures::{future, FutureExt, TryFutureExt}; pub use linkerd_app_core::{self as core, metrics, trace}; -use linkerd_app_core::{control::ControlAddr, dns, drain, proxy::http, svc, Error, ProxyRuntime}; +use linkerd_app_core::{ + config::ServerConfig, + control::ControlAddr, + dns, drain, + proxy::http, + svc::Param, + transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + Error, ProxyRuntime, +}; use linkerd_app_gateway as gateway; use linkerd_app_inbound::{self as inbound, Inbound}; use linkerd_app_outbound::{self as outbound, Outbound}; use linkerd_channel::into_stream::IntoStream; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tokio::{sync::mpsc, time::Duration}; use tracing::instrument::Instrument; use tracing::{debug, info, info_span}; @@ -53,9 +61,9 @@ pub struct App { drain: drain::Signal, dst: ControlAddr, identity: identity::Identity, - inbound_addr: SocketAddr, + inbound_addr: Local, oc_collector: oc_collector::OcCollector, - outbound_addr: SocketAddr, + outbound_addr: Local, start_proxy: Pin + Send + 'static>>, tap: tap::Tap, } @@ -64,16 +72,29 @@ impl Config { pub fn try_from_env() -> Result { env::Env.try_config() } +} +impl Config { /// Build an application. /// /// It is currently required that this be run on a Tokio runtime, since some /// services are created eagerly and must spawn tasks to do so. - pub async fn build( + pub async fn build( self, + bind_in: BIn, + bind_out: BOut, + bind_admin: BAdmin, shutdown_tx: mpsc::UnboundedSender<()>, log_level: trace::Handle, - ) -> Result { + ) -> Result + where + BIn: Bind + 'static, + BIn::Addrs: Param> + Param> + Param, + BOut: Bind + 'static, + BOut::Addrs: Param> + Param> + Param, + BAdmin: Bind + Clone + 'static, + BAdmin::Addrs: Param> + Param>, + { use metrics::FmtMetrics; let Config { @@ -98,7 +119,11 @@ impl Config { let (drain_tx, drain_rx) = drain::channel(); - let tap = info_span!("tap").in_scope(|| tap.build(identity.local(), drain_rx.clone()))?; + let tap = { + let bind = bind_admin.clone(); + info_span!("tap").in_scope(|| tap.build(bind, identity.local(), drain_rx.clone()))? + }; + let dst = { let metrics = metrics.control.clone(); let dns = dns.resolver.clone(); @@ -119,7 +144,15 @@ impl Config { let drain = drain_rx.clone(); let metrics = metrics.inbound.clone(); info_span!("admin").in_scope(move || { - admin.build(identity, report, metrics, log_level, drain, shutdown_tx) + admin.build( + bind_admin, + identity, + report, + metrics, + log_level, + drain, + shutdown_tx, + ) })? }; @@ -155,8 +188,9 @@ impl Config { dst.resolve.clone(), ); - let (inbound_addr, inbound_serve) = inbound.serve(dst.profiles.clone(), gateway_stack); - let (outbound_addr, outbound_serve) = outbound.serve(dst.profiles, dst.resolve); + let (inbound_addr, inbound_serve) = + inbound.serve(bind_in, dst.profiles.clone(), gateway_stack); + let (outbound_addr, outbound_serve) = outbound.serve(bind_out, dst.profiles, dst.resolve); let start_proxy = Box::pin(async move { tokio::spawn(outbound_serve.instrument(info_span!("outbound"))); @@ -178,19 +212,19 @@ impl Config { } impl App { - pub fn admin_addr(&self) -> SocketAddr { + pub fn admin_addr(&self) -> Local { self.admin.listen_addr } - pub fn inbound_addr(&self) -> SocketAddr { + pub fn inbound_addr(&self) -> Local { self.inbound_addr } - pub fn outbound_addr(&self) -> SocketAddr { + pub fn outbound_addr(&self) -> Local { self.outbound_addr } - pub fn tap_addr(&self) -> Option { + pub fn tap_addr(&self) -> Option> { match self.tap { tap::Tap::Disabled { .. } => None, tap::Tap::Enabled { listen_addr, .. } => Some(listen_addr), diff --git a/linkerd/app/src/tap.rs b/linkerd/app/src/tap.rs index 5f3ff8d3d..e78c7c44f 100644 --- a/linkerd/app/src/tap.rs +++ b/linkerd/app/src/tap.rs @@ -1,10 +1,17 @@ use futures::prelude::*; use indexmap::IndexSet; use linkerd_app_core::{ - config::ServerConfig, drain, proxy::identity::LocalCrtKey, proxy::tap, serve, tls, - transport::listen::Addrs, Error, + config::ServerConfig, + drain, + proxy::identity::LocalCrtKey, + proxy::tap, + serve, + svc::{self, Param}, + tls, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, + Error, }; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tower::util::{service_fn, ServiceExt}; #[derive(Clone, Debug)] @@ -21,14 +28,23 @@ pub enum Tap { registry: tap::Registry, }, Enabled { - listen_addr: SocketAddr, + listen_addr: Local, registry: tap::Registry, serve: Pin + Send + 'static>>, }, } impl Config { - pub fn build(self, identity: Option, drain: drain::Watch) -> Result { + pub fn build( + self, + bind: B, + identity: Option, + drain: drain::Watch, + ) -> Result + where + B: Bind, + B::Addrs: Param>, + { let (registry, server) = tap::new(); match self { Config::Disabled => { @@ -39,22 +55,32 @@ impl Config { config, permitted_client_ids, } => { - let (listen_addr, listen) = config.bind.bind()?; - - let service = tap::AcceptPermittedClients::new(permitted_client_ids.into(), server); - let accept = tls::NewDetectTls::new( - identity, - move |meta: tls::server::Meta| { - let service = service.clone(); - service_fn(move |io| { - let fut = service.clone().oneshot((meta.clone(), io)); - Box::pin(async move { - fut.err_into::().await?.err_into::().await + let (listen_addr, listen) = bind.bind(&config)?; + let accept = svc::stack(server) + .push(svc::layer::mk(move |service| { + tap::AcceptPermittedClients::new( + permitted_client_ids.clone().into(), + service, + ) + })) + .push(svc::layer::mk(|service: tap::AcceptPermittedClients| { + move |meta: tls::server::Meta| { + let service = service.clone(); + service_fn(move |io| { + let fut = service.clone().oneshot((meta.clone(), io)); + Box::pin(async move { + fut.err_into::().await?.err_into::().await + }) }) - }) - }, - std::time::Duration::from_secs(1), - ); + } + })) + .check_new_service::, _>() + .push(tls::NewDetectTls::layer( + identity, + std::time::Duration::from_secs(1), + )) + .check_new_service::() + .into_inner(); let serve = Box::pin(serve::serve(listen, accept, drain.signaled())); diff --git a/linkerd/app/test/Cargo.toml b/linkerd/app/test/Cargo.toml index 5044a7e41..c1a56efbd 100644 --- a/linkerd/app/test/Cargo.toml +++ b/linkerd/app/test/Cargo.toml @@ -23,7 +23,7 @@ http = "0.2" http-body = "0.4" hyper = { version = "0.14.2", features = ["http1", "http2"] } linkerd-channel = { path = "../../channel" } -linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] } +linkerd-app-core = { path = "../core" } linkerd-identity = { path = "../../identity" } linkerd-io = { path = "../../io", features = ["tokio-test"] } regex = "1" diff --git a/linkerd/proxy/tap/src/accept.rs b/linkerd/proxy/tap/src/accept.rs index 6dbf8134c..8c7523580 100644 --- a/linkerd/proxy/tap/src/accept.rs +++ b/linkerd/proxy/tap/src/accept.rs @@ -13,7 +13,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::net::TcpStream; use tower::Service; #[derive(Clone, Debug)] @@ -64,7 +63,10 @@ impl AcceptPermittedClients { } } -impl Service>> for AcceptPermittedClients { +impl Service> for AcceptPermittedClients +where + I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static, +{ type Response = ServeFuture; type Error = Error; type Future = future::Ready>; @@ -73,7 +75,7 @@ impl Service>> for AcceptPermittedClien Poll::Ready(Ok(())) } - fn call(&mut self, conn: Connection>) -> Self::Future { + fn call(&mut self, conn: Connection) -> Self::Future { match conn { ((Conditional::Some(tls), _), io) => { if let tls::ServerTls::Established { diff --git a/linkerd/proxy/transport/Cargo.toml b/linkerd/proxy/transport/Cargo.toml index cf5b1cd67..ef24ca9ed 100644 --- a/linkerd/proxy/transport/Cargo.toml +++ b/linkerd/proxy/transport/Cargo.toml @@ -9,11 +9,6 @@ description = """ Transport-level implementations that rely on core proxy infrastructure """ -# This should probably be decomposed into smaller, decoupled crates. - -[features] -mock-orig-dst = [] - [dependencies] bytes = "1" futures = "0.3.9" diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index ee59fef6b..bd2cccdd6 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -4,11 +4,13 @@ pub mod addrs; mod connect; pub mod listen; pub mod metrics; +pub mod orig_dst; pub use self::{ addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, connect::ConnectTcp, - listen::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, NoOrigDstAddr}, + listen::{Bind, BindTcp}, + orig_dst::BindWithOrigDst, }; use std::time::Duration; use tokio::net::TcpStream; diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index 7b348d9c3..22afd49d4 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -1,307 +1,92 @@ use crate::{addrs::*, Keepalive}; use futures::prelude::*; +use linkerd_io as io; use linkerd_stack::Param; -use std::{io, net::SocketAddr}; +use std::{fmt, pin::Pin}; use tokio::net::TcpStream; use tokio_stream::wrappers::TcpListenerStream; -use tracing::trace; -/// A mockable source for address info, i.e., for tests. -pub trait GetOrigDstAddr: Clone { - fn orig_dst_addr(&self, socket: &TcpStream) -> Option; +/// Binds a listener, producing a stream of incoming connections. +/// +/// Typically, this represents binding a TCP socket. However, it may also be an +/// stream of in-memory mock connections, for testing purposes. +pub trait Bind { + type Io: io::AsyncRead + + io::AsyncWrite + + io::Peek + + io::PeerAddr + + fmt::Debug + + Unpin + + Send + + Sync + + 'static; + type Addrs: Clone + Send + Sync + 'static; + type Incoming: Stream> + Send + Sync + 'static; + + fn bind(self, params: &T) -> io::Result>; } -#[derive(Clone, Debug)] -pub struct BindTcp { - addr: ListenAddr, - keepalive: Keepalive, - orig_dst_addr: O, -} +pub type Bound = (Local, I); -pub type Connection = (Addrs, TcpStream); +#[derive(Copy, Clone, Debug, Default)] +pub struct BindTcp(()); #[derive(Clone, Debug)] pub struct Addrs { - server: Local, - client: Remote, - orig_dst: Option, + pub server: Local, + pub client: Remote, } -#[derive(Copy, Clone, Debug)] -pub struct NoOrigDstAddr(()); - -// The mock-orig-dst feature disables use of the syscall-based OrigDstAddr implementation and -// replaces it with one that must be configured. - -#[cfg(not(feature = "mock-orig-dst"))] -pub use self::sys::SysOrigDstAddr as DefaultOrigDstAddr; - -#[cfg(feature = "mock-orig-dst")] -pub use self::mock::MockOrigDstAddr as DefaultOrigDstAddr; +// === impl BindTcp === impl BindTcp { - pub fn new(addr: ListenAddr, keepalive: Keepalive) -> Self { - Self { - addr, - keepalive, - orig_dst_addr: NoOrigDstAddr(()), - } + pub fn with_orig_dst() -> super::BindWithOrigDst { + super::BindWithOrigDst::from(Self::default()) } } -impl BindTcp { - pub fn with_orig_dst_addr(self, orig_dst_addr: B) -> BindTcp { - BindTcp { - orig_dst_addr, - addr: self.addr, - keepalive: self.keepalive, - } - } +impl Bind for BindTcp +where + T: Param
(self, profiles: P, resolve: R) -> (SocketAddr, impl Future) + pub fn serve( + self, + bind: B, + profiles: P, + resolve: R, + ) -> (Local, impl Future) where + B: Bind, + B::Addrs: Param> + Param, R: Clone + Send + Sync + Unpin + 'static, R: Resolve, R::Resolution: Send, @@ -145,12 +153,8 @@ impl Outbound<()> { P::Future: Send, P::Error: Send, { - let (listen_addr, listen) = self - .config - .proxy - .server - .bind - .bind() + let (listen_addr, listen) = bind + .bind(&self.config.proxy.server) .expect("Failed to bind outbound listener"); let serve = async move { diff --git a/linkerd/app/outbound/src/tcp/mod.rs b/linkerd/app/outbound/src/tcp/mod.rs index 684b093ee..19da0d33e 100644 --- a/linkerd/app/outbound/src/tcp/mod.rs +++ b/linkerd/app/outbound/src/tcp/mod.rs @@ -23,23 +23,6 @@ impl From for Accept { } } -impl std::convert::TryFrom> for Accept { - type Error = std::io::Error; - - fn try_from(orig_dst: Option) -> Result { - match orig_dst { - Some(addr) => Ok(Self::from(addr)), - None => { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - )) - } - } - } -} - impl From<(P, Accept)> for target::Accept { fn from((protocol, Accept { orig_dst, .. }): (P, Accept)) -> Self { Self { orig_dst, protocol } diff --git a/linkerd/app/outbound/src/tcp/tests.rs b/linkerd/app/outbound/src/tcp/tests.rs index 892de2d11..1b7e992ec 100644 --- a/linkerd/app/outbound/src/tcp/tests.rs +++ b/linkerd/app/outbound/src/tcp/tests.rs @@ -13,7 +13,7 @@ use linkerd_app_core::{ io, svc, svc::NewService, tls, - transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{addrs::*, listen, orig_dst}, Addr, Conditional, Error, IpMatch, NameAddr, }; use std::{ @@ -62,7 +62,7 @@ async fn plaintext_tcp() { support::resolver().endpoint_exists(target_addr, target_addr, Default::default()); // Build the outbound TCP balancer stack. - let cfg = default_config(target_addr); + let cfg = default_config(); let (rt, _) = runtime(); Outbound::new(cfg, rt) .with_stack(connect) @@ -141,7 +141,7 @@ async fn tls_when_hinted() { // Build the outbound TCP balancer stack. let (rt, _) = runtime(); - let mut stack = Outbound::new(default_config(([0, 0, 0, 0], 0).into()), rt) + let mut stack = Outbound::new(default_config(), rt) .with_stack(connect) .push_tcp_logical(resolver) .into_inner(); @@ -164,7 +164,7 @@ async fn resolutions_are_reused() { let _trace = support::trace_init(); let addr = SocketAddr::new([0, 0, 0, 0].into(), 5550); - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); // Build a mock "connector" that returns the upstream "server" IO. @@ -242,7 +242,7 @@ async fn load_balances() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -336,7 +336,7 @@ async fn load_balancer_add_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -449,7 +449,7 @@ async fn load_balancer_remove_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -542,7 +542,7 @@ async fn no_profiles_when_outside_search_nets() { let no_profile_addr = SocketAddr::new([126, 32, 5, 18].into(), 5550); let cfg = Config { allow_discovery: IpMatch::new(Some(IpNet::from_str("10.0.0.0/8").unwrap())).into(), - ..default_config(profile_addr) + ..default_config() }; let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -616,7 +616,7 @@ async fn no_discovery_when_profile_has_an_endpoint() { let _trace = support::trace_init(); let ep = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep); + let cfg = default_config(); let meta = support::resolver::Metadata::new( Default::default(), support::resolver::ProtocolHint::Unknown, @@ -661,7 +661,7 @@ async fn profile_endpoint_propagates_conn_errors() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let ep2 = SocketAddr::new([10, 0, 0, 42].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let meta = support::resolver::Metadata::new( @@ -697,12 +697,7 @@ async fn profile_endpoint_propagates_conn_errors() { // Build the outbound server let mut server = build_server(cfg, profiles, resolver, connect); - let svc = server.new_service(listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(ep1)), - )); - + let svc = server.new_service(addrs(ep1)); let res = svc .oneshot( support::io() @@ -765,7 +760,7 @@ fn build_server( resolver: resolver::Dst, connect: Connect, ) -> impl svc::NewService< - listen::Addrs, + orig_dst::Addrs, Service = impl tower::Service< I, Response = (), @@ -792,7 +787,7 @@ fn hello_world_client( new_svc: &mut N, ) -> impl Future + Send where - N: svc::NewService + Send + 'static, + N: svc::NewService + Send + 'static, S: svc::Service + Send + 'static, S::Error: Into, S::Future: Send + 'static, @@ -800,11 +795,13 @@ where let span = tracing::info_span!("hello_world_client", %orig_dst); let svc = { let _e = span.enter(); - let addrs = listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(orig_dst)), - ); + let addrs = orig_dst::Addrs { + orig_dst: OrigDstAddr(orig_dst), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + }; let svc = new_svc.new_service(addrs); tracing::trace!("new service"); svc diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index 5b36533e6..7c38becef 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -7,25 +7,20 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{self, orig_dst, Keepalive, ListenAddr}, IpMatch, ProxyRuntime, }; pub use linkerd_app_test as support; use std::{net::SocketAddr, str::FromStr, time::Duration}; -const LOCALHOST: [u8; 4] = [127, 0, 0, 1]; - -pub fn default_config(orig_dst: SocketAddr) -> Config { +pub fn default_config() -> Config { Config { ingress_mode: false, allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(), proxy: config::ProxyConfig { server: config::ServerConfig { - bind: BindTcp::new( - ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)), - Keepalive(None), - ) - .with_orig_dst_addr(orig_dst.into()), + addr: ListenAddr(([0, 0, 0, 0], 0).into()), + keepalive: Keepalive(None), h2_settings: h2::Settings::default(), }, connect: config::ConnectConfig { @@ -65,3 +60,14 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) { }; (runtime, drain_tx) } + +pub fn addrs(od: SocketAddr) -> orig_dst::Addrs { + use transport::{addrs::*, listen}; + orig_dst::Addrs { + orig_dst: OrigDstAddr(od), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + } +} diff --git a/linkerd/app/src/admin.rs b/linkerd/app/src/admin.rs index 91d362d9a..7f9c88f04 100644 --- a/linkerd/app/src/admin.rs +++ b/linkerd/app/src/admin.rs @@ -3,17 +3,18 @@ use crate::core::{ config::ServerConfig, detect, drain, errors, metrics::{self, FmtMetrics}, - serve, tls, trace, - transport::listen, + serve, + svc::{self, Param}, + tls, trace, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, Error, }; use crate::{ http, identity::LocalCrtKey, inbound::target::{HttpAccept, Target, TcpAccept}, - svc, }; -use std::{net::SocketAddr, pin::Pin, time::Duration}; +use std::{pin::Pin, time::Duration}; use tokio::sync::mpsc; use tracing::debug; @@ -24,7 +25,7 @@ pub struct Config { } pub struct Admin { - pub listen_addr: SocketAddr, + pub listen_addr: Local, pub latch: admin::Latch, pub serve: Pin + Send + 'static>>, } @@ -32,8 +33,10 @@ pub struct Admin { // === impl Config === impl Config { - pub fn build( + #[allow(clippy::clippy::too_many_arguments)] + pub fn build( self, + bind: B, identity: Option, report: R, metrics: metrics::Proxy, @@ -43,10 +46,12 @@ impl Config { ) -> Result where R: FmtMetrics + Clone + Send + 'static + Unpin, + B: Bind, + B::Addrs: svc::Param> + svc::Param>, { const DETECT_TIMEOUT: Duration = Duration::from_secs(1); - let (listen_addr, listen) = self.server.bind.bind()?; + let (listen_addr, listen) = bind.bind(&self.server)?; let (ready, latch) = admin::Readiness::new(); let admin = admin::Admin::new(report, ready, shutdown, trace); @@ -76,8 +81,15 @@ impl Config { http::DetectHttp::default(), )) .push(metrics.transport.layer_accept()) - .push_map_target(TcpAccept::from_local_addr) - .check_new_clone::>() + .push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| { + // TODO this should use an admin-specific target type. + let Local(ServerAddr(target_addr)) = addrs.param(); + TcpAccept { + tls, + client_addr: addrs.param(), + target_addr, + } + }) .push(tls::NewDetectTls::layer(identity, DETECT_TIMEOUT)) .into_inner(); diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 24e2464fd..5d7ffb21b 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -4,7 +4,7 @@ use crate::core::{ control::{Config as ControlConfig, ControlAddr}, proxy::http::{h1, h2}, tls, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{Keepalive, ListenAddr}, Addr, AddrMatch, Conditional, NameMatch, }; use crate::{dns, gateway, identity, inbound, oc_collector, outbound}; @@ -52,15 +52,6 @@ pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"; pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR"; pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR"; -// When compiled with the `mock-orig-dst` flag, these environment variables are required to -// configure the proxy's behavior. - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_INBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_INBOUND_ORIG_DST_ADDR"; - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_OUTBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_ORIG_DST_ADDR"; - pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE"; const ENV_INGRESS_MODE: &str = "LINKERD2_PROXY_INGRESS_MODE"; @@ -262,12 +253,6 @@ pub fn parse_config(strings: &S) -> Result let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration); let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration); - #[cfg(feature = "mock-orig-dst")] - let (inbound_mock_orig_dst, outbound_mock_orig_dst) = ( - parse(strings, ENV_INBOUND_ORIG_DST_ADDR, parse_socket_addr), - parse(strings, ENV_OUTBOUND_ORIG_DST_ADDR, parse_socket_addr), - ); - let inbound_disable_ports = parse( strings, ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, @@ -360,26 +345,6 @@ pub fn parse_config(strings: &S) -> Result let buffer_capacity = buffer_capacity?.unwrap_or(DEFAULT_BUFFER_CAPACITY); - #[cfg(feature = "mock-orig-dst")] - let (inbound_orig_dst, outbound_orig_dst) = ( - inbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_INBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - outbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_OUTBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - ); - - #[cfg(not(feature = "mock-orig-dst"))] - let (inbound_orig_dst, outbound_orig_dst): (DefaultOrigDstAddr, DefaultOrigDstAddr) = - Default::default(); - let dst_profile_suffixes = dst_profile_suffixes? .unwrap_or_else(|| parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()); let dst_profile_networks = dst_profile_networks?.unwrap_or_default(); @@ -387,16 +352,14 @@ pub fn parse_config(strings: &S) -> Result let outbound = { let ingress_mode = parse(strings, ENV_INGRESS_MODE, parse_bool)?.unwrap_or(false); - let keepalive = Keepalive(outbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - outbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + outbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(outbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(outbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -451,16 +414,14 @@ pub fn parse_config(strings: &S) -> Result }; let inbound = { - let keepalive = Keepalive(inbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - inbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + inbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(inbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(inbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -507,7 +468,7 @@ pub fn parse_config(strings: &S) -> Result // Ensure that connections thaat directly target the inbound port are // secured (unless identity is disabled). - let inbound_port = server.bind.addr().as_ref().port(); + let inbound_port = server.addr.as_ref().port(); if !id_disabled && !require_identity_for_inbound_ports.contains(&inbound_port) { debug!( "Adding {} to {}", @@ -566,13 +527,11 @@ pub fn parse_config(strings: &S) -> Result let admin = super::admin::Config { metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE), server: ServerConfig { - bind: BindTcp::new( - ListenAddr( - admin_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), - ), - inbound.proxy.server.bind.keepalive(), + addr: ListenAddr( + admin_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), ), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }; @@ -617,7 +576,8 @@ pub fn parse_config(strings: &S) -> Result .map(|(addr, ids)| super::tap::Config::Enabled { permitted_client_ids: ids, config: ServerConfig { - bind: BindTcp::new(ListenAddr(addr), inbound.proxy.server.bind.keepalive()), + addr: ListenAddr(addr), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }) diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 829a3c9b3..e9f21f138 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -12,12 +12,20 @@ pub mod tap; pub use self::metrics::Metrics; use futures::{future, FutureExt, TryFutureExt}; pub use linkerd_app_core::{self as core, metrics, trace}; -use linkerd_app_core::{control::ControlAddr, dns, drain, proxy::http, svc, Error, ProxyRuntime}; +use linkerd_app_core::{ + config::ServerConfig, + control::ControlAddr, + dns, drain, + proxy::http, + svc::Param, + transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + Error, ProxyRuntime, +}; use linkerd_app_gateway as gateway; use linkerd_app_inbound::{self as inbound, Inbound}; use linkerd_app_outbound::{self as outbound, Outbound}; use linkerd_channel::into_stream::IntoStream; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tokio::{sync::mpsc, time::Duration}; use tracing::instrument::Instrument; use tracing::{debug, info, info_span}; @@ -53,9 +61,9 @@ pub struct App { drain: drain::Signal, dst: ControlAddr, identity: identity::Identity, - inbound_addr: SocketAddr, + inbound_addr: Local, oc_collector: oc_collector::OcCollector, - outbound_addr: SocketAddr, + outbound_addr: Local, start_proxy: Pin + Send + 'static>>, tap: tap::Tap, } @@ -64,16 +72,29 @@ impl Config { pub fn try_from_env() -> Result { env::Env.try_config() } +} +impl Config { /// Build an application. /// /// It is currently required that this be run on a Tokio runtime, since some /// services are created eagerly and must spawn tasks to do so. - pub async fn build( + pub async fn build( self, + bind_in: BIn, + bind_out: BOut, + bind_admin: BAdmin, shutdown_tx: mpsc::UnboundedSender<()>, log_level: trace::Handle, - ) -> Result { + ) -> Result + where + BIn: Bind + 'static, + BIn::Addrs: Param> + Param> + Param, + BOut: Bind + 'static, + BOut::Addrs: Param> + Param> + Param, + BAdmin: Bind + Clone + 'static, + BAdmin::Addrs: Param> + Param>, + { use metrics::FmtMetrics; let Config { @@ -98,7 +119,11 @@ impl Config { let (drain_tx, drain_rx) = drain::channel(); - let tap = info_span!("tap").in_scope(|| tap.build(identity.local(), drain_rx.clone()))?; + let tap = { + let bind = bind_admin.clone(); + info_span!("tap").in_scope(|| tap.build(bind, identity.local(), drain_rx.clone()))? + }; + let dst = { let metrics = metrics.control.clone(); let dns = dns.resolver.clone(); @@ -119,7 +144,15 @@ impl Config { let drain = drain_rx.clone(); let metrics = metrics.inbound.clone(); info_span!("admin").in_scope(move || { - admin.build(identity, report, metrics, log_level, drain, shutdown_tx) + admin.build( + bind_admin, + identity, + report, + metrics, + log_level, + drain, + shutdown_tx, + ) })? }; @@ -155,8 +188,9 @@ impl Config { dst.resolve.clone(), ); - let (inbound_addr, inbound_serve) = inbound.serve(dst.profiles.clone(), gateway_stack); - let (outbound_addr, outbound_serve) = outbound.serve(dst.profiles, dst.resolve); + let (inbound_addr, inbound_serve) = + inbound.serve(bind_in, dst.profiles.clone(), gateway_stack); + let (outbound_addr, outbound_serve) = outbound.serve(bind_out, dst.profiles, dst.resolve); let start_proxy = Box::pin(async move { tokio::spawn(outbound_serve.instrument(info_span!("outbound"))); @@ -178,19 +212,19 @@ impl Config { } impl App { - pub fn admin_addr(&self) -> SocketAddr { + pub fn admin_addr(&self) -> Local { self.admin.listen_addr } - pub fn inbound_addr(&self) -> SocketAddr { + pub fn inbound_addr(&self) -> Local { self.inbound_addr } - pub fn outbound_addr(&self) -> SocketAddr { + pub fn outbound_addr(&self) -> Local { self.outbound_addr } - pub fn tap_addr(&self) -> Option { + pub fn tap_addr(&self) -> Option> { match self.tap { tap::Tap::Disabled { .. } => None, tap::Tap::Enabled { listen_addr, .. } => Some(listen_addr), diff --git a/linkerd/app/src/tap.rs b/linkerd/app/src/tap.rs index 5f3ff8d3d..e78c7c44f 100644 --- a/linkerd/app/src/tap.rs +++ b/linkerd/app/src/tap.rs @@ -1,10 +1,17 @@ use futures::prelude::*; use indexmap::IndexSet; use linkerd_app_core::{ - config::ServerConfig, drain, proxy::identity::LocalCrtKey, proxy::tap, serve, tls, - transport::listen::Addrs, Error, + config::ServerConfig, + drain, + proxy::identity::LocalCrtKey, + proxy::tap, + serve, + svc::{self, Param}, + tls, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, + Error, }; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tower::util::{service_fn, ServiceExt}; #[derive(Clone, Debug)] @@ -21,14 +28,23 @@ pub enum Tap { registry: tap::Registry, }, Enabled { - listen_addr: SocketAddr, + listen_addr: Local, registry: tap::Registry, serve: Pin + Send + 'static>>, }, } impl Config { - pub fn build(self, identity: Option, drain: drain::Watch) -> Result { + pub fn build( + self, + bind: B, + identity: Option, + drain: drain::Watch, + ) -> Result + where + B: Bind, + B::Addrs: Param>, + { let (registry, server) = tap::new(); match self { Config::Disabled => { @@ -39,22 +55,32 @@ impl Config { config, permitted_client_ids, } => { - let (listen_addr, listen) = config.bind.bind()?; - - let service = tap::AcceptPermittedClients::new(permitted_client_ids.into(), server); - let accept = tls::NewDetectTls::new( - identity, - move |meta: tls::server::Meta| { - let service = service.clone(); - service_fn(move |io| { - let fut = service.clone().oneshot((meta.clone(), io)); - Box::pin(async move { - fut.err_into::().await?.err_into::().await + let (listen_addr, listen) = bind.bind(&config)?; + let accept = svc::stack(server) + .push(svc::layer::mk(move |service| { + tap::AcceptPermittedClients::new( + permitted_client_ids.clone().into(), + service, + ) + })) + .push(svc::layer::mk(|service: tap::AcceptPermittedClients| { + move |meta: tls::server::Meta| { + let service = service.clone(); + service_fn(move |io| { + let fut = service.clone().oneshot((meta.clone(), io)); + Box::pin(async move { + fut.err_into::().await?.err_into::().await + }) }) - }) - }, - std::time::Duration::from_secs(1), - ); + } + })) + .check_new_service::, _>() + .push(tls::NewDetectTls::layer( + identity, + std::time::Duration::from_secs(1), + )) + .check_new_service::() + .into_inner(); let serve = Box::pin(serve::serve(listen, accept, drain.signaled())); diff --git a/linkerd/app/test/Cargo.toml b/linkerd/app/test/Cargo.toml index 5044a7e41..c1a56efbd 100644 --- a/linkerd/app/test/Cargo.toml +++ b/linkerd/app/test/Cargo.toml @@ -23,7 +23,7 @@ http = "0.2" http-body = "0.4" hyper = { version = "0.14.2", features = ["http1", "http2"] } linkerd-channel = { path = "../../channel" } -linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] } +linkerd-app-core = { path = "../core" } linkerd-identity = { path = "../../identity" } linkerd-io = { path = "../../io", features = ["tokio-test"] } regex = "1" diff --git a/linkerd/proxy/tap/src/accept.rs b/linkerd/proxy/tap/src/accept.rs index 6dbf8134c..8c7523580 100644 --- a/linkerd/proxy/tap/src/accept.rs +++ b/linkerd/proxy/tap/src/accept.rs @@ -13,7 +13,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::net::TcpStream; use tower::Service; #[derive(Clone, Debug)] @@ -64,7 +63,10 @@ impl AcceptPermittedClients { } } -impl Service>> for AcceptPermittedClients { +impl Service> for AcceptPermittedClients +where + I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static, +{ type Response = ServeFuture; type Error = Error; type Future = future::Ready>; @@ -73,7 +75,7 @@ impl Service>> for AcceptPermittedClien Poll::Ready(Ok(())) } - fn call(&mut self, conn: Connection>) -> Self::Future { + fn call(&mut self, conn: Connection) -> Self::Future { match conn { ((Conditional::Some(tls), _), io) => { if let tls::ServerTls::Established { diff --git a/linkerd/proxy/transport/Cargo.toml b/linkerd/proxy/transport/Cargo.toml index cf5b1cd67..ef24ca9ed 100644 --- a/linkerd/proxy/transport/Cargo.toml +++ b/linkerd/proxy/transport/Cargo.toml @@ -9,11 +9,6 @@ description = """ Transport-level implementations that rely on core proxy infrastructure """ -# This should probably be decomposed into smaller, decoupled crates. - -[features] -mock-orig-dst = [] - [dependencies] bytes = "1" futures = "0.3.9" diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index ee59fef6b..bd2cccdd6 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -4,11 +4,13 @@ pub mod addrs; mod connect; pub mod listen; pub mod metrics; +pub mod orig_dst; pub use self::{ addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, connect::ConnectTcp, - listen::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, NoOrigDstAddr}, + listen::{Bind, BindTcp}, + orig_dst::BindWithOrigDst, }; use std::time::Duration; use tokio::net::TcpStream; diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index 7b348d9c3..22afd49d4 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -1,307 +1,92 @@ use crate::{addrs::*, Keepalive}; use futures::prelude::*; +use linkerd_io as io; use linkerd_stack::Param; -use std::{io, net::SocketAddr}; +use std::{fmt, pin::Pin}; use tokio::net::TcpStream; use tokio_stream::wrappers::TcpListenerStream; -use tracing::trace; -/// A mockable source for address info, i.e., for tests. -pub trait GetOrigDstAddr: Clone { - fn orig_dst_addr(&self, socket: &TcpStream) -> Option; +/// Binds a listener, producing a stream of incoming connections. +/// +/// Typically, this represents binding a TCP socket. However, it may also be an +/// stream of in-memory mock connections, for testing purposes. +pub trait Bind { + type Io: io::AsyncRead + + io::AsyncWrite + + io::Peek + + io::PeerAddr + + fmt::Debug + + Unpin + + Send + + Sync + + 'static; + type Addrs: Clone + Send + Sync + 'static; + type Incoming: Stream> + Send + Sync + 'static; + + fn bind(self, params: &T) -> io::Result>; } -#[derive(Clone, Debug)] -pub struct BindTcp { - addr: ListenAddr, - keepalive: Keepalive, - orig_dst_addr: O, -} +pub type Bound = (Local, I); -pub type Connection = (Addrs, TcpStream); +#[derive(Copy, Clone, Debug, Default)] +pub struct BindTcp(()); #[derive(Clone, Debug)] pub struct Addrs { - server: Local, - client: Remote, - orig_dst: Option, + pub server: Local, + pub client: Remote, } -#[derive(Copy, Clone, Debug)] -pub struct NoOrigDstAddr(()); - -// The mock-orig-dst feature disables use of the syscall-based OrigDstAddr implementation and -// replaces it with one that must be configured. - -#[cfg(not(feature = "mock-orig-dst"))] -pub use self::sys::SysOrigDstAddr as DefaultOrigDstAddr; - -#[cfg(feature = "mock-orig-dst")] -pub use self::mock::MockOrigDstAddr as DefaultOrigDstAddr; +// === impl BindTcp === impl BindTcp { - pub fn new(addr: ListenAddr, keepalive: Keepalive) -> Self { - Self { - addr, - keepalive, - orig_dst_addr: NoOrigDstAddr(()), - } + pub fn with_orig_dst() -> super::BindWithOrigDst { + super::BindWithOrigDst::from(Self::default()) } } -impl BindTcp { - pub fn with_orig_dst_addr(self, orig_dst_addr: B) -> BindTcp { - BindTcp { - orig_dst_addr, - addr: self.addr, - keepalive: self.keepalive, - } - } +impl Bind for BindTcp +where + T: Param
From<(P, Accept)> for target::Accept
{ fn from((protocol, Accept { orig_dst, .. }): (P, Accept)) -> Self { Self { orig_dst, protocol } diff --git a/linkerd/app/outbound/src/tcp/tests.rs b/linkerd/app/outbound/src/tcp/tests.rs index 892de2d11..1b7e992ec 100644 --- a/linkerd/app/outbound/src/tcp/tests.rs +++ b/linkerd/app/outbound/src/tcp/tests.rs @@ -13,7 +13,7 @@ use linkerd_app_core::{ io, svc, svc::NewService, tls, - transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{addrs::*, listen, orig_dst}, Addr, Conditional, Error, IpMatch, NameAddr, }; use std::{ @@ -62,7 +62,7 @@ async fn plaintext_tcp() { support::resolver().endpoint_exists(target_addr, target_addr, Default::default()); // Build the outbound TCP balancer stack. - let cfg = default_config(target_addr); + let cfg = default_config(); let (rt, _) = runtime(); Outbound::new(cfg, rt) .with_stack(connect) @@ -141,7 +141,7 @@ async fn tls_when_hinted() { // Build the outbound TCP balancer stack. let (rt, _) = runtime(); - let mut stack = Outbound::new(default_config(([0, 0, 0, 0], 0).into()), rt) + let mut stack = Outbound::new(default_config(), rt) .with_stack(connect) .push_tcp_logical(resolver) .into_inner(); @@ -164,7 +164,7 @@ async fn resolutions_are_reused() { let _trace = support::trace_init(); let addr = SocketAddr::new([0, 0, 0, 0].into(), 5550); - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); // Build a mock "connector" that returns the upstream "server" IO. @@ -242,7 +242,7 @@ async fn load_balances() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -336,7 +336,7 @@ async fn load_balancer_add_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -449,7 +449,7 @@ async fn load_balancer_remove_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -542,7 +542,7 @@ async fn no_profiles_when_outside_search_nets() { let no_profile_addr = SocketAddr::new([126, 32, 5, 18].into(), 5550); let cfg = Config { allow_discovery: IpMatch::new(Some(IpNet::from_str("10.0.0.0/8").unwrap())).into(), - ..default_config(profile_addr) + ..default_config() }; let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -616,7 +616,7 @@ async fn no_discovery_when_profile_has_an_endpoint() { let _trace = support::trace_init(); let ep = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep); + let cfg = default_config(); let meta = support::resolver::Metadata::new( Default::default(), support::resolver::ProtocolHint::Unknown, @@ -661,7 +661,7 @@ async fn profile_endpoint_propagates_conn_errors() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let ep2 = SocketAddr::new([10, 0, 0, 42].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let meta = support::resolver::Metadata::new( @@ -697,12 +697,7 @@ async fn profile_endpoint_propagates_conn_errors() { // Build the outbound server let mut server = build_server(cfg, profiles, resolver, connect); - let svc = server.new_service(listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(ep1)), - )); - + let svc = server.new_service(addrs(ep1)); let res = svc .oneshot( support::io() @@ -765,7 +760,7 @@ fn build_server( resolver: resolver::Dst, connect: Connect, ) -> impl svc::NewService< - listen::Addrs, + orig_dst::Addrs, Service = impl tower::Service< I, Response = (), @@ -792,7 +787,7 @@ fn hello_world_client( new_svc: &mut N, ) -> impl Future + Send where - N: svc::NewService + Send + 'static, + N: svc::NewService + Send + 'static, S: svc::Service + Send + 'static, S::Error: Into, S::Future: Send + 'static, @@ -800,11 +795,13 @@ where let span = tracing::info_span!("hello_world_client", %orig_dst); let svc = { let _e = span.enter(); - let addrs = listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(orig_dst)), - ); + let addrs = orig_dst::Addrs { + orig_dst: OrigDstAddr(orig_dst), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + }; let svc = new_svc.new_service(addrs); tracing::trace!("new service"); svc diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index 5b36533e6..7c38becef 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -7,25 +7,20 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{self, orig_dst, Keepalive, ListenAddr}, IpMatch, ProxyRuntime, }; pub use linkerd_app_test as support; use std::{net::SocketAddr, str::FromStr, time::Duration}; -const LOCALHOST: [u8; 4] = [127, 0, 0, 1]; - -pub fn default_config(orig_dst: SocketAddr) -> Config { +pub fn default_config() -> Config { Config { ingress_mode: false, allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(), proxy: config::ProxyConfig { server: config::ServerConfig { - bind: BindTcp::new( - ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)), - Keepalive(None), - ) - .with_orig_dst_addr(orig_dst.into()), + addr: ListenAddr(([0, 0, 0, 0], 0).into()), + keepalive: Keepalive(None), h2_settings: h2::Settings::default(), }, connect: config::ConnectConfig { @@ -65,3 +60,14 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) { }; (runtime, drain_tx) } + +pub fn addrs(od: SocketAddr) -> orig_dst::Addrs { + use transport::{addrs::*, listen}; + orig_dst::Addrs { + orig_dst: OrigDstAddr(od), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + } +} diff --git a/linkerd/app/src/admin.rs b/linkerd/app/src/admin.rs index 91d362d9a..7f9c88f04 100644 --- a/linkerd/app/src/admin.rs +++ b/linkerd/app/src/admin.rs @@ -3,17 +3,18 @@ use crate::core::{ config::ServerConfig, detect, drain, errors, metrics::{self, FmtMetrics}, - serve, tls, trace, - transport::listen, + serve, + svc::{self, Param}, + tls, trace, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, Error, }; use crate::{ http, identity::LocalCrtKey, inbound::target::{HttpAccept, Target, TcpAccept}, - svc, }; -use std::{net::SocketAddr, pin::Pin, time::Duration}; +use std::{pin::Pin, time::Duration}; use tokio::sync::mpsc; use tracing::debug; @@ -24,7 +25,7 @@ pub struct Config { } pub struct Admin { - pub listen_addr: SocketAddr, + pub listen_addr: Local, pub latch: admin::Latch, pub serve: Pin + Send + 'static>>, } @@ -32,8 +33,10 @@ pub struct Admin { // === impl Config === impl Config { - pub fn build( + #[allow(clippy::clippy::too_many_arguments)] + pub fn build( self, + bind: B, identity: Option, report: R, metrics: metrics::Proxy, @@ -43,10 +46,12 @@ impl Config { ) -> Result where R: FmtMetrics + Clone + Send + 'static + Unpin, + B: Bind, + B::Addrs: svc::Param> + svc::Param>, { const DETECT_TIMEOUT: Duration = Duration::from_secs(1); - let (listen_addr, listen) = self.server.bind.bind()?; + let (listen_addr, listen) = bind.bind(&self.server)?; let (ready, latch) = admin::Readiness::new(); let admin = admin::Admin::new(report, ready, shutdown, trace); @@ -76,8 +81,15 @@ impl Config { http::DetectHttp::default(), )) .push(metrics.transport.layer_accept()) - .push_map_target(TcpAccept::from_local_addr) - .check_new_clone::>() + .push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| { + // TODO this should use an admin-specific target type. + let Local(ServerAddr(target_addr)) = addrs.param(); + TcpAccept { + tls, + client_addr: addrs.param(), + target_addr, + } + }) .push(tls::NewDetectTls::layer(identity, DETECT_TIMEOUT)) .into_inner(); diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 24e2464fd..5d7ffb21b 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -4,7 +4,7 @@ use crate::core::{ control::{Config as ControlConfig, ControlAddr}, proxy::http::{h1, h2}, tls, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{Keepalive, ListenAddr}, Addr, AddrMatch, Conditional, NameMatch, }; use crate::{dns, gateway, identity, inbound, oc_collector, outbound}; @@ -52,15 +52,6 @@ pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"; pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR"; pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR"; -// When compiled with the `mock-orig-dst` flag, these environment variables are required to -// configure the proxy's behavior. - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_INBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_INBOUND_ORIG_DST_ADDR"; - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_OUTBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_ORIG_DST_ADDR"; - pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE"; const ENV_INGRESS_MODE: &str = "LINKERD2_PROXY_INGRESS_MODE"; @@ -262,12 +253,6 @@ pub fn parse_config(strings: &S) -> Result let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration); let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration); - #[cfg(feature = "mock-orig-dst")] - let (inbound_mock_orig_dst, outbound_mock_orig_dst) = ( - parse(strings, ENV_INBOUND_ORIG_DST_ADDR, parse_socket_addr), - parse(strings, ENV_OUTBOUND_ORIG_DST_ADDR, parse_socket_addr), - ); - let inbound_disable_ports = parse( strings, ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, @@ -360,26 +345,6 @@ pub fn parse_config(strings: &S) -> Result let buffer_capacity = buffer_capacity?.unwrap_or(DEFAULT_BUFFER_CAPACITY); - #[cfg(feature = "mock-orig-dst")] - let (inbound_orig_dst, outbound_orig_dst) = ( - inbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_INBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - outbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_OUTBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - ); - - #[cfg(not(feature = "mock-orig-dst"))] - let (inbound_orig_dst, outbound_orig_dst): (DefaultOrigDstAddr, DefaultOrigDstAddr) = - Default::default(); - let dst_profile_suffixes = dst_profile_suffixes? .unwrap_or_else(|| parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()); let dst_profile_networks = dst_profile_networks?.unwrap_or_default(); @@ -387,16 +352,14 @@ pub fn parse_config(strings: &S) -> Result let outbound = { let ingress_mode = parse(strings, ENV_INGRESS_MODE, parse_bool)?.unwrap_or(false); - let keepalive = Keepalive(outbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - outbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + outbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(outbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(outbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -451,16 +414,14 @@ pub fn parse_config(strings: &S) -> Result }; let inbound = { - let keepalive = Keepalive(inbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - inbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + inbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(inbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(inbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -507,7 +468,7 @@ pub fn parse_config(strings: &S) -> Result // Ensure that connections thaat directly target the inbound port are // secured (unless identity is disabled). - let inbound_port = server.bind.addr().as_ref().port(); + let inbound_port = server.addr.as_ref().port(); if !id_disabled && !require_identity_for_inbound_ports.contains(&inbound_port) { debug!( "Adding {} to {}", @@ -566,13 +527,11 @@ pub fn parse_config(strings: &S) -> Result let admin = super::admin::Config { metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE), server: ServerConfig { - bind: BindTcp::new( - ListenAddr( - admin_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), - ), - inbound.proxy.server.bind.keepalive(), + addr: ListenAddr( + admin_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), ), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }; @@ -617,7 +576,8 @@ pub fn parse_config(strings: &S) -> Result .map(|(addr, ids)| super::tap::Config::Enabled { permitted_client_ids: ids, config: ServerConfig { - bind: BindTcp::new(ListenAddr(addr), inbound.proxy.server.bind.keepalive()), + addr: ListenAddr(addr), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }) diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 829a3c9b3..e9f21f138 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -12,12 +12,20 @@ pub mod tap; pub use self::metrics::Metrics; use futures::{future, FutureExt, TryFutureExt}; pub use linkerd_app_core::{self as core, metrics, trace}; -use linkerd_app_core::{control::ControlAddr, dns, drain, proxy::http, svc, Error, ProxyRuntime}; +use linkerd_app_core::{ + config::ServerConfig, + control::ControlAddr, + dns, drain, + proxy::http, + svc::Param, + transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + Error, ProxyRuntime, +}; use linkerd_app_gateway as gateway; use linkerd_app_inbound::{self as inbound, Inbound}; use linkerd_app_outbound::{self as outbound, Outbound}; use linkerd_channel::into_stream::IntoStream; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tokio::{sync::mpsc, time::Duration}; use tracing::instrument::Instrument; use tracing::{debug, info, info_span}; @@ -53,9 +61,9 @@ pub struct App { drain: drain::Signal, dst: ControlAddr, identity: identity::Identity, - inbound_addr: SocketAddr, + inbound_addr: Local, oc_collector: oc_collector::OcCollector, - outbound_addr: SocketAddr, + outbound_addr: Local, start_proxy: Pin + Send + 'static>>, tap: tap::Tap, } @@ -64,16 +72,29 @@ impl Config { pub fn try_from_env() -> Result { env::Env.try_config() } +} +impl Config { /// Build an application. /// /// It is currently required that this be run on a Tokio runtime, since some /// services are created eagerly and must spawn tasks to do so. - pub async fn build( + pub async fn build( self, + bind_in: BIn, + bind_out: BOut, + bind_admin: BAdmin, shutdown_tx: mpsc::UnboundedSender<()>, log_level: trace::Handle, - ) -> Result { + ) -> Result + where + BIn: Bind + 'static, + BIn::Addrs: Param> + Param> + Param, + BOut: Bind + 'static, + BOut::Addrs: Param> + Param> + Param, + BAdmin: Bind + Clone + 'static, + BAdmin::Addrs: Param> + Param>, + { use metrics::FmtMetrics; let Config { @@ -98,7 +119,11 @@ impl Config { let (drain_tx, drain_rx) = drain::channel(); - let tap = info_span!("tap").in_scope(|| tap.build(identity.local(), drain_rx.clone()))?; + let tap = { + let bind = bind_admin.clone(); + info_span!("tap").in_scope(|| tap.build(bind, identity.local(), drain_rx.clone()))? + }; + let dst = { let metrics = metrics.control.clone(); let dns = dns.resolver.clone(); @@ -119,7 +144,15 @@ impl Config { let drain = drain_rx.clone(); let metrics = metrics.inbound.clone(); info_span!("admin").in_scope(move || { - admin.build(identity, report, metrics, log_level, drain, shutdown_tx) + admin.build( + bind_admin, + identity, + report, + metrics, + log_level, + drain, + shutdown_tx, + ) })? }; @@ -155,8 +188,9 @@ impl Config { dst.resolve.clone(), ); - let (inbound_addr, inbound_serve) = inbound.serve(dst.profiles.clone(), gateway_stack); - let (outbound_addr, outbound_serve) = outbound.serve(dst.profiles, dst.resolve); + let (inbound_addr, inbound_serve) = + inbound.serve(bind_in, dst.profiles.clone(), gateway_stack); + let (outbound_addr, outbound_serve) = outbound.serve(bind_out, dst.profiles, dst.resolve); let start_proxy = Box::pin(async move { tokio::spawn(outbound_serve.instrument(info_span!("outbound"))); @@ -178,19 +212,19 @@ impl Config { } impl App { - pub fn admin_addr(&self) -> SocketAddr { + pub fn admin_addr(&self) -> Local { self.admin.listen_addr } - pub fn inbound_addr(&self) -> SocketAddr { + pub fn inbound_addr(&self) -> Local { self.inbound_addr } - pub fn outbound_addr(&self) -> SocketAddr { + pub fn outbound_addr(&self) -> Local { self.outbound_addr } - pub fn tap_addr(&self) -> Option { + pub fn tap_addr(&self) -> Option> { match self.tap { tap::Tap::Disabled { .. } => None, tap::Tap::Enabled { listen_addr, .. } => Some(listen_addr), diff --git a/linkerd/app/src/tap.rs b/linkerd/app/src/tap.rs index 5f3ff8d3d..e78c7c44f 100644 --- a/linkerd/app/src/tap.rs +++ b/linkerd/app/src/tap.rs @@ -1,10 +1,17 @@ use futures::prelude::*; use indexmap::IndexSet; use linkerd_app_core::{ - config::ServerConfig, drain, proxy::identity::LocalCrtKey, proxy::tap, serve, tls, - transport::listen::Addrs, Error, + config::ServerConfig, + drain, + proxy::identity::LocalCrtKey, + proxy::tap, + serve, + svc::{self, Param}, + tls, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, + Error, }; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tower::util::{service_fn, ServiceExt}; #[derive(Clone, Debug)] @@ -21,14 +28,23 @@ pub enum Tap { registry: tap::Registry, }, Enabled { - listen_addr: SocketAddr, + listen_addr: Local, registry: tap::Registry, serve: Pin + Send + 'static>>, }, } impl Config { - pub fn build(self, identity: Option, drain: drain::Watch) -> Result { + pub fn build( + self, + bind: B, + identity: Option, + drain: drain::Watch, + ) -> Result + where + B: Bind, + B::Addrs: Param>, + { let (registry, server) = tap::new(); match self { Config::Disabled => { @@ -39,22 +55,32 @@ impl Config { config, permitted_client_ids, } => { - let (listen_addr, listen) = config.bind.bind()?; - - let service = tap::AcceptPermittedClients::new(permitted_client_ids.into(), server); - let accept = tls::NewDetectTls::new( - identity, - move |meta: tls::server::Meta| { - let service = service.clone(); - service_fn(move |io| { - let fut = service.clone().oneshot((meta.clone(), io)); - Box::pin(async move { - fut.err_into::().await?.err_into::().await + let (listen_addr, listen) = bind.bind(&config)?; + let accept = svc::stack(server) + .push(svc::layer::mk(move |service| { + tap::AcceptPermittedClients::new( + permitted_client_ids.clone().into(), + service, + ) + })) + .push(svc::layer::mk(|service: tap::AcceptPermittedClients| { + move |meta: tls::server::Meta| { + let service = service.clone(); + service_fn(move |io| { + let fut = service.clone().oneshot((meta.clone(), io)); + Box::pin(async move { + fut.err_into::().await?.err_into::().await + }) }) - }) - }, - std::time::Duration::from_secs(1), - ); + } + })) + .check_new_service::, _>() + .push(tls::NewDetectTls::layer( + identity, + std::time::Duration::from_secs(1), + )) + .check_new_service::() + .into_inner(); let serve = Box::pin(serve::serve(listen, accept, drain.signaled())); diff --git a/linkerd/app/test/Cargo.toml b/linkerd/app/test/Cargo.toml index 5044a7e41..c1a56efbd 100644 --- a/linkerd/app/test/Cargo.toml +++ b/linkerd/app/test/Cargo.toml @@ -23,7 +23,7 @@ http = "0.2" http-body = "0.4" hyper = { version = "0.14.2", features = ["http1", "http2"] } linkerd-channel = { path = "../../channel" } -linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] } +linkerd-app-core = { path = "../core" } linkerd-identity = { path = "../../identity" } linkerd-io = { path = "../../io", features = ["tokio-test"] } regex = "1" diff --git a/linkerd/proxy/tap/src/accept.rs b/linkerd/proxy/tap/src/accept.rs index 6dbf8134c..8c7523580 100644 --- a/linkerd/proxy/tap/src/accept.rs +++ b/linkerd/proxy/tap/src/accept.rs @@ -13,7 +13,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::net::TcpStream; use tower::Service; #[derive(Clone, Debug)] @@ -64,7 +63,10 @@ impl AcceptPermittedClients { } } -impl Service>> for AcceptPermittedClients { +impl Service> for AcceptPermittedClients +where + I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static, +{ type Response = ServeFuture; type Error = Error; type Future = future::Ready>; @@ -73,7 +75,7 @@ impl Service>> for AcceptPermittedClien Poll::Ready(Ok(())) } - fn call(&mut self, conn: Connection>) -> Self::Future { + fn call(&mut self, conn: Connection) -> Self::Future { match conn { ((Conditional::Some(tls), _), io) => { if let tls::ServerTls::Established { diff --git a/linkerd/proxy/transport/Cargo.toml b/linkerd/proxy/transport/Cargo.toml index cf5b1cd67..ef24ca9ed 100644 --- a/linkerd/proxy/transport/Cargo.toml +++ b/linkerd/proxy/transport/Cargo.toml @@ -9,11 +9,6 @@ description = """ Transport-level implementations that rely on core proxy infrastructure """ -# This should probably be decomposed into smaller, decoupled crates. - -[features] -mock-orig-dst = [] - [dependencies] bytes = "1" futures = "0.3.9" diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index ee59fef6b..bd2cccdd6 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -4,11 +4,13 @@ pub mod addrs; mod connect; pub mod listen; pub mod metrics; +pub mod orig_dst; pub use self::{ addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, connect::ConnectTcp, - listen::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, NoOrigDstAddr}, + listen::{Bind, BindTcp}, + orig_dst::BindWithOrigDst, }; use std::time::Duration; use tokio::net::TcpStream; diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index 7b348d9c3..22afd49d4 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -1,307 +1,92 @@ use crate::{addrs::*, Keepalive}; use futures::prelude::*; +use linkerd_io as io; use linkerd_stack::Param; -use std::{io, net::SocketAddr}; +use std::{fmt, pin::Pin}; use tokio::net::TcpStream; use tokio_stream::wrappers::TcpListenerStream; -use tracing::trace; -/// A mockable source for address info, i.e., for tests. -pub trait GetOrigDstAddr: Clone { - fn orig_dst_addr(&self, socket: &TcpStream) -> Option; +/// Binds a listener, producing a stream of incoming connections. +/// +/// Typically, this represents binding a TCP socket. However, it may also be an +/// stream of in-memory mock connections, for testing purposes. +pub trait Bind { + type Io: io::AsyncRead + + io::AsyncWrite + + io::Peek + + io::PeerAddr + + fmt::Debug + + Unpin + + Send + + Sync + + 'static; + type Addrs: Clone + Send + Sync + 'static; + type Incoming: Stream> + Send + Sync + 'static; + + fn bind(self, params: &T) -> io::Result>; } -#[derive(Clone, Debug)] -pub struct BindTcp { - addr: ListenAddr, - keepalive: Keepalive, - orig_dst_addr: O, -} +pub type Bound = (Local, I); -pub type Connection = (Addrs, TcpStream); +#[derive(Copy, Clone, Debug, Default)] +pub struct BindTcp(()); #[derive(Clone, Debug)] pub struct Addrs { - server: Local, - client: Remote, - orig_dst: Option, + pub server: Local, + pub client: Remote, } -#[derive(Copy, Clone, Debug)] -pub struct NoOrigDstAddr(()); - -// The mock-orig-dst feature disables use of the syscall-based OrigDstAddr implementation and -// replaces it with one that must be configured. - -#[cfg(not(feature = "mock-orig-dst"))] -pub use self::sys::SysOrigDstAddr as DefaultOrigDstAddr; - -#[cfg(feature = "mock-orig-dst")] -pub use self::mock::MockOrigDstAddr as DefaultOrigDstAddr; +// === impl BindTcp === impl BindTcp { - pub fn new(addr: ListenAddr, keepalive: Keepalive) -> Self { - Self { - addr, - keepalive, - orig_dst_addr: NoOrigDstAddr(()), - } + pub fn with_orig_dst() -> super::BindWithOrigDst { + super::BindWithOrigDst::from(Self::default()) } } -impl BindTcp { - pub fn with_orig_dst_addr(self, orig_dst_addr: B) -> BindTcp { - BindTcp { - orig_dst_addr, - addr: self.addr, - keepalive: self.keepalive, - } - } +impl Bind for BindTcp +where + T: Param