diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index 211184cb0..f703155e6 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -7,7 +7,9 @@ use linkerd_app_core::{ serve, svc::{self, ExtractParam, InsertParam, Param}, tls, trace, - transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{ + self, addrs::AddrPair, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr, + }, Error, Result, }; use linkerd_app_inbound as inbound; @@ -84,7 +86,9 @@ impl Config { where R: FmtMetrics + Clone + Send + Sync + Unpin + 'static, B: Bind, - B::Addrs: svc::Param> + svc::Param>, + B::Addrs: svc::Param>, + B::Addrs: svc::Param>, + B::Addrs: svc::Param, { let (listen_addr, listen) = bind.bind(&self.server)?; @@ -202,6 +206,14 @@ impl Param> for Http { } } +impl Param for Http { + fn param(&self) -> AddrPair { + let Remote(client) = self.tcp.client; + let Local(server) = self.tcp.addr; + AddrPair(client, server) + } +} + impl Param for Http { fn param(&self) -> tls::ConditionalServerTls { self.tcp.tls.clone() diff --git a/linkerd/app/core/src/serve.rs b/linkerd/app/core/src/serve.rs index 0c8a593ad..32d454795 100644 --- a/linkerd/app/core/src/serve.rs +++ b/linkerd/app/core/src/serve.rs @@ -1,11 +1,11 @@ use crate::{ io, is_caused_by, svc::{self, Param}, - transport::{ClientAddr, Remote}, Result, }; use futures::prelude::*; use linkerd_error::Error; +use linkerd_proxy_transport::AddrPair; use tower::util::ServiceExt; use tracing::{debug, debug_span, info, instrument::Instrument, warn}; @@ -18,7 +18,7 @@ pub async fn serve( shutdown: impl Future, ) where I: Send + 'static, - A: Param>, + A: Param, M: svc::NewService, S: tower::Service, Response = ()> + Send + 'static, S::Error: Into, @@ -40,8 +40,8 @@ pub async fn serve( }; // The local addr should be instrumented from the listener's context. - let Remote(ClientAddr(client_addr)) = addrs.param(); - let span = debug_span!("accept", client.addr = %client_addr).entered(); + let AddrPair(client_addr, server_addr) = addrs.param(); + let span = debug_span!("accept", client.addr = %client_addr, server.addr = %server_addr).entered(); let accept = new_accept.new_service(addrs); // Dispatch all of the work for a given connection onto a @@ -57,10 +57,20 @@ pub async fn serve( { Ok(()) => debug!("Connection closed"), Err(reason) if is_caused_by::(&*reason) => { - debug!(%reason, "Connection closed") + debug!( + reason, + client.addr = %client_addr, + server.addr = %server_addr, + "Connection closed" + ); } Err(error) => { - info!(error, client.addr = %client_addr, "Connection closed") + info!( + error, + client.addr = %client_addr, + server.addr = %server_addr, + "Connection closed" + ); } } // Hold the service until the connection is complete. This diff --git a/linkerd/app/inbound/src/server.rs b/linkerd/app/inbound/src/server.rs index 3920e71cb..d9b18c3fa 100644 --- a/linkerd/app/inbound/src/server.rs +++ b/linkerd/app/inbound/src/server.rs @@ -5,7 +5,7 @@ use linkerd_app_core::{ io, profiles, proxy::http, serve, svc, - transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{self, addrs::*}, Error, Result, }; use std::{fmt::Debug, sync::Arc}; @@ -43,7 +43,10 @@ impl Inbound<()> { profiles: P, gateway: G, ) where - A: svc::Param> + svc::Param + Clone + Send + Sync + 'static, + A: svc::Param>, + A: svc::Param, + A: svc::Param, + A: Clone + Send + Sync + 'static, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Unpin + Send + Sync + 'static, G: svc::NewService, diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 8c9e7939e..44046e014 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -230,7 +230,7 @@ impl Outbound<()> { resolve: R, ) where // Target describing a server-side connection. - T: svc::Param>, + T: svc::Param, T: svc::Param, T: Clone + Send + Sync + 'static, // Server-side socket. diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 22bff9a62..70713ba0b 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -22,7 +22,7 @@ use linkerd_app_core::{ metrics::FmtMetrics, svc::Param, telemetry, - transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{addrs::*, listen::Bind}, Error, ProxyRuntime, }; use linkerd_app_gateway as gateway; @@ -102,11 +102,17 @@ impl Config { ) -> Result where BIn: Bind + 'static, - BIn::Addrs: Param> + Param> + Param, + BIn::Addrs: Param> + + Param> + + Param + + Param, BOut: Bind + 'static, - BOut::Addrs: Param> + Param> + Param, + BOut::Addrs: Param> + + Param> + + Param + + Param, BAdmin: Bind + Clone + 'static, - BAdmin::Addrs: Param> + Param>, + BAdmin::Addrs: Param> + Param> + Param, { let Config { admin, diff --git a/linkerd/app/src/tap.rs b/linkerd/app/src/tap.rs index 81f4beeff..880d0aeba 100644 --- a/linkerd/app/src/tap.rs +++ b/linkerd/app/src/tap.rs @@ -6,7 +6,7 @@ use linkerd_app_core::{ serve, svc::{self, ExtractParam, InsertParam, Param}, tls, - transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, + transport::{addrs::AddrPair, listen::Bind, ClientAddr, Local, Remote, ServerAddr}, Error, }; use std::{collections::HashSet, pin::Pin}; @@ -47,6 +47,7 @@ impl Config { where B: Bind, B::Addrs: Param>, + B::Addrs: Param, { let (registry, server) = tap::new(); match self { diff --git a/linkerd/proxy/transport/src/addrs.rs b/linkerd/proxy/transport/src/addrs.rs index 19bab4801..c9eca2f0b 100644 --- a/linkerd/proxy/transport/src/addrs.rs +++ b/linkerd/proxy/transport/src/addrs.rs @@ -28,6 +28,10 @@ pub struct Local(pub T); #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub struct Remote(pub T); +/// Describes a connection from a client to a server. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +pub struct AddrPair(pub ClientAddr, pub ServerAddr); + // === impl ClientAddr === impl std::ops::Deref for ClientAddr { diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index 0713942a6..c463ad84c 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -15,7 +15,7 @@ pub mod listen; pub mod orig_dst; pub use self::{ - addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, + addrs::{AddrPair, ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, connect::ConnectTcp, listen::{Bind, BindTcp}, orig_dst::BindWithOrigDst, diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index fd70ec57c..4addd907d 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -104,3 +104,12 @@ impl Param> for Addrs { self.server } } + +impl Param for Addrs { + #[inline] + fn param(&self) -> AddrPair { + let Remote(client) = self.client; + let Local(server) = self.server; + AddrPair(client, server) + } +} diff --git a/linkerd/proxy/transport/src/orig_dst.rs b/linkerd/proxy/transport/src/orig_dst.rs index bc2232c56..6b1a4c480 100644 --- a/linkerd/proxy/transport/src/orig_dst.rs +++ b/linkerd/proxy/transport/src/orig_dst.rs @@ -23,6 +23,7 @@ pub struct Addrs { // === impl Addrs === impl Param for Addrs { + #[inline] fn param(&self) -> OrigDstAddr { self.orig_dst } @@ -32,11 +33,23 @@ impl Param> for Addrs where A: Param>, { + #[inline] fn param(&self) -> Remote { self.inner.param() } } +impl Param for Addrs +where + A: Param>, +{ + #[inline] + fn param(&self) -> AddrPair { + let Remote(client) = self.inner.param(); + AddrPair(client, ServerAddr(self.orig_dst.into())) + } +} + impl Param> for Addrs where A: Param>,