Include server address in server error logs (#2500)

When the proxy's TCP server encounters an error (usually due to one of
the connections failing, we log the error and the client's address. The
server's address was omitted because it varies based on context that is
not known in this module: in some cases it's the actual server address
on the socket, but when proxying a connection it may be determined by
the value retrieved from the SO_ORIGINAL_DST socket option.

To fix this, the server now requires that connection metadata be able to
materialize an 'AddrPair' parameter that describes a client-server
connection. The TCP listener impls are updated to satisfy this based on
the appropriate metadata; and the TCP server consumes this type to
include both client and server addresses in the relevant logs/contexts.
This commit is contained in:
Oliver Gould 2023-11-03 10:30:25 -07:00 committed by GitHub
parent cbf226e8e5
commit 26b718c55d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 75 additions and 17 deletions

View File

@ -7,7 +7,9 @@ use linkerd_app_core::{
serve, serve,
svc::{self, ExtractParam, InsertParam, Param}, svc::{self, ExtractParam, InsertParam, Param},
tls, trace, tls, trace,
transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, transport::{
self, addrs::AddrPair, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr,
},
Error, Result, Error, Result,
}; };
use linkerd_app_inbound as inbound; use linkerd_app_inbound as inbound;
@ -84,7 +86,9 @@ impl Config {
where where
R: FmtMetrics + Clone + Send + Sync + Unpin + 'static, R: FmtMetrics + Clone + Send + Sync + Unpin + 'static,
B: Bind<ServerConfig>, B: Bind<ServerConfig>,
B::Addrs: svc::Param<Remote<ClientAddr>> + svc::Param<Local<ServerAddr>>, B::Addrs: svc::Param<Remote<ClientAddr>>,
B::Addrs: svc::Param<Local<ServerAddr>>,
B::Addrs: svc::Param<AddrPair>,
{ {
let (listen_addr, listen) = bind.bind(&self.server)?; let (listen_addr, listen) = bind.bind(&self.server)?;
@ -202,6 +206,14 @@ impl Param<Remote<ClientAddr>> for Http {
} }
} }
impl Param<AddrPair> for Http {
fn param(&self) -> AddrPair {
let Remote(client) = self.tcp.client;
let Local(server) = self.tcp.addr;
AddrPair(client, server)
}
}
impl Param<tls::ConditionalServerTls> for Http { impl Param<tls::ConditionalServerTls> for Http {
fn param(&self) -> tls::ConditionalServerTls { fn param(&self) -> tls::ConditionalServerTls {
self.tcp.tls.clone() self.tcp.tls.clone()

View File

@ -1,11 +1,11 @@
use crate::{ use crate::{
io, is_caused_by, io, is_caused_by,
svc::{self, Param}, svc::{self, Param},
transport::{ClientAddr, Remote},
Result, Result,
}; };
use futures::prelude::*; use futures::prelude::*;
use linkerd_error::Error; use linkerd_error::Error;
use linkerd_proxy_transport::AddrPair;
use tower::util::ServiceExt; use tower::util::ServiceExt;
use tracing::{debug, debug_span, info, instrument::Instrument, warn}; use tracing::{debug, debug_span, info, instrument::Instrument, warn};
@ -18,7 +18,7 @@ pub async fn serve<M, S, I, A>(
shutdown: impl Future, shutdown: impl Future,
) where ) where
I: Send + 'static, I: Send + 'static,
A: Param<Remote<ClientAddr>>, A: Param<AddrPair>,
M: svc::NewService<A, Service = S>, M: svc::NewService<A, Service = S>,
S: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static, S: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static,
S::Error: Into<Error>, S::Error: Into<Error>,
@ -40,8 +40,8 @@ pub async fn serve<M, S, I, A>(
}; };
// The local addr should be instrumented from the listener's context. // The local addr should be instrumented from the listener's context.
let Remote(ClientAddr(client_addr)) = addrs.param(); let AddrPair(client_addr, server_addr) = addrs.param();
let span = debug_span!("accept", client.addr = %client_addr).entered(); let span = debug_span!("accept", client.addr = %client_addr, server.addr = %server_addr).entered();
let accept = new_accept.new_service(addrs); let accept = new_accept.new_service(addrs);
// Dispatch all of the work for a given connection onto a // Dispatch all of the work for a given connection onto a
@ -57,10 +57,20 @@ pub async fn serve<M, S, I, A>(
{ {
Ok(()) => debug!("Connection closed"), Ok(()) => debug!("Connection closed"),
Err(reason) if is_caused_by::<std::io::Error>(&*reason) => { Err(reason) if is_caused_by::<std::io::Error>(&*reason) => {
debug!(%reason, "Connection closed") debug!(
reason,
client.addr = %client_addr,
server.addr = %server_addr,
"Connection closed"
);
} }
Err(error) => { Err(error) => {
info!(error, client.addr = %client_addr, "Connection closed") info!(
error,
client.addr = %client_addr,
server.addr = %server_addr,
"Connection closed"
);
} }
} }
// Hold the service until the connection is complete. This // Hold the service until the connection is complete. This

View File

@ -5,7 +5,7 @@ use linkerd_app_core::{
io, profiles, io, profiles,
proxy::http, proxy::http,
serve, svc, serve, svc,
transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, transport::{self, addrs::*},
Error, Result, Error, Result,
}; };
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
@ -43,7 +43,10 @@ impl Inbound<()> {
profiles: P, profiles: P,
gateway: G, gateway: G,
) where ) where
A: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr> + Clone + Send + Sync + 'static, A: svc::Param<Remote<ClientAddr>>,
A: svc::Param<OrigDstAddr>,
A: svc::Param<AddrPair>,
A: Clone + Send + Sync + 'static,
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
I: Debug + Unpin + Send + Sync + 'static, I: Debug + Unpin + Send + Sync + 'static,
G: svc::NewService<direct::GatewayTransportHeader, Service = GSvc>, G: svc::NewService<direct::GatewayTransportHeader, Service = GSvc>,

View File

@ -230,7 +230,7 @@ impl Outbound<()> {
resolve: R, resolve: R,
) where ) where
// Target describing a server-side connection. // Target describing a server-side connection.
T: svc::Param<Remote<ClientAddr>>, T: svc::Param<AddrPair>,
T: svc::Param<OrigDstAddr>, T: svc::Param<OrigDstAddr>,
T: Clone + Send + Sync + 'static, T: Clone + Send + Sync + 'static,
// Server-side socket. // Server-side socket.

View File

@ -22,7 +22,7 @@ use linkerd_app_core::{
metrics::FmtMetrics, metrics::FmtMetrics,
svc::Param, svc::Param,
telemetry, telemetry,
transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, transport::{addrs::*, listen::Bind},
Error, ProxyRuntime, Error, ProxyRuntime,
}; };
use linkerd_app_gateway as gateway; use linkerd_app_gateway as gateway;
@ -102,11 +102,17 @@ impl Config {
) -> Result<App, Error> ) -> Result<App, Error>
where where
BIn: Bind<ServerConfig> + 'static, BIn: Bind<ServerConfig> + 'static,
BIn::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>, BIn::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BOut: Bind<ServerConfig> + 'static, BOut: Bind<ServerConfig> + 'static,
BOut::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>, BOut::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BAdmin: Bind<ServerConfig> + Clone + 'static, BAdmin: Bind<ServerConfig> + Clone + 'static,
BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>, BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<AddrPair>,
{ {
let Config { let Config {
admin, admin,

View File

@ -6,7 +6,7 @@ use linkerd_app_core::{
serve, serve,
svc::{self, ExtractParam, InsertParam, Param}, svc::{self, ExtractParam, InsertParam, Param},
tls, tls,
transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, transport::{addrs::AddrPair, listen::Bind, ClientAddr, Local, Remote, ServerAddr},
Error, Error,
}; };
use std::{collections::HashSet, pin::Pin}; use std::{collections::HashSet, pin::Pin};
@ -47,6 +47,7 @@ impl Config {
where where
B: Bind<ServerConfig>, B: Bind<ServerConfig>,
B::Addrs: Param<Remote<ClientAddr>>, B::Addrs: Param<Remote<ClientAddr>>,
B::Addrs: Param<AddrPair>,
{ {
let (registry, server) = tap::new(); let (registry, server) = tap::new();
match self { match self {

View File

@ -28,6 +28,10 @@ pub struct Local<T>(pub T);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Remote<T>(pub T); pub struct Remote<T>(pub T);
/// Describes a connection from a client to a server.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct AddrPair(pub ClientAddr, pub ServerAddr);
// === impl ClientAddr === // === impl ClientAddr ===
impl std::ops::Deref for ClientAddr { impl std::ops::Deref for ClientAddr {

View File

@ -15,7 +15,7 @@ pub mod listen;
pub mod orig_dst; pub mod orig_dst;
pub use self::{ pub use self::{
addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, addrs::{AddrPair, ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr},
connect::ConnectTcp, connect::ConnectTcp,
listen::{Bind, BindTcp}, listen::{Bind, BindTcp},
orig_dst::BindWithOrigDst, orig_dst::BindWithOrigDst,

View File

@ -104,3 +104,12 @@ impl Param<Local<ServerAddr>> for Addrs {
self.server self.server
} }
} }
impl Param<AddrPair> for Addrs {
#[inline]
fn param(&self) -> AddrPair {
let Remote(client) = self.client;
let Local(server) = self.server;
AddrPair(client, server)
}
}

View File

@ -23,6 +23,7 @@ pub struct Addrs<A = listen::Addrs> {
// === impl Addrs === // === impl Addrs ===
impl<A> Param<OrigDstAddr> for Addrs<A> { impl<A> Param<OrigDstAddr> for Addrs<A> {
#[inline]
fn param(&self) -> OrigDstAddr { fn param(&self) -> OrigDstAddr {
self.orig_dst self.orig_dst
} }
@ -32,11 +33,23 @@ impl<A> Param<Remote<ClientAddr>> for Addrs<A>
where where
A: Param<Remote<ClientAddr>>, A: Param<Remote<ClientAddr>>,
{ {
#[inline]
fn param(&self) -> Remote<ClientAddr> { fn param(&self) -> Remote<ClientAddr> {
self.inner.param() self.inner.param()
} }
} }
impl<A> Param<AddrPair> for Addrs<A>
where
A: Param<Remote<ClientAddr>>,
{
#[inline]
fn param(&self) -> AddrPair {
let Remote(client) = self.inner.param();
AddrPair(client, ServerAddr(self.orig_dst.into()))
}
}
impl<A> Param<Local<ServerAddr>> for Addrs<A> impl<A> Param<Local<ServerAddr>> for Addrs<A>
where where
A: Param<Local<ServerAddr>>, A: Param<Local<ServerAddr>>,