mirror of https://github.com/linkerd/linkerd2.git
Fix infinite loop in `tcp::HalfDuplex::copy_into()` (#537)
An infinite loop exists in the TCP proxy, which could be triggered by any raw TCP connection (including HTTPS requests). The connection will be proxied successfully, but instead of closing, it will remain open, and the proxy's CPU usage will remain extremely high indefinitely. Since `Duplex::poll` will call `half_in.copy_into()`/`half_out.copy_into()` repeatedly, even after they return `Async::Ready`, when one half has shut down and returned ready, it may still be polled again, as `Duplex::poll` waits until _both_ halves have returned `Ready`. Because of the guard that `!dst.is_shutdown`, intended to prevent the destination from shutting down twice, the function will not return if it is polled again after returning `Async::Ready` once. I've fixed this by moving the guard against double shutdowns out of the loop, so that the function will return `Async::Ready` again if it is polled after shutting down the destination. I've also included a unit test against regressions to this bug. The unit test fails against master. Fixes #519 Signed-off-by: Eliza Weisman <eliza@buoyant.io> Co-Authored-By: Andrew Seigner <andrew@sig.gy>
This commit is contained in:
parent
3a73411375
commit
6af9871f13
|
@ -132,7 +132,6 @@ where
|
|||
// could make progress.
|
||||
self.half_in.copy_into(&mut self.half_out)?;
|
||||
self.half_out.copy_into(&mut self.half_in)?;
|
||||
|
||||
if self.half_in.is_done() && self.half_out.is_done() {
|
||||
Ok(Async::Ready(()))
|
||||
} else {
|
||||
|
@ -157,17 +156,28 @@ where
|
|||
where
|
||||
U: AsyncWrite,
|
||||
{
|
||||
// Since Duplex::poll() intentionally ignores the Async part of our
|
||||
// return value, we may be polled again after returning Ready, if the
|
||||
// other half isn't ready. In that case, if the destination has
|
||||
// shutdown, we finished in a previous poll, so don't even enter into
|
||||
// the copy loop.
|
||||
if dst.is_shutdown {
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
loop {
|
||||
try_ready!(self.read());
|
||||
try_ready!(self.write_into(dst));
|
||||
|
||||
if self.buf.is_none() && !dst.is_shutdown {
|
||||
if self.buf.is_none() {
|
||||
debug_assert!(!dst.is_shutdown,
|
||||
"attempted to shut down destination twice");
|
||||
try_ready!(dst.io.shutdown());
|
||||
dst.is_shutdown = true;
|
||||
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Poll<(), io::Error> {
|
||||
|
@ -257,3 +267,62 @@ impl BufMut for CopyBuf {
|
|||
self.write_pos += cnt;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io::{Error, Read, Write, Result};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use futures::{Async, Poll};
|
||||
use super::*;
|
||||
|
||||
struct DoneIo(AtomicBool);
|
||||
|
||||
impl<'a> Read for &'a DoneIo {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
|
||||
if self.0.swap(false, Ordering::Relaxed) {
|
||||
Ok(buf.len())
|
||||
} else {
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> AsyncRead for &'a DoneIo {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Write for &'a DoneIo {
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize> {
|
||||
Ok(buf.len())
|
||||
}
|
||||
fn flush(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
impl<'a> AsyncWrite for &'a DoneIo {
|
||||
fn shutdown(&mut self) -> Poll<(), Error> {
|
||||
if self.0.swap(false, Ordering::Relaxed) {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn duplex_doesnt_hang_when_one_half_finishes() {
|
||||
// Test reproducing an infinite loop in Duplex that caused issue #519,
|
||||
// where a Duplex would enter an infinite loop when one half finishes.
|
||||
let io_1 = DoneIo(AtomicBool::new(true));
|
||||
let io_2 = DoneIo(AtomicBool::new(true));
|
||||
let mut duplex = Duplex::new(&io_1, &io_2);
|
||||
|
||||
assert_eq!(duplex.poll().unwrap(), Async::NotReady);
|
||||
assert_eq!(duplex.poll().unwrap(), Async::Ready(()));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue