mirror of https://github.com/linkerd/linkerd2.git
Proxy: Add `transport::prefixed::Prefixed`. (#1196)
Copy most of the implementation of `connection::Connection` to create a way to prefix a `TcpStream` with some previously-read bytes. This will allow us to read and parse a TLS ClientHello message to see if it is intended for the proxy to process, and then "rewind" and feed it back into the TLS implementation if so. This must be in the `transport` submodule in order for it to implement the private `Io` trait. Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
parent
140f246624
commit
75034ef09d
|
@ -252,6 +252,9 @@ impl Connection {
|
|||
|
||||
impl io::Read for Connection {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
// TODO: Eliminate the duplication between this and
|
||||
// `transport::prefixed::Prefixed`.
|
||||
|
||||
// Check the length only once, since looking as the length
|
||||
// of a BytesMut isn't as cheap as the length of a &[u8].
|
||||
let peeked_len = self.peek_buf.len();
|
||||
|
@ -266,7 +269,7 @@ impl io::Read for Connection {
|
|||
// hold onto the allocated memory any longer. We won't peek
|
||||
// again.
|
||||
if peeked_len == len {
|
||||
self.peek_buf = BytesMut::new();
|
||||
self.peek_buf = Default::default();
|
||||
}
|
||||
Ok(len)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
mod connect;
|
||||
mod addr_info;
|
||||
mod io;
|
||||
mod prefixed;
|
||||
pub mod tls;
|
||||
|
||||
pub use self::connect::{
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
#![allow(dead_code)] // TODO: Actually use this.
|
||||
|
||||
use std::{cmp, fmt::Debug, io, net::SocketAddr};
|
||||
|
||||
use super::io::internal::Io;
|
||||
use bytes::{Buf, Bytes};
|
||||
use tokio::prelude::*;
|
||||
use AddrInfo;
|
||||
|
||||
/// A TcpStream where the initial reads will be served from `prefix`.
|
||||
#[derive(Debug)]
|
||||
pub struct Prefixed<S> {
|
||||
prefix: Bytes,
|
||||
io: S,
|
||||
}
|
||||
|
||||
impl<S> Prefixed<S> {
|
||||
pub fn new(prefix: Bytes, io: S) -> Self {
|
||||
Self { prefix, io }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> io::Read for Prefixed<S> where S: Debug + io::Read {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
|
||||
// Check the length only once, since looking as the length
|
||||
// of a Bytes isn't as cheap as the length of a &[u8].
|
||||
let peeked_len = self.prefix.len();
|
||||
|
||||
if peeked_len == 0 {
|
||||
self.io.read(buf)
|
||||
} else {
|
||||
let len = cmp::min(buf.len(), peeked_len);
|
||||
buf[..len].copy_from_slice(&self.prefix.as_ref()[..len]);
|
||||
self.prefix.advance(len);
|
||||
// If we've finally emptied the peek_buf, drop it so we don't
|
||||
// hold onto the allocated memory any longer. We won't peek
|
||||
// again.
|
||||
if peeked_len == len {
|
||||
self.prefix = Default::default();
|
||||
}
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> AsyncRead for Prefixed<S> where S: AsyncRead + Debug {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.io.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> io::Write for Prefixed<S> where S: Debug + io::Write {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.io.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.io.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> AsyncWrite for Prefixed<S> where S: AsyncWrite + Debug {
|
||||
fn shutdown(&mut self) -> Result<Async<()>, io::Error> {
|
||||
self.io.shutdown()
|
||||
}
|
||||
|
||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
|
||||
where Self: Sized
|
||||
{
|
||||
self.io.write_buf(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> AddrInfo for Prefixed<S> where S: AddrInfo {
|
||||
fn local_addr(&self) -> Result<SocketAddr, io::Error> {
|
||||
self.io.local_addr()
|
||||
}
|
||||
|
||||
fn get_original_dst(&self) -> Option<SocketAddr> {
|
||||
self.io.get_original_dst()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Io for Prefixed<S> where S: Io {
|
||||
fn shutdown_write(&mut self) -> Result<(), io::Error> {
|
||||
self.io.shutdown_write()
|
||||
}
|
||||
|
||||
fn write_buf_erased(&mut self, buf: &mut Buf) -> Result<Async<usize>, io::Error> {
|
||||
self.io.write_buf_erased(buf)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue