diff --git a/Cargo.lock b/Cargo.lock index 415b0b189..281c368c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -128,7 +128,6 @@ dependencies = [ "flate2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "futures-mpsc-lossy 0.3.0", - "futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)", "h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -349,15 +348,6 @@ dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "futures-watch" -version = "0.1.0" -source = "git+https://github.com/carllerche/better-future.git#07baa13e91fefe7a51533dfde7b4e69e109ebe14" -dependencies = [ - "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "gzip-header" version = "0.1.2" @@ -1074,7 +1064,7 @@ dependencies = [ [[package]] name = "tower-balance" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb" +source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1087,7 +1077,7 @@ dependencies = [ [[package]] name = "tower-buffer" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb" +source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1096,7 +1086,7 @@ dependencies = [ [[package]] name = "tower-discover" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb" +source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1146,7 +1136,7 @@ dependencies = [ [[package]] name = "tower-in-flight-limit" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb" +source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1155,7 +1145,7 @@ dependencies = [ [[package]] name = "tower-reconnect" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb" +source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1165,7 +1155,7 @@ dependencies = [ [[package]] name = "tower-service" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb" +source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1173,7 +1163,7 @@ dependencies = [ [[package]] name = "tower-util" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb" +source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1393,7 +1383,6 @@ dependencies = [ "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" -"checksum futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)" = "" "checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40" "checksum h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "01f3fc6ec9a3437d52fdb95000b8aeec314842b47529631e3b5570597c6ffad7" "checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index bceec8ecb..fa8cb7c41 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -19,7 +19,6 @@ bytes = "0.4" deflate = {version = "0.7.18", features = ["gzip"] } env_logger = { version = "0.5", default-features = false } futures = "0.1" -futures-watch = { git = "https://github.com/carllerche/better-future.git" } h2 = "0.1.7" http = "0.1" httparse = "1.2" diff --git a/proxy/src/control/destination/background.rs b/proxy/src/control/destination/background.rs index 6b44de6a6..635ac18fb 100644 --- a/proxy/src/control/destination/background.rs +++ b/proxy/src/control/destination/background.rs @@ -183,7 +183,7 @@ where // them onto the new watch first match set.addrs { Exists::Yes(ref cache) => for (&addr, meta) in cache { - let update = Update::Insert(addr, meta.clone()); + let update = Update::Bind(addr, meta.clone()); resolve.responder.update_tx .unbounded_send(update) .expect("unbounded_send does not fail"); @@ -526,12 +526,12 @@ impl> DestinationSet { ) { let (update_str, update, addr) = match change { CacheChange::Insertion { key, value } => { - ("insert", Update::Insert(key, value.clone()), key) + ("insert", Update::Bind(key, value.clone()), key) }, CacheChange::Removal { key } => ("remove", Update::Remove(key), key), CacheChange::Modification { key, new_value } => ( "change metadata for", - Update::ChangeMetadata(key, new_value.clone()), + Update::Bind(key, new_value.clone()), key, ), }; diff --git a/proxy/src/control/destination/endpoint.rs b/proxy/src/control/destination/endpoint.rs index 27cc5a3c6..2640bb439 100644 --- a/proxy/src/control/destination/endpoint.rs +++ b/proxy/src/control/destination/endpoint.rs @@ -1,26 +1,24 @@ -use futures_watch; -use std::{cmp, hash, net::SocketAddr}; +use std::net::SocketAddr; use telemetry::metrics::DstLabels; -pub type DstLabelsWatch = futures_watch::Watch>; /// An individual traffic target. /// /// Equality, Ordering, and hashability is determined soley by the Endpoint's address. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct Endpoint { address: SocketAddr, - dst_labels: Option, + dst_labels: Option, } // ==== impl Endpoint ===== impl Endpoint { - pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self { + pub fn new(address: SocketAddr, dst_labels: Option) -> Self { Self { address, - dst_labels: Some(dst_labels), + dst_labels, } } @@ -28,7 +26,7 @@ impl Endpoint { self.address } - pub fn dst_labels(&self) -> Option<&DstLabelsWatch> { + pub fn dst_labels(&self) -> Option<&DstLabels> { self.dst_labels.as_ref() } } @@ -41,17 +39,3 @@ impl From for Endpoint { } } } - -impl hash::Hash for Endpoint { - fn hash(&self, state: &mut H) { - self.address.hash(state) - } -} - -impl cmp::PartialEq for Endpoint { - fn eq(&self, other: &Self) -> bool { - self.address.eq(&other.address) - } -} - -impl cmp::Eq for Endpoint {} diff --git a/proxy/src/control/destination/mod.rs b/proxy/src/control/destination/mod.rs index 724faa9c1..560f5cd17 100644 --- a/proxy/src/control/destination/mod.rs +++ b/proxy/src/control/destination/mod.rs @@ -24,7 +24,6 @@ //! - We need some means to limit the number of endpoints that can be returned for a //! single resolution so that `control::Cache` is not effectively unbounded. -use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Weak}; @@ -35,7 +34,6 @@ use futures::{ Poll, Stream }; -use futures_watch::{Store, Watch}; use http; use tower_discover::{Change, Discover}; use tower_service::Service; @@ -47,7 +45,7 @@ use transport::{DnsNameAndPort, HostAndPort}; pub mod background; mod endpoint; -pub use self::endpoint::{DstLabelsWatch, Endpoint}; +pub use self::endpoint::Endpoint; /// A handle to request resolutions from the background discovery task. #[derive(Clone, Debug)] @@ -84,18 +82,11 @@ pub struct Resolution { /// reference has been dropped. _active: Arc<()>, - /// Map associating addresses with the `Store` for the watch on that - /// service's metric labels (as provided by the Destination service). - /// - /// This is used to update the `Labeled` middleware on those services - /// without requiring the service stack to be re-bound. - metric_labels: HashMap>>, - /// Binds an update endpoint to a Service. bind: B, } -/// . +/// Metadata describing an endpoint. #[derive(Clone, Debug, Hash, Eq, PartialEq)] struct Metadata { /// A set of Prometheus metric labels describing the destination. @@ -104,9 +95,14 @@ struct Metadata { #[derive(Debug, Clone)] enum Update { - Insert(SocketAddr, Metadata), + /// Indicates that an endpoint should be bound to `SocketAddr` with the + /// provided `Metadata`. + /// + /// If there was already an endpoint in the load balancer for this + /// address, it should be replaced with the new one. + Bind(SocketAddr, Metadata), + /// Indicates that the endpoint for this `SocketAddr` should be removed. Remove(SocketAddr), - ChangeMetadata(SocketAddr, Metadata), } /// Bind a `SocketAddr` with a protocol. @@ -178,7 +174,6 @@ impl Resolver { Resolution { update_rx, _active: active, - metric_labels: HashMap::new(), bind, } } @@ -186,29 +181,6 @@ impl Resolver { // ==== impl Resolution ===== -impl Resolution { - fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> { - if let Some(store) = self.metric_labels.get_mut(&addr) { - store - .store(meta.metric_labels) - .map_err(|e| { - error!("update_metadata: label store error: {:?}", e); - }) - .map(|_| ()) - } else { - // The store has already been removed, so nobody cares about - // the metadata change. We expect that this shouldn't happen, - // but if it does, log a warning and handle it gracefully. - warn!( - "update_metadata: ignoring ChangeMetadata for {:?} because the service no longer \ - exists.", - addr - ); - Ok(()) - } - } -} - impl Discover for Resolution where B: Bind>, @@ -227,29 +199,18 @@ where let update = try_ready!(up).expect("destination stream must be infinite"); match update { - Update::Insert(addr, meta) => { - // Construct a watch for the `Labeled` middleware that will - // wrap the bound service, and insert the store into our map - // so it can be updated later. - let (labels_watch, labels_store) = Watch::new(meta.metric_labels); - self.metric_labels.insert(addr, labels_store); - - let endpoint = Endpoint::new(addr, labels_watch.clone()); + Update::Bind(addr, meta) => { + // We expect the load balancer to handle duplicate inserts + // by replacing the old endpoint with the new one, so + // insertions of new endpoints and metadata changes for + // existing ones can be handled in the same way. + let endpoint = Endpoint::new(addr, meta.metric_labels.clone()); let service = self.bind.bind(&endpoint).map_err(|_| ())?; return Ok(Async::Ready(Change::Insert(addr, service))); }, - Update::ChangeMetadata(addr, meta) => { - // Update metadata and continue polling `rx`. - self.update_metadata(addr, meta)?; - }, Update::Remove(addr) => { - // It's safe to drop the store handle here, even if - // the `Labeled` middleware using the watch handle - // still exists --- it will simply read the final - // value from the watch. - self.metric_labels.remove(&addr); return Ok(Async::Ready(Change::Remove(addr))); }, } diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index a9836fa84..bd8f891d0 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -46,8 +46,7 @@ impl event::StreamResponseEnd { }; let destination_meta = ctx.dst_labels() - .and_then(|b| b.borrow().clone()) - .map(|d| tap_event::EndpointMeta { + .map(|ref d| tap_event::EndpointMeta { labels: d.as_map().clone(), }); @@ -78,8 +77,7 @@ impl event::StreamResponseFail { }; let destination_meta = ctx.dst_labels() - .and_then(|b| b.borrow().clone()) - .map(|d| tap_event::EndpointMeta { + .map(|ref d| tap_event::EndpointMeta { labels: d.as_map().clone(), }); @@ -110,8 +108,7 @@ impl event::StreamRequestFail { }; let destination_meta = ctx.dst_labels() - .and_then(|b| b.borrow().clone()) - .map(|d| tap_event::EndpointMeta { + .map(|ref d| tap_event::EndpointMeta { labels: d.as_map().clone(), }); @@ -150,8 +147,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { }; let destination_meta = ctx.dst_labels() - .and_then(|b| b.borrow().clone()) - .map(|d| tap_event::EndpointMeta { + .map(|ref d| tap_event::EndpointMeta { labels: d.as_map().clone(), }); @@ -176,9 +172,8 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { http_status: u32::from(ctx.status.as_u16()), }; - let destination_meta = ctx.request.dst_labels() - .and_then(|b| b.borrow().clone()) - .map(|d| tap_event::EndpointMeta { + let destination_meta = ctx.dst_labels() + .map(|ref d| tap_event::EndpointMeta { labels: d.as_map().clone(), }); diff --git a/proxy/src/ctx/http.rs b/proxy/src/ctx/http.rs index 80ff43710..801077a75 100644 --- a/proxy/src/ctx/http.rs +++ b/proxy/src/ctx/http.rs @@ -1,9 +1,8 @@ use http; use std::sync::Arc; -use control::destination::DstLabelsWatch; use ctx; - +use telemetry::metrics::DstLabels; /// Describes a stream's request headers. #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -54,7 +53,7 @@ impl Request { Arc::new(r) } - pub fn dst_labels(&self) -> Option<&DstLabelsWatch> { + pub fn dst_labels(&self) -> Option<&DstLabels> { self.client.dst_labels.as_ref() } } @@ -68,4 +67,8 @@ impl Response { Arc::new(r) } + + pub fn dst_labels(&self) -> Option<&DstLabels> { + self.request.dst_labels() + } } diff --git a/proxy/src/ctx/mod.rs b/proxy/src/ctx/mod.rs index 5da01600c..9358dad86 100644 --- a/proxy/src/ctx/mod.rs +++ b/proxy/src/ctx/mod.rs @@ -78,7 +78,6 @@ impl Proxy { #[cfg(test)] pub mod test_util { use http; - use futures_watch; use std::{ fmt, net::SocketAddr, @@ -109,8 +108,8 @@ pub mod test_util { L: IntoIterator, S: fmt::Display, { - let (labels_watch, _store) = futures_watch::Watch::new(DstLabels::new(labels)); - ctx::transport::Client::new(&proxy, &addr(), Some(labels_watch)) + let labels = DstLabels::new(labels); + ctx::transport::Client::new(&proxy, &addr(), labels) } pub fn request( diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index 47055c166..71cff25cf 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -2,8 +2,8 @@ use std::{cmp, hash}; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use control::destination::DstLabelsWatch; use ctx; +use telemetry::metrics::DstLabels; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum Ctx { @@ -25,7 +25,7 @@ pub struct Server { pub struct Client { pub proxy: Arc, pub remote: SocketAddr, - pub dst_labels: Option, + pub dst_labels: Option, } impl Ctx { @@ -82,7 +82,7 @@ impl Client { pub fn new( proxy: &Arc, remote: &SocketAddr, - dst_labels: Option, + dst_labels: Option, ) -> Arc { let c = Client { proxy: Arc::clone(proxy), diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 503026698..7a44fe5e5 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -10,7 +10,6 @@ extern crate deflate; #[macro_use] extern crate futures; extern crate futures_mpsc_lossy; -extern crate futures_watch; extern crate h2; extern crate http; extern crate httparse; diff --git a/proxy/src/telemetry/metrics/labels.rs b/proxy/src/telemetry/metrics/labels.rs index ad5f12641..17a5f6b5b 100644 --- a/proxy/src/telemetry/metrics/labels.rs +++ b/proxy/src/telemetry/metrics/labels.rs @@ -85,8 +85,7 @@ impl RequestLabels { pub fn new(req: &ctx::http::Request) -> Self { let direction = Direction::from_context(req.server.proxy.as_ref()); - let outbound_labels = req.dst_labels() - .and_then(|b| b.borrow().clone()); + let outbound_labels = req.dst_labels().cloned(); let authority = req.uri .authority_part() diff --git a/proxy/src/telemetry/tap/match_.rs b/proxy/src/telemetry/tap/match_.rs index 27c397f9d..15e2746e5 100644 --- a/proxy/src/telemetry/tap/match_.rs +++ b/proxy/src/telemetry/tap/match_.rs @@ -109,12 +109,7 @@ impl Match { Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { match req.dst_labels() { None => false, - Some(ref b) => { - match b.borrow().as_ref() { - None => false, - Some(ref labels) => label.matches(labels.as_map()), - } - } + Some(labels) => label.matches(labels.as_map()), } } @@ -123,12 +118,7 @@ impl Match { Event::StreamResponseEnd(ref rsp, _) => { match rsp.request.dst_labels() { None => false, - Some(ref b) => { - match b.borrow().as_ref() { - None => false, - Some(ref labels) => label.matches(labels.as_map()), - } - } + Some(labels) => label.matches(labels.as_map()), } },