proxy: Remove dynamic label updating on bound services (#1006)
Depends on tower-rs/tower#75. Required for #386 In order for the proxy to use the TLS support metadata from the Destination service correctly, we determined that the code for dynamically changing the labels on an already-bound service should be removed, and any change in metadata should cause an endpoint to be rebound. I've modified the proxy so that we no longer update the labels using `futures-watch` (as a sidenote, we no longer depend on that crate). Metadata update events now cause the `tower-discover::Discover` implementation for `DestinationSet` to re-insert the changed endpoint into the load balancer. Upstream PR tower-rs/tower#75 in tower-balance changes the load balancer to honor duplicate insertions by replacing the old endpoint rather than ignoring them; that change is necessary for the tests to pass on this branch. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
4ede9a7ef3
commit
ec72012982
|
@ -128,7 +128,6 @@ dependencies = [
|
||||||
"flate2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"flate2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"futures-mpsc-lossy 0.3.0",
|
"futures-mpsc-lossy 0.3.0",
|
||||||
"futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)",
|
|
||||||
"h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
"h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -349,15 +348,6 @@ dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures-watch"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "git+https://github.com/carllerche/better-future.git#07baa13e91fefe7a51533dfde7b4e69e109ebe14"
|
|
||||||
dependencies = [
|
|
||||||
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "gzip-header"
|
name = "gzip-header"
|
||||||
version = "0.1.2"
|
version = "0.1.2"
|
||||||
|
@ -1074,7 +1064,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-balance"
|
name = "tower-balance"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
|
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -1087,7 +1077,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-buffer"
|
name = "tower-buffer"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
|
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||||
|
@ -1096,7 +1086,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-discover"
|
name = "tower-discover"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
|
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||||
|
@ -1146,7 +1136,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-in-flight-limit"
|
name = "tower-in-flight-limit"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
|
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||||
|
@ -1155,7 +1145,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-reconnect"
|
name = "tower-reconnect"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
|
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -1165,7 +1155,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-service"
|
name = "tower-service"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
|
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
@ -1173,7 +1163,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-util"
|
name = "tower-util"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
|
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||||
|
@ -1393,7 +1383,6 @@ dependencies = [
|
||||||
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
|
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
|
||||||
"checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c"
|
"checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c"
|
||||||
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
|
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
|
||||||
"checksum futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)" = "<none>"
|
|
||||||
"checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40"
|
"checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40"
|
||||||
"checksum h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "01f3fc6ec9a3437d52fdb95000b8aeec314842b47529631e3b5570597c6ffad7"
|
"checksum h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "01f3fc6ec9a3437d52fdb95000b8aeec314842b47529631e3b5570597c6ffad7"
|
||||||
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"
|
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"
|
||||||
|
|
|
@ -19,7 +19,6 @@ bytes = "0.4"
|
||||||
deflate = {version = "0.7.18", features = ["gzip"] }
|
deflate = {version = "0.7.18", features = ["gzip"] }
|
||||||
env_logger = { version = "0.5", default-features = false }
|
env_logger = { version = "0.5", default-features = false }
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
futures-watch = { git = "https://github.com/carllerche/better-future.git" }
|
|
||||||
h2 = "0.1.7"
|
h2 = "0.1.7"
|
||||||
http = "0.1"
|
http = "0.1"
|
||||||
httparse = "1.2"
|
httparse = "1.2"
|
||||||
|
|
|
@ -183,7 +183,7 @@ where
|
||||||
// them onto the new watch first
|
// them onto the new watch first
|
||||||
match set.addrs {
|
match set.addrs {
|
||||||
Exists::Yes(ref cache) => for (&addr, meta) in cache {
|
Exists::Yes(ref cache) => for (&addr, meta) in cache {
|
||||||
let update = Update::Insert(addr, meta.clone());
|
let update = Update::Bind(addr, meta.clone());
|
||||||
resolve.responder.update_tx
|
resolve.responder.update_tx
|
||||||
.unbounded_send(update)
|
.unbounded_send(update)
|
||||||
.expect("unbounded_send does not fail");
|
.expect("unbounded_send does not fail");
|
||||||
|
@ -526,12 +526,12 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
||||||
) {
|
) {
|
||||||
let (update_str, update, addr) = match change {
|
let (update_str, update, addr) = match change {
|
||||||
CacheChange::Insertion { key, value } => {
|
CacheChange::Insertion { key, value } => {
|
||||||
("insert", Update::Insert(key, value.clone()), key)
|
("insert", Update::Bind(key, value.clone()), key)
|
||||||
},
|
},
|
||||||
CacheChange::Removal { key } => ("remove", Update::Remove(key), key),
|
CacheChange::Removal { key } => ("remove", Update::Remove(key), key),
|
||||||
CacheChange::Modification { key, new_value } => (
|
CacheChange::Modification { key, new_value } => (
|
||||||
"change metadata for",
|
"change metadata for",
|
||||||
Update::ChangeMetadata(key, new_value.clone()),
|
Update::Bind(key, new_value.clone()),
|
||||||
key,
|
key,
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,26 +1,24 @@
|
||||||
use futures_watch;
|
use std::net::SocketAddr;
|
||||||
use std::{cmp, hash, net::SocketAddr};
|
|
||||||
|
|
||||||
use telemetry::metrics::DstLabels;
|
use telemetry::metrics::DstLabels;
|
||||||
|
|
||||||
pub type DstLabelsWatch = futures_watch::Watch<Option<DstLabels>>;
|
|
||||||
|
|
||||||
/// An individual traffic target.
|
/// An individual traffic target.
|
||||||
///
|
///
|
||||||
/// Equality, Ordering, and hashability is determined soley by the Endpoint's address.
|
/// Equality, Ordering, and hashability is determined soley by the Endpoint's address.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
||||||
pub struct Endpoint {
|
pub struct Endpoint {
|
||||||
address: SocketAddr,
|
address: SocketAddr,
|
||||||
dst_labels: Option<DstLabelsWatch>,
|
dst_labels: Option<DstLabels>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==== impl Endpoint =====
|
// ==== impl Endpoint =====
|
||||||
|
|
||||||
impl Endpoint {
|
impl Endpoint {
|
||||||
pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self {
|
pub fn new(address: SocketAddr, dst_labels: Option<DstLabels>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
address,
|
address,
|
||||||
dst_labels: Some(dst_labels),
|
dst_labels,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +26,7 @@ impl Endpoint {
|
||||||
self.address
|
self.address
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
|
pub fn dst_labels(&self) -> Option<&DstLabels> {
|
||||||
self.dst_labels.as_ref()
|
self.dst_labels.as_ref()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,17 +39,3 @@ impl From<SocketAddr> for Endpoint {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl hash::Hash for Endpoint {
|
|
||||||
fn hash<H: hash::Hasher>(&self, state: &mut H) {
|
|
||||||
self.address.hash(state)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl cmp::PartialEq for Endpoint {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.address.eq(&other.address)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl cmp::Eq for Endpoint {}
|
|
||||||
|
|
|
@ -24,7 +24,6 @@
|
||||||
//! - We need some means to limit the number of endpoints that can be returned for a
|
//! - We need some means to limit the number of endpoints that can be returned for a
|
||||||
//! single resolution so that `control::Cache` is not effectively unbounded.
|
//! single resolution so that `control::Cache` is not effectively unbounded.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
|
@ -35,7 +34,6 @@ use futures::{
|
||||||
Poll,
|
Poll,
|
||||||
Stream
|
Stream
|
||||||
};
|
};
|
||||||
use futures_watch::{Store, Watch};
|
|
||||||
use http;
|
use http;
|
||||||
use tower_discover::{Change, Discover};
|
use tower_discover::{Change, Discover};
|
||||||
use tower_service::Service;
|
use tower_service::Service;
|
||||||
|
@ -47,7 +45,7 @@ use transport::{DnsNameAndPort, HostAndPort};
|
||||||
pub mod background;
|
pub mod background;
|
||||||
mod endpoint;
|
mod endpoint;
|
||||||
|
|
||||||
pub use self::endpoint::{DstLabelsWatch, Endpoint};
|
pub use self::endpoint::Endpoint;
|
||||||
|
|
||||||
/// A handle to request resolutions from the background discovery task.
|
/// A handle to request resolutions from the background discovery task.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -84,18 +82,11 @@ pub struct Resolution<B> {
|
||||||
/// reference has been dropped.
|
/// reference has been dropped.
|
||||||
_active: Arc<()>,
|
_active: Arc<()>,
|
||||||
|
|
||||||
/// Map associating addresses with the `Store` for the watch on that
|
|
||||||
/// service's metric labels (as provided by the Destination service).
|
|
||||||
///
|
|
||||||
/// This is used to update the `Labeled` middleware on those services
|
|
||||||
/// without requiring the service stack to be re-bound.
|
|
||||||
metric_labels: HashMap<SocketAddr, Store<Option<DstLabels>>>,
|
|
||||||
|
|
||||||
/// Binds an update endpoint to a Service.
|
/// Binds an update endpoint to a Service.
|
||||||
bind: B,
|
bind: B,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// .
|
/// Metadata describing an endpoint.
|
||||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||||
struct Metadata {
|
struct Metadata {
|
||||||
/// A set of Prometheus metric labels describing the destination.
|
/// A set of Prometheus metric labels describing the destination.
|
||||||
|
@ -104,9 +95,14 @@ struct Metadata {
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
enum Update {
|
enum Update {
|
||||||
Insert(SocketAddr, Metadata),
|
/// Indicates that an endpoint should be bound to `SocketAddr` with the
|
||||||
|
/// provided `Metadata`.
|
||||||
|
///
|
||||||
|
/// If there was already an endpoint in the load balancer for this
|
||||||
|
/// address, it should be replaced with the new one.
|
||||||
|
Bind(SocketAddr, Metadata),
|
||||||
|
/// Indicates that the endpoint for this `SocketAddr` should be removed.
|
||||||
Remove(SocketAddr),
|
Remove(SocketAddr),
|
||||||
ChangeMetadata(SocketAddr, Metadata),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bind a `SocketAddr` with a protocol.
|
/// Bind a `SocketAddr` with a protocol.
|
||||||
|
@ -178,7 +174,6 @@ impl Resolver {
|
||||||
Resolution {
|
Resolution {
|
||||||
update_rx,
|
update_rx,
|
||||||
_active: active,
|
_active: active,
|
||||||
metric_labels: HashMap::new(),
|
|
||||||
bind,
|
bind,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,29 +181,6 @@ impl Resolver {
|
||||||
|
|
||||||
// ==== impl Resolution =====
|
// ==== impl Resolution =====
|
||||||
|
|
||||||
impl<B> Resolution<B> {
|
|
||||||
fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> {
|
|
||||||
if let Some(store) = self.metric_labels.get_mut(&addr) {
|
|
||||||
store
|
|
||||||
.store(meta.metric_labels)
|
|
||||||
.map_err(|e| {
|
|
||||||
error!("update_metadata: label store error: {:?}", e);
|
|
||||||
})
|
|
||||||
.map(|_| ())
|
|
||||||
} else {
|
|
||||||
// The store has already been removed, so nobody cares about
|
|
||||||
// the metadata change. We expect that this shouldn't happen,
|
|
||||||
// but if it does, log a warning and handle it gracefully.
|
|
||||||
warn!(
|
|
||||||
"update_metadata: ignoring ChangeMetadata for {:?} because the service no longer \
|
|
||||||
exists.",
|
|
||||||
addr
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B, A> Discover for Resolution<B>
|
impl<B, A> Discover for Resolution<B>
|
||||||
where
|
where
|
||||||
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
|
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
|
||||||
|
@ -227,29 +199,18 @@ where
|
||||||
let update = try_ready!(up).expect("destination stream must be infinite");
|
let update = try_ready!(up).expect("destination stream must be infinite");
|
||||||
|
|
||||||
match update {
|
match update {
|
||||||
Update::Insert(addr, meta) => {
|
Update::Bind(addr, meta) => {
|
||||||
// Construct a watch for the `Labeled` middleware that will
|
// We expect the load balancer to handle duplicate inserts
|
||||||
// wrap the bound service, and insert the store into our map
|
// by replacing the old endpoint with the new one, so
|
||||||
// so it can be updated later.
|
// insertions of new endpoints and metadata changes for
|
||||||
let (labels_watch, labels_store) = Watch::new(meta.metric_labels);
|
// existing ones can be handled in the same way.
|
||||||
self.metric_labels.insert(addr, labels_store);
|
let endpoint = Endpoint::new(addr, meta.metric_labels.clone());
|
||||||
|
|
||||||
let endpoint = Endpoint::new(addr, labels_watch.clone());
|
|
||||||
|
|
||||||
let service = self.bind.bind(&endpoint).map_err(|_| ())?;
|
let service = self.bind.bind(&endpoint).map_err(|_| ())?;
|
||||||
|
|
||||||
return Ok(Async::Ready(Change::Insert(addr, service)));
|
return Ok(Async::Ready(Change::Insert(addr, service)));
|
||||||
},
|
},
|
||||||
Update::ChangeMetadata(addr, meta) => {
|
|
||||||
// Update metadata and continue polling `rx`.
|
|
||||||
self.update_metadata(addr, meta)?;
|
|
||||||
},
|
|
||||||
Update::Remove(addr) => {
|
Update::Remove(addr) => {
|
||||||
// It's safe to drop the store handle here, even if
|
|
||||||
// the `Labeled` middleware using the watch handle
|
|
||||||
// still exists --- it will simply read the final
|
|
||||||
// value from the watch.
|
|
||||||
self.metric_labels.remove(&addr);
|
|
||||||
return Ok(Async::Ready(Change::Remove(addr)));
|
return Ok(Async::Ready(Change::Remove(addr)));
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,8 +46,7 @@ impl event::StreamResponseEnd {
|
||||||
};
|
};
|
||||||
|
|
||||||
let destination_meta = ctx.dst_labels()
|
let destination_meta = ctx.dst_labels()
|
||||||
.and_then(|b| b.borrow().clone())
|
.map(|ref d| tap_event::EndpointMeta {
|
||||||
.map(|d| tap_event::EndpointMeta {
|
|
||||||
labels: d.as_map().clone(),
|
labels: d.as_map().clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -78,8 +77,7 @@ impl event::StreamResponseFail {
|
||||||
};
|
};
|
||||||
|
|
||||||
let destination_meta = ctx.dst_labels()
|
let destination_meta = ctx.dst_labels()
|
||||||
.and_then(|b| b.borrow().clone())
|
.map(|ref d| tap_event::EndpointMeta {
|
||||||
.map(|d| tap_event::EndpointMeta {
|
|
||||||
labels: d.as_map().clone(),
|
labels: d.as_map().clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -110,8 +108,7 @@ impl event::StreamRequestFail {
|
||||||
};
|
};
|
||||||
|
|
||||||
let destination_meta = ctx.dst_labels()
|
let destination_meta = ctx.dst_labels()
|
||||||
.and_then(|b| b.borrow().clone())
|
.map(|ref d| tap_event::EndpointMeta {
|
||||||
.map(|d| tap_event::EndpointMeta {
|
|
||||||
labels: d.as_map().clone(),
|
labels: d.as_map().clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -150,8 +147,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
|
||||||
};
|
};
|
||||||
|
|
||||||
let destination_meta = ctx.dst_labels()
|
let destination_meta = ctx.dst_labels()
|
||||||
.and_then(|b| b.borrow().clone())
|
.map(|ref d| tap_event::EndpointMeta {
|
||||||
.map(|d| tap_event::EndpointMeta {
|
|
||||||
labels: d.as_map().clone(),
|
labels: d.as_map().clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -176,9 +172,8 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
|
||||||
http_status: u32::from(ctx.status.as_u16()),
|
http_status: u32::from(ctx.status.as_u16()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let destination_meta = ctx.request.dst_labels()
|
let destination_meta = ctx.dst_labels()
|
||||||
.and_then(|b| b.borrow().clone())
|
.map(|ref d| tap_event::EndpointMeta {
|
||||||
.map(|d| tap_event::EndpointMeta {
|
|
||||||
labels: d.as_map().clone(),
|
labels: d.as_map().clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
use http;
|
use http;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use control::destination::DstLabelsWatch;
|
|
||||||
use ctx;
|
use ctx;
|
||||||
|
use telemetry::metrics::DstLabels;
|
||||||
|
|
||||||
/// Describes a stream's request headers.
|
/// Describes a stream's request headers.
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
|
@ -54,7 +53,7 @@ impl Request {
|
||||||
Arc::new(r)
|
Arc::new(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
|
pub fn dst_labels(&self) -> Option<&DstLabels> {
|
||||||
self.client.dst_labels.as_ref()
|
self.client.dst_labels.as_ref()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,4 +67,8 @@ impl Response {
|
||||||
|
|
||||||
Arc::new(r)
|
Arc::new(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn dst_labels(&self) -> Option<&DstLabels> {
|
||||||
|
self.request.dst_labels()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,7 +78,6 @@ impl Proxy {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test_util {
|
pub mod test_util {
|
||||||
use http;
|
use http;
|
||||||
use futures_watch;
|
|
||||||
use std::{
|
use std::{
|
||||||
fmt,
|
fmt,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
|
@ -109,8 +108,8 @@ pub mod test_util {
|
||||||
L: IntoIterator<Item=(S, S)>,
|
L: IntoIterator<Item=(S, S)>,
|
||||||
S: fmt::Display,
|
S: fmt::Display,
|
||||||
{
|
{
|
||||||
let (labels_watch, _store) = futures_watch::Watch::new(DstLabels::new(labels));
|
let labels = DstLabels::new(labels);
|
||||||
ctx::transport::Client::new(&proxy, &addr(), Some(labels_watch))
|
ctx::transport::Client::new(&proxy, &addr(), labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn request(
|
pub fn request(
|
||||||
|
|
|
@ -2,8 +2,8 @@ use std::{cmp, hash};
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use control::destination::DstLabelsWatch;
|
|
||||||
use ctx;
|
use ctx;
|
||||||
|
use telemetry::metrics::DstLabels;
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
pub enum Ctx {
|
pub enum Ctx {
|
||||||
|
@ -25,7 +25,7 @@ pub struct Server {
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
pub proxy: Arc<ctx::Proxy>,
|
pub proxy: Arc<ctx::Proxy>,
|
||||||
pub remote: SocketAddr,
|
pub remote: SocketAddr,
|
||||||
pub dst_labels: Option<DstLabelsWatch>,
|
pub dst_labels: Option<DstLabels>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ctx {
|
impl Ctx {
|
||||||
|
@ -82,7 +82,7 @@ impl Client {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
proxy: &Arc<ctx::Proxy>,
|
proxy: &Arc<ctx::Proxy>,
|
||||||
remote: &SocketAddr,
|
remote: &SocketAddr,
|
||||||
dst_labels: Option<DstLabelsWatch>,
|
dst_labels: Option<DstLabels>,
|
||||||
) -> Arc<Client> {
|
) -> Arc<Client> {
|
||||||
let c = Client {
|
let c = Client {
|
||||||
proxy: Arc::clone(proxy),
|
proxy: Arc::clone(proxy),
|
||||||
|
|
|
@ -10,7 +10,6 @@ extern crate deflate;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate futures_mpsc_lossy;
|
extern crate futures_mpsc_lossy;
|
||||||
extern crate futures_watch;
|
|
||||||
extern crate h2;
|
extern crate h2;
|
||||||
extern crate http;
|
extern crate http;
|
||||||
extern crate httparse;
|
extern crate httparse;
|
||||||
|
|
|
@ -85,8 +85,7 @@ impl RequestLabels {
|
||||||
pub fn new(req: &ctx::http::Request) -> Self {
|
pub fn new(req: &ctx::http::Request) -> Self {
|
||||||
let direction = Direction::from_context(req.server.proxy.as_ref());
|
let direction = Direction::from_context(req.server.proxy.as_ref());
|
||||||
|
|
||||||
let outbound_labels = req.dst_labels()
|
let outbound_labels = req.dst_labels().cloned();
|
||||||
.and_then(|b| b.borrow().clone());
|
|
||||||
|
|
||||||
let authority = req.uri
|
let authority = req.uri
|
||||||
.authority_part()
|
.authority_part()
|
||||||
|
|
|
@ -109,12 +109,7 @@ impl Match {
|
||||||
Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => {
|
Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => {
|
||||||
match req.dst_labels() {
|
match req.dst_labels() {
|
||||||
None => false,
|
None => false,
|
||||||
Some(ref b) => {
|
Some(labels) => label.matches(labels.as_map()),
|
||||||
match b.borrow().as_ref() {
|
|
||||||
None => false,
|
|
||||||
Some(ref labels) => label.matches(labels.as_map()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,12 +118,7 @@ impl Match {
|
||||||
Event::StreamResponseEnd(ref rsp, _) => {
|
Event::StreamResponseEnd(ref rsp, _) => {
|
||||||
match rsp.request.dst_labels() {
|
match rsp.request.dst_labels() {
|
||||||
None => false,
|
None => false,
|
||||||
Some(ref b) => {
|
Some(labels) => label.matches(labels.as_map()),
|
||||||
match b.borrow().as_ref() {
|
|
||||||
None => false,
|
|
||||||
Some(ref labels) => label.matches(labels.as_map()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue