mirror of https://github.com/linkerd/linkerd2.git
proxy: Remove the `Labeled` middleware in favor of client context labels (#812)
The `Labeled` middleware is used to add `DstLabels` to each request. Now that each client context maintains a watch on its endpoint's `DstLabels`, the `Labeled` middleware can safely be removed. This has one subtle behavior change: labels are associated with requests _lazily_, whereas before they were determined _eagerly_. This means that if an endpoints labels are updated before the telemetry system captures the labels for the request, it may use the newer labels. Previously, it would only use the labels at the time that the request originated.
This commit is contained in:
parent
6eec6256f7
commit
0e39d6d8fa
|
@ -29,7 +29,7 @@ use transport::DnsNameAndPort;
|
|||
|
||||
use control::cache::{Cache, CacheChange, Exists};
|
||||
|
||||
use ::telemetry::metrics::{DstLabels, Labeled};
|
||||
use ::telemetry::metrics::DstLabels;
|
||||
|
||||
/// A handle to start watching a destination for address changes.
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -274,7 +274,7 @@ where
|
|||
type Request = B::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Service = Labeled<B::Service>;
|
||||
type Service = B::Service;
|
||||
type DiscoverError = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
|
@ -295,7 +295,6 @@ where
|
|||
let endpoint = Endpoint::new(addr, labels_watch.clone());
|
||||
|
||||
let service = self.bind.bind(&endpoint)
|
||||
.map(|svc| Labeled::new(svc, labels_watch))
|
||||
.map_err(|_| ())?;
|
||||
|
||||
return Ok(Async::Ready(Change::Insert(addr, service)))
|
||||
|
|
|
@ -3,7 +3,6 @@ use std::sync::Arc;
|
|||
|
||||
use control::discovery::DstLabelsWatch;
|
||||
use ctx;
|
||||
use telemetry::metrics::DstLabels;
|
||||
|
||||
|
||||
/// Describes a stream's request headers.
|
||||
|
@ -20,11 +19,6 @@ pub struct Request {
|
|||
|
||||
/// Identifies the proxy client that dispatched the request.
|
||||
pub client: Arc<ctx::transport::Client>,
|
||||
|
||||
/// Optional information on the request's destination service, which may
|
||||
/// be provided by the control plane for destinations lookups against its
|
||||
/// discovery API.
|
||||
pub dst_labels: Option<DstLabels>,
|
||||
}
|
||||
|
||||
/// Describes a stream's response headers.
|
||||
|
@ -49,19 +43,12 @@ impl Request {
|
|||
client: &Arc<ctx::transport::Client>,
|
||||
id: usize,
|
||||
) -> Arc<Self> {
|
||||
// Look up whether the request has been extended with optional
|
||||
// destination labels from the control plane's discovery API.
|
||||
let dst_labels = request
|
||||
.extensions()
|
||||
.get::<DstLabels>()
|
||||
.cloned();
|
||||
let r = Self {
|
||||
id,
|
||||
uri: request.uri().clone(),
|
||||
method: request.method().clone(),
|
||||
server: Arc::clone(server),
|
||||
client: Arc::clone(client),
|
||||
dst_labels,
|
||||
};
|
||||
|
||||
Arc::new(r)
|
||||
|
|
|
@ -18,7 +18,6 @@ use bind::{self, Bind, Protocol};
|
|||
use control::{self, discovery};
|
||||
use control::discovery::Bind as BindTrait;
|
||||
use ctx;
|
||||
use telemetry::metrics;
|
||||
use timeout::Timeout;
|
||||
use transparency::h1;
|
||||
use transport::{DnsNameAndPort, Host, HostAndPort};
|
||||
|
@ -170,7 +169,7 @@ where
|
|||
type Request = http::Request<B>;
|
||||
type Response = bind::HttpResponse;
|
||||
type Error = <Self::Service as tower::Service>::Error;
|
||||
type Service = metrics::Labeled<bind::Service<B>>;
|
||||
type Service = bind::Service<B>;
|
||||
type DiscoverError = BindError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
|
@ -185,9 +184,6 @@ where
|
|||
// closing down when the connection is no longer usable.
|
||||
if let Some((addr, bind)) = opt.take() {
|
||||
let svc = bind.bind(&addr.into())
|
||||
// The controller has no labels to add to an external
|
||||
// service.
|
||||
.map(metrics::Labeled::none)
|
||||
.map_err(|_| BindError::External{ addr })?;
|
||||
Ok(Async::Ready(Change::Insert(addr, svc)))
|
||||
} else {
|
||||
|
|
|
@ -1,21 +1,9 @@
|
|||
|
||||
use futures::Poll;
|
||||
use futures_watch::Watch;
|
||||
use http;
|
||||
use tower::Service;
|
||||
|
||||
use std::fmt::{self, Write};
|
||||
use std::sync::Arc;
|
||||
|
||||
use ctx;
|
||||
use http;
|
||||
|
||||
/// Middleware that adds an extension containing an optional set of metric
|
||||
/// labels to requests.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Labeled<T> {
|
||||
metric_labels: Option<Watch<Option<DstLabels>>>,
|
||||
inner: T,
|
||||
}
|
||||
use ctx;
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct RequestLabels {
|
||||
|
@ -63,56 +51,14 @@ enum Direction {
|
|||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct DstLabels(Arc<str>);
|
||||
|
||||
// ===== impl Labeled =====
|
||||
|
||||
impl<T> Labeled<T> {
|
||||
|
||||
/// Wrap `inner` with a `Watch` on dyanmically updated labels.
|
||||
pub fn new(inner: T, watch: Watch<Option<DstLabels>>) -> Self {
|
||||
Self {
|
||||
metric_labels: Some(watch),
|
||||
inner,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap `inner` with no `metric_labels`.
|
||||
pub fn none(inner: T) -> Self {
|
||||
Self { metric_labels: None, inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, A> Service for Labeled<T>
|
||||
where
|
||||
T: Service<Request=http::Request<A>>,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response= T::Response;
|
||||
type Error = T::Error;
|
||||
type Future = T::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
let mut req = req;
|
||||
if let Some(labels) = self.metric_labels.as_ref()
|
||||
.and_then(|labels| (*labels.borrow()).as_ref().cloned())
|
||||
{
|
||||
req.extensions_mut().insert(labels);
|
||||
}
|
||||
self.inner.call(req)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ===== impl RequestLabels =====
|
||||
|
||||
impl<'a> RequestLabels {
|
||||
pub fn new(req: &ctx::http::Request) -> Self {
|
||||
let direction = Direction::from_context(req.server.proxy.as_ref());
|
||||
|
||||
let outbound_labels = req.dst_labels.as_ref().cloned();
|
||||
let outbound_labels = req.dst_labels()
|
||||
.and_then(|b| b.borrow().clone());
|
||||
|
||||
let authority = req.uri
|
||||
.authority_part()
|
||||
|
@ -282,87 +228,3 @@ impl fmt::Display for DstLabels {
|
|||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
use futures::{Async, Poll, Future};
|
||||
use futures::future::{self, FutureResult};
|
||||
use http;
|
||||
use tower::Service;
|
||||
|
||||
|
||||
struct MockInnerService;
|
||||
|
||||
impl Service for MockInnerService {
|
||||
type Request = http::Request<()>;
|
||||
type Response = Option<String>;
|
||||
type Error = ();
|
||||
type Future = FutureResult<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
future::ok(req.extensions()
|
||||
.get::<DstLabels>()
|
||||
.map(|&DstLabels(ref inner)| inner.as_ref().to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_labels() {
|
||||
let mut labeled = Labeled {
|
||||
metric_labels: None,
|
||||
inner: MockInnerService,
|
||||
};
|
||||
let labels = labeled.call(http::Request::new(()))
|
||||
.wait().expect("call");
|
||||
assert_eq!(None, labels);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn one_label() {
|
||||
let (watch, _) =
|
||||
Watch::new(DstLabels::new(vec![("foo", "bar")]));
|
||||
let mut labeled = Labeled {
|
||||
metric_labels: Some(watch),
|
||||
inner: MockInnerService,
|
||||
};
|
||||
let labels = labeled.call(http::Request::new(()))
|
||||
.wait().expect("call");
|
||||
assert_eq!(Some("dst_foo=\"bar\"".to_string()), labels);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn label_updates() {
|
||||
let (watch, mut store) =
|
||||
Watch::new(DstLabels::new(vec![("foo", "bar")]));
|
||||
let mut labeled = Labeled {
|
||||
metric_labels: Some(watch),
|
||||
inner: MockInnerService,
|
||||
};
|
||||
|
||||
let labels = labeled.call(http::Request::new(()))
|
||||
.wait().expect("first call");
|
||||
assert_eq!(Some("dst_foo=\"bar\"".to_string()), labels);
|
||||
|
||||
store.store(DstLabels::new(vec![("foo", "baz")]))
|
||||
.expect("store (\"foo\", \"baz\")");
|
||||
let labels = labeled.call(http::Request::new(()))
|
||||
.wait().expect("second call");
|
||||
assert_eq!(Some("dst_foo=\"baz\"".to_string()), labels);
|
||||
|
||||
store.store(DstLabels::new(vec![
|
||||
("foo", "baz"),
|
||||
("quux", "quuux")
|
||||
]))
|
||||
.expect("store (\"foo\", \"baz\"), (\"quux\", \"quuux\")");
|
||||
let labels = labeled.call(http::Request::new(()))
|
||||
.wait().expect("third call");
|
||||
assert_eq!(Some("dst_foo=\"baz\",dst_quux=\"quuux\"".to_string()), labels);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ mod latency;
|
|||
|
||||
use self::labels::{RequestLabels, ResponseLabels};
|
||||
use self::latency::{BUCKET_BOUNDS, Histogram};
|
||||
pub use self::labels::{DstLabels, Labeled};
|
||||
pub use self::labels::DstLabels;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Metrics {
|
||||
|
|
Loading…
Reference in New Issue