balance: Log and fail stuck discovery streams. (#2484)

In 6d2abbc, we changed how outbound proxies process discovery updates.
The prior implementation used a watchdog timeout to bound the amount of
time an update stream could be full. Since 6d2abbc, when an update
channel fills, the backpressure can extend to the destination
controller's gRPC response stream.

To detect and avoid this harmful (and useless) backpressure, this change
modifies the balancer's discovery processing stream to exit when the
balancer has 1000 unprocessed discovery updates. A sufficiently scary
warning is logged.
This commit is contained in:
Oliver Gould 2023-10-17 11:01:19 -07:00 committed by GitHub
parent 54979bc5d5
commit 328826caa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 51 additions and 18 deletions

View File

@ -2,7 +2,7 @@ use futures_util::future::poll_fn;
use linkerd_error::Error;
use tokio::sync::mpsc;
use tower::discover;
use tracing::{debug, instrument::Instrument, trace};
use tracing::{debug, debug_span, instrument::Instrument, trace, warn};
pub type Result<K, S> = std::result::Result<discover::Change<K, S>, Error>;
pub type Buffer<K, S> = tokio_stream::wrappers::ReceiverStream<Result<K, S>>;
@ -16,6 +16,29 @@ where
{
let (tx, rx) = mpsc::channel(capacity);
// Attempts to send an update to the balancer, returning `true` if sending
// was successful and `false` otherwise.
//
// The update is offered with `try_send`, so this never awaits channel
// capacity: a full channel is reported as a failure immediately. Both
// failure cases are logged here, so a caller only has to stop processing
// when `false` is returned.
let send = |tx: &mpsc::Sender<_>, up| {
match tx.try_send(up) {
Ok(()) => true,
// The balancer has been dropped (and will never be used again).
Err(mpsc::error::TrySendError::Closed(_)) => {
debug!("Discovery receiver dropped");
false
}
// The balancer is stalled and we can't continue to buffer
// updates for it.
Err(mpsc::error::TrySendError::Full(_)) => {
warn!(
"The balancer is not processing discovery updates; aborting discovery stream"
);
false
}
}
};
debug!(%capacity, "Spawning discovery buffer");
tokio::spawn(
async move {
@ -23,41 +46,51 @@ where
loop {
let res = tokio::select! {
_ = tx.closed() => break,
biased;
_ = tx.closed() => {
debug!("Discovery receiver dropped");
return;
}
res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res,
};
let change = match res {
match res {
Some(Ok(change)) => {
trace!("Changed");
change
if !send(&tx, Ok(change)) {
// XXX(ver) We don't actually have a way to "blow
// up" the balancer in this situation. My
// understanding is that this will cause the
// balancer to get cut off from further updates,
// should it ever become available again. That needs
// to be fixed.
//
// One option would be to drop the discovery stream
// and rebuild it if the balancer ever becomes
// unblocked.
//
// Ultimately we need to track down how we're
// getting into this blocked/idle state
return;
}
}
Some(Err(e)) => {
let error = e.into();
debug!(%error);
let _ = tx.send(Err(error)).await;
send(&tx, Err(error));
return;
}
None => {
debug!("Discovery stream closed");
return;
}
};
tokio::select! {
_ = tx.closed() => break,
res = tx.send(Ok(change)) => {
if res.is_err() {
break;
}
trace!("Change sent");
}
}
}
debug!("Discovery receiver dropped");
}
.in_current_span(),
.in_current_span()
.instrument(debug_span!("discover")),
);
Buffer::new(rx)