wip: app/core, io, meshtls, proxy/transport, tls
This commit is contained in:
		
							parent
							
								
									1bddc61fba
								
							
						
					
					
						commit
						74b4935271
					
				
							
								
								
									
										11
									
								
								Cargo.lock
								
								
								
								
							
							
						
						
									
										11
									
								
								Cargo.lock
								
								
								
								
							| 
						 | 
				
			
			@ -1407,6 +1407,7 @@ dependencies = [
 | 
			
		|||
 "http 1.2.0",
 | 
			
		||||
 "http-body",
 | 
			
		||||
 "hyper",
 | 
			
		||||
 "hyper-util",
 | 
			
		||||
 "ipnet",
 | 
			
		||||
 "linkerd-addr",
 | 
			
		||||
 "linkerd-conditional",
 | 
			
		||||
| 
						 | 
				
			
			@ -1994,6 +1995,7 @@ dependencies = [
 | 
			
		|||
 "async-trait",
 | 
			
		||||
 "bytes",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "hyper",
 | 
			
		||||
 "hyper-util",
 | 
			
		||||
 "linkerd-errno",
 | 
			
		||||
 "pin-project",
 | 
			
		||||
| 
						 | 
				
			
			@ -2007,6 +2009,8 @@ name = "linkerd-meshtls"
 | 
			
		|||
version = "0.1.0"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "futures",
 | 
			
		||||
 "hyper",
 | 
			
		||||
 "hyper-util",
 | 
			
		||||
 "linkerd-conditional",
 | 
			
		||||
 "linkerd-dns-name",
 | 
			
		||||
 "linkerd-error",
 | 
			
		||||
| 
						 | 
				
			
			@ -2032,6 +2036,8 @@ dependencies = [
 | 
			
		|||
 "boring",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "hex",
 | 
			
		||||
 "hyper",
 | 
			
		||||
 "hyper-util",
 | 
			
		||||
 "linkerd-dns-name",
 | 
			
		||||
 "linkerd-error",
 | 
			
		||||
 "linkerd-identity",
 | 
			
		||||
| 
						 | 
				
			
			@ -2051,6 +2057,8 @@ name = "linkerd-meshtls-rustls"
 | 
			
		|||
version = "0.1.0"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "futures",
 | 
			
		||||
 "hyper",
 | 
			
		||||
 "hyper-util",
 | 
			
		||||
 "linkerd-dns-name",
 | 
			
		||||
 "linkerd-error",
 | 
			
		||||
 "linkerd-identity",
 | 
			
		||||
| 
						 | 
				
			
			@ -2485,6 +2493,8 @@ name = "linkerd-proxy-transport"
 | 
			
		|||
version = "0.1.0"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "futures",
 | 
			
		||||
 "hyper",
 | 
			
		||||
 "hyper-util",
 | 
			
		||||
 "libc",
 | 
			
		||||
 "linkerd-error",
 | 
			
		||||
 "linkerd-io",
 | 
			
		||||
| 
						 | 
				
			
			@ -2632,6 +2642,7 @@ dependencies = [
 | 
			
		|||
 "async-trait",
 | 
			
		||||
 "bytes",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "hyper",
 | 
			
		||||
 "linkerd-conditional",
 | 
			
		||||
 "linkerd-dns-name",
 | 
			
		||||
 "linkerd-error",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,6 +18,7 @@ drain = { version = "0.1", features = ["retain"] }
 | 
			
		|||
http = { workspace = true }
 | 
			
		||||
http-body = { workspace = true }
 | 
			
		||||
hyper = { workspace = true, features = ["http1", "http2"] }
 | 
			
		||||
hyper-util = { workspace = true }
 | 
			
		||||
futures = { version = "0.3", default-features = false }
 | 
			
		||||
ipnet = "2.11"
 | 
			
		||||
prometheus-client = "0.22"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -69,8 +69,10 @@ impl fmt::Display for ControlAddr {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub type RspBody =
 | 
			
		||||
    linkerd_http_metrics::requests::ResponseBody<http::balance::Body<hyper::Body>, classify::Eos>;
 | 
			
		||||
pub type RspBody = linkerd_http_metrics::requests::ResponseBody<
 | 
			
		||||
    http::balance::Body<hyper::body::Incoming>,
 | 
			
		||||
    classify::Eos,
 | 
			
		||||
>;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug, Default)]
 | 
			
		||||
pub struct Metrics {
 | 
			
		||||
| 
						 | 
				
			
			@ -112,7 +114,7 @@ impl Config {
 | 
			
		|||
                warn!(error, "Failed to resolve control-plane component");
 | 
			
		||||
                if let Some(e) = crate::errors::cause_ref::<dns::ResolveError>(&*error) {
 | 
			
		||||
                    if let Some(ttl) = e.negative_ttl() {
 | 
			
		||||
                        return Ok(Either::Left(
 | 
			
		||||
                        return Ok::<_, Error>(Either::Left(
 | 
			
		||||
                            IntervalStream::new(time::interval(ttl)).map(|_| ()),
 | 
			
		||||
                        ));
 | 
			
		||||
                    }
 | 
			
		||||
| 
						 | 
				
			
			@ -129,9 +131,9 @@ impl Config {
 | 
			
		|||
            self.connect.user_timeout,
 | 
			
		||||
        ))
 | 
			
		||||
        .push(tls::Client::layer(identity))
 | 
			
		||||
        .push_connect_timeout(self.connect.timeout)
 | 
			
		||||
        .push_connect_timeout(self.connect.timeout) // Client<NewClient, ConnectTcp>
 | 
			
		||||
        .push_map_target(|(_version, target)| target)
 | 
			
		||||
        .push(self::client::layer(self.connect.http2))
 | 
			
		||||
        .push(self::client::layer::<_, _>(self.connect.http2))
 | 
			
		||||
        .push_on_service(svc::MapErr::layer_boxed())
 | 
			
		||||
        .into_new_service();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,6 +3,7 @@ use super::{
 | 
			
		|||
    respond::{HttpRescue, SyntheticHttpResponse},
 | 
			
		||||
};
 | 
			
		||||
use http::{header::HeaderValue, HeaderMap};
 | 
			
		||||
use http_body::Frame;
 | 
			
		||||
use linkerd_error::{Error, Result};
 | 
			
		||||
use pin_project::pin_project;
 | 
			
		||||
use std::{
 | 
			
		||||
| 
						 | 
				
			
			@ -66,19 +67,18 @@ where
 | 
			
		|||
    type Data = B::Data;
 | 
			
		||||
    type Error = B::Error;
 | 
			
		||||
 | 
			
		||||
    fn poll_data(
 | 
			
		||||
    fn poll_frame(
 | 
			
		||||
        mut self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
 | 
			
		||||
    ) -> Poll<Option<std::result::Result<http_body::Frame<Self::Data>, Self::Error>>> {
 | 
			
		||||
        let ResponseBodyProj(inner) = self.as_mut().project();
 | 
			
		||||
        match inner.project() {
 | 
			
		||||
            InnerProj::Passthru(inner) => inner.poll_data(cx),
 | 
			
		||||
            InnerProj::Rescued { trailers: _ } => Poll::Ready(None),
 | 
			
		||||
            InnerProj::Passthru(inner) => inner.poll_frame(cx),
 | 
			
		||||
            InnerProj::GrpcRescue {
 | 
			
		||||
                inner,
 | 
			
		||||
                rescue,
 | 
			
		||||
                emit_headers,
 | 
			
		||||
            } => match inner.poll_data(cx) {
 | 
			
		||||
            } => match inner.poll_frame(cx) {
 | 
			
		||||
                Poll::Ready(Some(Err(error))) => {
 | 
			
		||||
                    // The inner body has yielded an error, which we will try to rescue. If so,
 | 
			
		||||
                    // store our synthetic trailers reporting the error.
 | 
			
		||||
| 
						 | 
				
			
			@ -88,19 +88,10 @@ where
 | 
			
		|||
                }
 | 
			
		||||
                data => data,
 | 
			
		||||
            },
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_trailers(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
 | 
			
		||||
        let ResponseBodyProj(inner) = self.project();
 | 
			
		||||
        match inner.project() {
 | 
			
		||||
            InnerProj::Passthru(inner) => inner.poll_trailers(cx),
 | 
			
		||||
            InnerProj::GrpcRescue { inner, .. } => inner.poll_trailers(cx),
 | 
			
		||||
            InnerProj::Rescued { trailers } => Poll::Ready(Ok(trailers.take())),
 | 
			
		||||
            InnerProj::Rescued { trailers } => {
 | 
			
		||||
                let trailers = trailers.take().map(Frame::trailers).map(Ok);
 | 
			
		||||
                Poll::Ready(trailers)
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,6 +16,7 @@ default = []
 | 
			
		|||
async-trait = "0.1"
 | 
			
		||||
futures = { version = "0.3", default-features = false }
 | 
			
		||||
bytes = { workspace = true }
 | 
			
		||||
hyper = { workspace = true, default-features = false }
 | 
			
		||||
hyper-util = { workspace = true, features = ["tokio"] }
 | 
			
		||||
linkerd-errno = { path = "../errno" }
 | 
			
		||||
tokio = { version = "1", features = ["io-util", "net"] }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -47,6 +47,19 @@ impl<L: io::AsyncRead, R: io::AsyncRead> io::AsyncRead for EitherIo<L, R> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<L: hyper::rt::Read, R: hyper::rt::Read> hyper::rt::Read for EitherIo<L, R> {
 | 
			
		||||
    fn poll_read(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: hyper::rt::ReadBufCursor<'_>,
 | 
			
		||||
    ) -> io::Poll<()> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            EitherIoProj::Left(l) => l.poll_read(cx, buf),
 | 
			
		||||
            EitherIoProj::Right(r) => r.poll_read(cx, buf),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<L: io::AsyncWrite, R: io::AsyncWrite> io::AsyncWrite for EitherIo<L, R> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
| 
						 | 
				
			
			@ -92,3 +105,44 @@ impl<L: io::AsyncWrite, R: io::AsyncWrite> io::AsyncWrite for EitherIo<L, R> {
 | 
			
		|||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<L: hyper::rt::Write, R: hyper::rt::Write> hyper::rt::Write for EitherIo<L, R> {
 | 
			
		||||
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            EitherIoProj::Left(l) => l.poll_write(cx, buf),
 | 
			
		||||
            EitherIoProj::Right(r) => r.poll_write(cx, buf),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            EitherIoProj::Left(l) => l.poll_flush(cx),
 | 
			
		||||
            EitherIoProj::Right(r) => r.poll_flush(cx),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            EitherIoProj::Left(l) => l.poll_shutdown(cx),
 | 
			
		||||
            EitherIoProj::Right(r) => r.poll_shutdown(cx),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn is_write_vectored(&self) -> bool {
 | 
			
		||||
        match self {
 | 
			
		||||
            EitherIo::Left(l) => l.is_write_vectored(),
 | 
			
		||||
            EitherIo::Right(r) => r.is_write_vectored(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_write_vectored(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        bufs: &[std::io::IoSlice<'_>],
 | 
			
		||||
    ) -> io::Poll<usize> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            EitherIoProj::Left(l) => l.poll_write_vectored(cx, bufs),
 | 
			
		||||
            EitherIoProj::Right(r) => r.poll_write_vectored(cx, bufs),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -78,6 +78,35 @@ impl<I: io::AsyncRead> io::AsyncRead for PrefixedIo<I> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: hyper::rt::Read> hyper::rt::Read for PrefixedIo<I> {
 | 
			
		||||
    fn poll_read(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        mut buf: hyper::rt::ReadBufCursor<'_>,
 | 
			
		||||
    ) -> io::Poll<()> {
 | 
			
		||||
        // XXX(kate): this is copy-pasted from `io::AsyncRead`, above.
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        // Check the length only once, since looking as the length
 | 
			
		||||
        // of a Bytes isn't as cheap as the length of a &[u8].
 | 
			
		||||
        let peeked_len = this.prefix.len();
 | 
			
		||||
 | 
			
		||||
        if peeked_len == 0 {
 | 
			
		||||
            this.io.poll_read(cx, buf)
 | 
			
		||||
        } else {
 | 
			
		||||
            let len = cmp::min(buf.remaining(), peeked_len);
 | 
			
		||||
            buf.put_slice(&this.prefix.as_ref()[..len]);
 | 
			
		||||
            this.prefix.advance(len);
 | 
			
		||||
            // If we've finally emptied the prefix, drop it so we don't
 | 
			
		||||
            // hold onto the allocated memory any longer. We won't peek
 | 
			
		||||
            // again.
 | 
			
		||||
            if peeked_len == len {
 | 
			
		||||
                *this.prefix = Bytes::new();
 | 
			
		||||
            }
 | 
			
		||||
            io::Poll::Ready(Ok(()))
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::Write> io::Write for PrefixedIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
 | 
			
		||||
| 
						 | 
				
			
			@ -120,3 +149,29 @@ impl<I: io::AsyncWrite> io::AsyncWrite for PrefixedIo<I> {
 | 
			
		|||
        self.io.is_write_vectored()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: hyper::rt::Write> hyper::rt::Write for PrefixedIo<I> {
 | 
			
		||||
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
 | 
			
		||||
        self.project().io.poll_write(cx, buf)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        self.project().io.poll_flush(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        self.project().io.poll_shutdown(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn is_write_vectored(&self) -> bool {
 | 
			
		||||
        self.io.is_write_vectored()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_write_vectored(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        bufs: &[std::io::IoSlice<'_>],
 | 
			
		||||
    ) -> io::Poll<usize> {
 | 
			
		||||
        self.project().io.poll_write_vectored(cx, bufs)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -89,6 +89,17 @@ impl<I: io::AsyncRead> io::AsyncRead for ScopedIo<I> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: hyper::rt::Read> hyper::rt::Read for ScopedIo<I> {
 | 
			
		||||
    fn poll_read(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: hyper::rt::ReadBufCursor<'_>,
 | 
			
		||||
    ) -> io::Poll<()> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        this.io.poll_read(cx, buf).map_err(this.scope.err())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::Write> io::Write for ScopedIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
 | 
			
		||||
| 
						 | 
				
			
			@ -138,3 +149,38 @@ impl<I: io::AsyncWrite> io::AsyncWrite for ScopedIo<I> {
 | 
			
		|||
        self.io.is_write_vectored()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: hyper::rt::Write> hyper::rt::Write for ScopedIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        this.io.poll_write(cx, buf).map_err(this.scope.err())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        this.io.poll_flush(cx).map_err(this.scope.err())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        this.io.poll_shutdown(cx).map_err(this.scope.err())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn is_write_vectored(&self) -> bool {
 | 
			
		||||
        self.io.is_write_vectored()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_write_vectored(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        bufs: &[std::io::IoSlice<'_>],
 | 
			
		||||
    ) -> io::Poll<usize> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        this.io
 | 
			
		||||
            .poll_write_vectored(cx, bufs)
 | 
			
		||||
            .map_err(this.scope.err())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -77,6 +77,42 @@ impl<T: AsyncRead + AsyncWrite, S: Sensor> AsyncWrite for SensorIo<T, S> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: hyper::rt::Write, S: Sensor> hyper::rt::Write for SensorIo<T, S> {
 | 
			
		||||
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        this.sensor.record_error(this.io.poll_shutdown(cx))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        this.sensor.record_error(this.io.poll_flush(cx))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        let bytes = ready!(this.sensor.record_error(this.io.poll_write(cx, buf)))?;
 | 
			
		||||
        this.sensor.record_write(bytes);
 | 
			
		||||
        Poll::Ready(Ok(bytes))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_write_vectored(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        bufs: &[std::io::IoSlice<'_>],
 | 
			
		||||
    ) -> Poll<usize> {
 | 
			
		||||
        let this = self.project();
 | 
			
		||||
        let bytes = ready!(this
 | 
			
		||||
            .sensor
 | 
			
		||||
            .record_error(this.io.poll_write_vectored(cx, bufs)))?;
 | 
			
		||||
        this.sensor.record_write(bytes);
 | 
			
		||||
        Poll::Ready(Ok(bytes))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn is_write_vectored(&self) -> bool {
 | 
			
		||||
        self.io.is_write_vectored()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: PeerAddr, S> PeerAddr for SensorIo<T, S> {
 | 
			
		||||
    fn peer_addr(&self) -> Result<std::net::SocketAddr> {
 | 
			
		||||
        self.io.peer_addr()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,7 +15,10 @@ __has_any_tls_impls = []
 | 
			
		|||
 | 
			
		||||
[dependencies]
 | 
			
		||||
futures = { version = "0.3", default-features = false }
 | 
			
		||||
hyper = { workspace = true }
 | 
			
		||||
hyper-util = { workspace = true }
 | 
			
		||||
pin-project = "1"
 | 
			
		||||
tokio = { version = "1", default-features = false }
 | 
			
		||||
 | 
			
		||||
linkerd-dns-name = { path = "../dns/name" }
 | 
			
		||||
linkerd-error = { path = "../error" }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,6 +10,8 @@ publish = false
 | 
			
		|||
boring = "4"
 | 
			
		||||
futures = { version = "0.3", default-features = false }
 | 
			
		||||
hex = "0.4"                                             # used for debug logging
 | 
			
		||||
hyper = { workspace = true }
 | 
			
		||||
hyper-util = { workspace = true }
 | 
			
		||||
linkerd-error = { path = "../../error" }
 | 
			
		||||
linkerd-dns-name = { path = "../../dns/name" }
 | 
			
		||||
linkerd-identity = { path = "../../identity" }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,7 +21,7 @@ pub struct Connect {
 | 
			
		|||
pub type ConnectFuture<I> = Pin<Box<dyn Future<Output = io::Result<ClientIo<I>>> + Send>>;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub struct ClientIo<I>(tokio_boring::SslStream<I>);
 | 
			
		||||
pub struct ClientIo<I>(hyper_util::rt::TokioIo<tokio_boring::SslStream<I>>);
 | 
			
		||||
 | 
			
		||||
// === impl NewClient ===
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -117,7 +117,7 @@ where
 | 
			
		|||
                "Initiated TLS connection"
 | 
			
		||||
            );
 | 
			
		||||
            trace!(peer.id = %server_id, peer.name = %server_name);
 | 
			
		||||
            Ok(ClientIo(io))
 | 
			
		||||
            Ok(ClientIo(hyper_util::rt::TokioIo::new(io)))
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -131,6 +131,16 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
 | 
			
		|||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: &mut io::ReadBuf<'_>,
 | 
			
		||||
    ) -> io::Poll<()> {
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_read(cx, buf)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> hyper::rt::Read for ClientIo<I> {
 | 
			
		||||
    fn poll_read(
 | 
			
		||||
        mut self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: hyper::rt::ReadBufCursor<'_>,
 | 
			
		||||
    ) -> std::task::Poll<Result<(), std::io::Error>> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_read(cx, buf)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -138,17 +148,17 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
 | 
			
		|||
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_flush(cx)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_flush(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_shutdown(cx)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_shutdown(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_write(cx, buf)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_write(cx, buf)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
| 
						 | 
				
			
			@ -157,12 +167,12 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
 | 
			
		|||
        cx: &mut Context<'_>,
 | 
			
		||||
        bufs: &[io::IoSlice<'_>],
 | 
			
		||||
    ) -> io::Poll<usize> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_write_vectored(cx, bufs)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_write_vectored(cx, bufs)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn is_write_vectored(&self) -> bool {
 | 
			
		||||
        self.0.is_write_vectored()
 | 
			
		||||
        self.0.inner().is_write_vectored()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -170,6 +180,7 @@ impl<I> ClientIo<I> {
 | 
			
		|||
    #[inline]
 | 
			
		||||
    pub fn negotiated_protocol(&self) -> Option<NegotiatedProtocolRef<'_>> {
 | 
			
		||||
        self.0
 | 
			
		||||
            .inner()
 | 
			
		||||
            .ssl()
 | 
			
		||||
            .selected_alpn_protocol()
 | 
			
		||||
            .map(NegotiatedProtocolRef)
 | 
			
		||||
| 
						 | 
				
			
			@ -179,6 +190,6 @@ impl<I> ClientIo<I> {
 | 
			
		|||
impl<I: io::PeerAddr> io::PeerAddr for ClientIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {
 | 
			
		||||
        self.0.get_ref().peer_addr()
 | 
			
		||||
        self.0.inner().get_ref().peer_addr()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,6 +11,8 @@ test-util = ["linkerd-tls-test-util"]
 | 
			
		|||
 | 
			
		||||
[dependencies]
 | 
			
		||||
futures = { version = "0.3", default-features = false }
 | 
			
		||||
hyper = { workspace = true }
 | 
			
		||||
hyper-util = { workspace = true }
 | 
			
		||||
ring = { version = "0.17", features = ["std"] }
 | 
			
		||||
rustls-pemfile = "2.2"
 | 
			
		||||
rustls-webpki = { version = "0.102.8", features = ["std"] }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,10 @@ use linkerd_identity as id;
 | 
			
		|||
use linkerd_io as io;
 | 
			
		||||
use linkerd_meshtls_verifier as verifier;
 | 
			
		||||
use linkerd_stack::{NewService, Service};
 | 
			
		||||
use linkerd_tls::{client::AlpnProtocols, ClientTls, NegotiatedProtocolRef};
 | 
			
		||||
use linkerd_tls::{
 | 
			
		||||
    client::{self, AlpnProtocols},
 | 
			
		||||
    ClientTls, NegotiatedProtocolRef,
 | 
			
		||||
};
 | 
			
		||||
use std::{convert::TryFrom, pin::Pin, sync::Arc, task::Context};
 | 
			
		||||
use tokio::sync::watch;
 | 
			
		||||
use tokio_rustls::rustls::{self, pki_types::CertificateDer, ClientConfig};
 | 
			
		||||
| 
						 | 
				
			
			@ -25,7 +28,7 @@ pub struct Connect {
 | 
			
		|||
pub type ConnectFuture<I> = Pin<Box<dyn Future<Output = io::Result<ClientIo<I>>> + Send>>;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub struct ClientIo<I>(tokio_rustls::client::TlsStream<I>);
 | 
			
		||||
pub struct ClientIo<I>(hyper_util::rt::TokioIo<tokio_rustls::client::TlsStream<I>>);
 | 
			
		||||
 | 
			
		||||
// === impl NewClient ===
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -115,7 +118,7 @@ where
 | 
			
		|||
                    let (_, conn) = s.get_ref();
 | 
			
		||||
                    let end_cert = extract_cert(conn)?;
 | 
			
		||||
                    verifier::verify_id(end_cert, &server_id)?;
 | 
			
		||||
                    Ok(ClientIo(s))
 | 
			
		||||
                    Ok(ClientIo(hyper_util::rt::TokioIo::new(s)))
 | 
			
		||||
                }),
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -130,6 +133,16 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
 | 
			
		|||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: &mut io::ReadBuf<'_>,
 | 
			
		||||
    ) -> io::Poll<()> {
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_read(cx, buf)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> hyper::rt::Read for ClientIo<I> {
 | 
			
		||||
    fn poll_read(
 | 
			
		||||
        mut self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: hyper::rt::ReadBufCursor<'_>,
 | 
			
		||||
    ) -> std::task::Poll<Result<(), std::io::Error>> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_read(cx, buf)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -137,17 +150,17 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
 | 
			
		|||
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_flush(cx)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_flush(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_shutdown(cx)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_shutdown(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_write(cx, buf)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_write(cx, buf)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
| 
						 | 
				
			
			@ -156,12 +169,12 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
 | 
			
		|||
        cx: &mut Context<'_>,
 | 
			
		||||
        bufs: &[io::IoSlice<'_>],
 | 
			
		||||
    ) -> io::Poll<usize> {
 | 
			
		||||
        Pin::new(&mut self.0).poll_write_vectored(cx, bufs)
 | 
			
		||||
        Pin::new(self.0.inner_mut()).poll_write_vectored(cx, bufs)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn is_write_vectored(&self) -> bool {
 | 
			
		||||
        self.0.is_write_vectored()
 | 
			
		||||
        self.0.inner().is_write_vectored()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -169,6 +182,7 @@ impl<I> ClientIo<I> {
 | 
			
		|||
    #[inline]
 | 
			
		||||
    pub fn negotiated_protocol(&self) -> Option<NegotiatedProtocolRef<'_>> {
 | 
			
		||||
        self.0
 | 
			
		||||
            .inner()
 | 
			
		||||
            .get_ref()
 | 
			
		||||
            .1
 | 
			
		||||
            .alpn_protocol()
 | 
			
		||||
| 
						 | 
				
			
			@ -179,6 +193,6 @@ impl<I> ClientIo<I> {
 | 
			
		|||
impl<I: io::PeerAddr> io::PeerAddr for ClientIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {
 | 
			
		||||
        self.0.get_ref().0.peer_addr()
 | 
			
		||||
        self.0.inner().get_ref().0.peer_addr()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -180,6 +180,23 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> hyper::rt::Read for ClientIo<I> {
 | 
			
		||||
    fn poll_read(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: hyper::rt::ReadBufCursor<'_>,
 | 
			
		||||
    ) -> io::Poll<()> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            #[cfg(feature = "boring")]
 | 
			
		||||
            ClientIoProj::Boring(io) => io.poll_read(cx, buf),
 | 
			
		||||
            #[cfg(feature = "rustls")]
 | 
			
		||||
            ClientIoProj::Rustls(io) => io.poll_read(cx, buf),
 | 
			
		||||
            #[cfg(not(feature = "__has_any_tls_impls"))]
 | 
			
		||||
            _ => crate::no_tls!(cx, buf),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
 | 
			
		||||
| 
						 | 
				
			
			@ -251,6 +268,60 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> hyper::rt::Write for ClientIo<I> {
 | 
			
		||||
    fn poll_write(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        buf: &[u8],
 | 
			
		||||
    ) -> Poll<Result<usize, std::io::Error>> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            #[cfg(feature = "boring")]
 | 
			
		||||
            ClientIoProj::Boring(io) => tokio::io::AsyncWrite::poll_write(io, cx, buf),
 | 
			
		||||
            #[cfg(feature = "rustls")]
 | 
			
		||||
            ClientIoProj::Rustls(io) => tokio::io::AsyncWrite::poll_write(io, cx, buf),
 | 
			
		||||
            #[cfg(not(feature = "__has_any_tls_impls"))]
 | 
			
		||||
            _ => crate::no_tls!(cx, buf),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            #[cfg(feature = "boring")]
 | 
			
		||||
            ClientIoProj::Boring(io) => tokio::io::AsyncWrite::poll_flush(io, cx),
 | 
			
		||||
            #[cfg(feature = "rustls")]
 | 
			
		||||
            ClientIoProj::Rustls(io) => tokio::io::AsyncWrite::poll_flush(io, cx),
 | 
			
		||||
            #[cfg(not(feature = "__has_any_tls_impls"))]
 | 
			
		||||
            _ => crate::no_tls!(cx),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_shutdown(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
    ) -> Poll<Result<(), std::io::Error>> {
 | 
			
		||||
        match self.project() {
 | 
			
		||||
            #[cfg(feature = "boring")]
 | 
			
		||||
            ClientIoProj::Boring(io) => tokio::io::AsyncWrite::poll_shutdown(io, cx),
 | 
			
		||||
            #[cfg(feature = "rustls")]
 | 
			
		||||
            ClientIoProj::Rustls(io) => tokio::io::AsyncWrite::poll_shutdown(io, cx),
 | 
			
		||||
            #[cfg(not(feature = "__has_any_tls_impls"))]
 | 
			
		||||
            _ => crate::no_tls!(cx),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn is_write_vectored(&self) -> bool {
 | 
			
		||||
        tokio::io::AsyncWrite::is_write_vectored(self)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn poll_write_vectored(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
        bufs: &[std::io::IoSlice<'_>],
 | 
			
		||||
    ) -> Poll<Result<usize, std::io::Error>> {
 | 
			
		||||
        tokio::io::AsyncWrite::poll_write_vectored(self, cx, bufs)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<I: io::PeerAddr> io::PeerAddr for ClientIo<I> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,6 +11,8 @@ Transport-level implementations that rely on core proxy infrastructure
 | 
			
		|||
 | 
			
		||||
[dependencies]
 | 
			
		||||
futures = { version = "0.3", default-features = false }
 | 
			
		||||
hyper = { workspace = true }
 | 
			
		||||
hyper-util = { workspace = true }
 | 
			
		||||
linkerd-error = { path = "../../error" }
 | 
			
		||||
linkerd-io = { path = "../../io" }
 | 
			
		||||
linkerd-stack = { path = "../../stack" }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,7 +6,6 @@ use std::{
 | 
			
		|||
    pin::Pin,
 | 
			
		||||
    task::{Context, Poll},
 | 
			
		||||
};
 | 
			
		||||
use tokio::net::TcpStream;
 | 
			
		||||
use tracing::debug;
 | 
			
		||||
 | 
			
		||||
#[derive(Copy, Clone, Debug)]
 | 
			
		||||
| 
						 | 
				
			
			@ -25,7 +24,7 @@ impl ConnectTcp {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl<T: Param<Remote<ServerAddr>>> Service<T> for ConnectTcp {
 | 
			
		||||
    type Response = (io::ScopedIo<TcpStream>, Local<ClientAddr>);
 | 
			
		||||
    type Response = (io::ScopedIo<self::net::TcpStream>, Local<ClientAddr>);
 | 
			
		||||
    type Error = io::Error;
 | 
			
		||||
    type Future = Pin<Box<dyn Future<Output = io::Result<Self::Response>> + Send + Sync + 'static>>;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -39,7 +38,7 @@ impl<T: Param<Remote<ServerAddr>>> Service<T> for ConnectTcp {
 | 
			
		|||
        let Remote(ServerAddr(addr)) = t.param();
 | 
			
		||||
        debug!(server.addr = %addr, "Connecting");
 | 
			
		||||
        Box::pin(async move {
 | 
			
		||||
            let io = TcpStream::connect(&addr).await?;
 | 
			
		||||
            let io = tokio::net::TcpStream::connect(&addr).await?;
 | 
			
		||||
            super::set_nodelay_or_warn(&io);
 | 
			
		||||
            let io = super::set_keepalive_or_warn(io, keepalive)?;
 | 
			
		||||
            let io = super::set_user_timeout_or_warn(io, user_timeout)?;
 | 
			
		||||
| 
						 | 
				
			
			@ -49,7 +48,126 @@ impl<T: Param<Remote<ServerAddr>>> Service<T> for ConnectTcp {
 | 
			
		|||
                ?keepalive,
 | 
			
		||||
                "Connected",
 | 
			
		||||
            );
 | 
			
		||||
            Ok((io::ScopedIo::client(io), Local(ClientAddr(local_addr))))
 | 
			
		||||
            Ok((
 | 
			
		||||
                io::ScopedIo::client(self::net::TcpStream(io)),
 | 
			
		||||
                Local(ClientAddr(local_addr)),
 | 
			
		||||
            ))
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
mod net {
 | 
			
		||||
    use super::*;
 | 
			
		||||
 | 
			
		||||
    /// A wrapper that implements Tokio's IO traits for an inner type that
 | 
			
		||||
    /// implements hyper's IO traits, or vice versa (implements hyper's IO
 | 
			
		||||
    /// traits for a type that implements Tokio's IO traits).
 | 
			
		||||
    #[derive(Debug)]
 | 
			
		||||
    pub struct TcpStream(pub tokio::net::TcpStream);
 | 
			
		||||
 | 
			
		||||
    impl TcpStream {
 | 
			
		||||
        fn project(self: Pin<&mut Self>) -> Pin<&mut tokio::net::TcpStream> {
 | 
			
		||||
            let Self(stream) = self.get_mut();
 | 
			
		||||
            Pin::new(stream)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    impl tokio::io::AsyncRead for TcpStream {
 | 
			
		||||
        fn poll_read(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
            buf: &mut linkerd_io::ReadBuf<'_>,
 | 
			
		||||
        ) -> Poll<std::io::Result<()>> {
 | 
			
		||||
            self.project().poll_read(cx, buf)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    impl tokio::io::AsyncWrite for TcpStream {
 | 
			
		||||
        fn poll_write(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
            buf: &[u8],
 | 
			
		||||
        ) -> Poll<Result<usize, std::io::Error>> {
 | 
			
		||||
            self.project().poll_write(cx, buf)
 | 
			
		||||
        }
 | 
			
		||||
        fn poll_flush(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
        ) -> Poll<Result<(), std::io::Error>> {
 | 
			
		||||
            self.project().poll_flush(cx)
 | 
			
		||||
        }
 | 
			
		||||
        fn poll_shutdown(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
        ) -> Poll<Result<(), std::io::Error>> {
 | 
			
		||||
            self.project().poll_shutdown(cx)
 | 
			
		||||
        }
 | 
			
		||||
        fn poll_write_vectored(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
            bufs: &[std::io::IoSlice<'_>],
 | 
			
		||||
        ) -> Poll<Result<usize, std::io::Error>> {
 | 
			
		||||
            self.project().poll_write_vectored(cx, bufs)
 | 
			
		||||
        }
 | 
			
		||||
        fn is_write_vectored(&self) -> bool {
 | 
			
		||||
            self.0.is_write_vectored()
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    impl hyper::rt::Read for TcpStream {
 | 
			
		||||
        fn poll_read(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
            mut buf: hyper::rt::ReadBufCursor<'_>,
 | 
			
		||||
        ) -> Poll<Result<(), std::io::Error>> {
 | 
			
		||||
            let n = unsafe {
 | 
			
		||||
                let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
 | 
			
		||||
                match tokio::io::AsyncRead::poll_read(self.project(), cx, &mut tbuf) {
 | 
			
		||||
                    Poll::Ready(Ok(())) => tbuf.filled().len(),
 | 
			
		||||
                    other => return other,
 | 
			
		||||
                }
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            unsafe {
 | 
			
		||||
                buf.advance(n);
 | 
			
		||||
            }
 | 
			
		||||
            Poll::Ready(Ok(()))
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    impl hyper::rt::Write for TcpStream {
 | 
			
		||||
        fn poll_write(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
            buf: &[u8],
 | 
			
		||||
        ) -> Poll<Result<usize, std::io::Error>> {
 | 
			
		||||
            tokio::io::AsyncWrite::poll_write(self.project(), cx, buf)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        fn poll_flush(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
        ) -> Poll<Result<(), std::io::Error>> {
 | 
			
		||||
            tokio::io::AsyncWrite::poll_flush(self.project(), cx)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        fn poll_shutdown(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
        ) -> Poll<Result<(), std::io::Error>> {
 | 
			
		||||
            tokio::io::AsyncWrite::poll_shutdown(self.project(), cx)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        fn is_write_vectored(&self) -> bool {
 | 
			
		||||
            tokio::io::AsyncWrite::is_write_vectored(&self.0)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        fn poll_write_vectored(
 | 
			
		||||
            self: Pin<&mut Self>,
 | 
			
		||||
            cx: &mut Context<'_>,
 | 
			
		||||
            bufs: &[std::io::IoSlice<'_>],
 | 
			
		||||
        ) -> Poll<Result<usize, std::io::Error>> {
 | 
			
		||||
            tokio::io::AsyncWrite::poll_write_vectored(self.project(), cx, bufs)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,12 +2,8 @@
 | 
			
		|||
//!
 | 
			
		||||
//! Uses unsafe code to interact with socket options for SO_ORIGINAL_DST.
 | 
			
		||||
 | 
			
		||||
#![deny(
 | 
			
		||||
    rust_2018_idioms,
 | 
			
		||||
    clippy::disallowed_methods,
 | 
			
		||||
    clippy::disallowed_types,
 | 
			
		||||
    unsafe_code
 | 
			
		||||
)]
 | 
			
		||||
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
 | 
			
		||||
// diabled, temporarily: unsafe_code
 | 
			
		||||
 | 
			
		||||
pub mod addrs;
 | 
			
		||||
mod connect;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,6 +10,7 @@ publish = false
 | 
			
		|||
async-trait = "0.1"
 | 
			
		||||
bytes = { workspace = true }
 | 
			
		||||
futures = { version = "0.3", default-features = false }
 | 
			
		||||
hyper = { workspace = true }
 | 
			
		||||
linkerd-conditional = { path = "../conditional" }
 | 
			
		||||
linkerd-dns-name = { path = "../dns/name" }
 | 
			
		||||
linkerd-error = { path = "../error" }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue