proxy: Remove dynamic label updating on bound services (#1006)

Depends on tower-rs/tower#75. Required for #386

In order for the proxy to use the TLS support metadata from the Destination 
service correctly, we determined that the code for dynamically changing the
labels on an already-bound service should be removed, and any change in
metadata should cause an endpoint to be rebound.

I've modified the proxy so that we no longer update the labels using 
`futures-watch` (as a sidenote, we no longer depend on that crate). Metadata
update events now cause the `tower-discover::Discover` implementation for 
`DestinationSet` to re-insert the changed endpoint into the load balancer.
Upstream PR tower-rs/tower#75 in tower-balance changes the load balancer 
to honor duplicate insertions by replacing the old endpoint rather than 
ignoring them; that change is necessary for the tests to pass on this branch.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2018-05-29 12:48:59 -07:00 committed by GitHub
parent 84781c9c74
commit b3170af567
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 51 additions and 133 deletions

25
Cargo.lock generated
View File

@ -128,7 +128,6 @@ dependencies = [
"flate2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-mpsc-lossy 0.3.0",
"futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)",
"h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -349,15 +348,6 @@ dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures-watch"
version = "0.1.0"
source = "git+https://github.com/carllerche/better-future.git#07baa13e91fefe7a51533dfde7b4e69e109ebe14"
dependencies = [
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "gzip-header"
version = "0.1.2"
@ -1074,7 +1064,7 @@ dependencies = [
[[package]]
name = "tower-balance"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1087,7 +1077,7 @@ dependencies = [
[[package]]
name = "tower-buffer"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -1096,7 +1086,7 @@ dependencies = [
[[package]]
name = "tower-discover"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -1146,7 +1136,7 @@ dependencies = [
[[package]]
name = "tower-in-flight-limit"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -1155,7 +1145,7 @@ dependencies = [
[[package]]
name = "tower-reconnect"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1165,7 +1155,7 @@ dependencies = [
[[package]]
name = "tower-service"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1173,7 +1163,7 @@ dependencies = [
[[package]]
name = "tower-util"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#6cdc8d0ab594f8c8c06e74f78e2ee37abd3fd8bb"
source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -1393,7 +1383,6 @@ dependencies = [
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c"
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
"checksum futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)" = "<none>"
"checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40"
"checksum h2 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "01f3fc6ec9a3437d52fdb95000b8aeec314842b47529631e3b5570597c6ffad7"
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"

View File

@ -19,7 +19,6 @@ bytes = "0.4"
deflate = {version = "0.7.18", features = ["gzip"] }
env_logger = { version = "0.5", default-features = false }
futures = "0.1"
futures-watch = { git = "https://github.com/carllerche/better-future.git" }
h2 = "0.1.7"
http = "0.1"
httparse = "1.2"

View File

@ -183,7 +183,7 @@ where
// them onto the new watch first
match set.addrs {
Exists::Yes(ref cache) => for (&addr, meta) in cache {
let update = Update::Insert(addr, meta.clone());
let update = Update::Bind(addr, meta.clone());
resolve.responder.update_tx
.unbounded_send(update)
.expect("unbounded_send does not fail");
@ -526,12 +526,12 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
) {
let (update_str, update, addr) = match change {
CacheChange::Insertion { key, value } => {
("insert", Update::Insert(key, value.clone()), key)
("insert", Update::Bind(key, value.clone()), key)
},
CacheChange::Removal { key } => ("remove", Update::Remove(key), key),
CacheChange::Modification { key, new_value } => (
"change metadata for",
Update::ChangeMetadata(key, new_value.clone()),
Update::Bind(key, new_value.clone()),
key,
),
};

View File

@ -1,26 +1,24 @@
use futures_watch;
use std::{cmp, hash, net::SocketAddr};
use std::net::SocketAddr;
use telemetry::metrics::DstLabels;
pub type DstLabelsWatch = futures_watch::Watch<Option<DstLabels>>;
/// An individual traffic target.
///
/// Equality, Ordering, and hashability is determined soley by the Endpoint's address.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Endpoint {
address: SocketAddr,
dst_labels: Option<DstLabelsWatch>,
dst_labels: Option<DstLabels>,
}
// ==== impl Endpoint =====
impl Endpoint {
pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self {
pub fn new(address: SocketAddr, dst_labels: Option<DstLabels>) -> Self {
Self {
address,
dst_labels: Some(dst_labels),
dst_labels,
}
}
@ -28,7 +26,7 @@ impl Endpoint {
self.address
}
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
pub fn dst_labels(&self) -> Option<&DstLabels> {
self.dst_labels.as_ref()
}
}
@ -41,17 +39,3 @@ impl From<SocketAddr> for Endpoint {
}
}
}
impl hash::Hash for Endpoint {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.address.hash(state)
}
}
impl cmp::PartialEq for Endpoint {
fn eq(&self, other: &Self) -> bool {
self.address.eq(&other.address)
}
}
impl cmp::Eq for Endpoint {}

View File

@ -24,7 +24,6 @@
//! - We need some means to limit the number of endpoints that can be returned for a
//! single resolution so that `control::Cache` is not effectively unbounded.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
@ -35,7 +34,6 @@ use futures::{
Poll,
Stream
};
use futures_watch::{Store, Watch};
use http;
use tower_discover::{Change, Discover};
use tower_service::Service;
@ -47,7 +45,7 @@ use transport::{DnsNameAndPort, HostAndPort};
pub mod background;
mod endpoint;
pub use self::endpoint::{DstLabelsWatch, Endpoint};
pub use self::endpoint::Endpoint;
/// A handle to request resolutions from the background discovery task.
#[derive(Clone, Debug)]
@ -84,18 +82,11 @@ pub struct Resolution<B> {
/// reference has been dropped.
_active: Arc<()>,
/// Map associating addresses with the `Store` for the watch on that
/// service's metric labels (as provided by the Destination service).
///
/// This is used to update the `Labeled` middleware on those services
/// without requiring the service stack to be re-bound.
metric_labels: HashMap<SocketAddr, Store<Option<DstLabels>>>,
/// Binds an update endpoint to a Service.
bind: B,
}
/// .
/// Metadata describing an endpoint.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
struct Metadata {
/// A set of Prometheus metric labels describing the destination.
@ -104,9 +95,14 @@ struct Metadata {
#[derive(Debug, Clone)]
enum Update {
Insert(SocketAddr, Metadata),
/// Indicates that an endpoint should be bound to `SocketAddr` with the
/// provided `Metadata`.
///
/// If there was already an endpoint in the load balancer for this
/// address, it should be replaced with the new one.
Bind(SocketAddr, Metadata),
/// Indicates that the endpoint for this `SocketAddr` should be removed.
Remove(SocketAddr),
ChangeMetadata(SocketAddr, Metadata),
}
/// Bind a `SocketAddr` with a protocol.
@ -178,7 +174,6 @@ impl Resolver {
Resolution {
update_rx,
_active: active,
metric_labels: HashMap::new(),
bind,
}
}
@ -186,29 +181,6 @@ impl Resolver {
// ==== impl Resolution =====
impl<B> Resolution<B> {
fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> {
if let Some(store) = self.metric_labels.get_mut(&addr) {
store
.store(meta.metric_labels)
.map_err(|e| {
error!("update_metadata: label store error: {:?}", e);
})
.map(|_| ())
} else {
// The store has already been removed, so nobody cares about
// the metadata change. We expect that this shouldn't happen,
// but if it does, log a warning and handle it gracefully.
warn!(
"update_metadata: ignoring ChangeMetadata for {:?} because the service no longer \
exists.",
addr
);
Ok(())
}
}
}
impl<B, A> Discover for Resolution<B>
where
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
@ -227,29 +199,18 @@ where
let update = try_ready!(up).expect("destination stream must be infinite");
match update {
Update::Insert(addr, meta) => {
// Construct a watch for the `Labeled` middleware that will
// wrap the bound service, and insert the store into our map
// so it can be updated later.
let (labels_watch, labels_store) = Watch::new(meta.metric_labels);
self.metric_labels.insert(addr, labels_store);
let endpoint = Endpoint::new(addr, labels_watch.clone());
Update::Bind(addr, meta) => {
// We expect the load balancer to handle duplicate inserts
// by replacing the old endpoint with the new one, so
// insertions of new endpoints and metadata changes for
// existing ones can be handled in the same way.
let endpoint = Endpoint::new(addr, meta.metric_labels.clone());
let service = self.bind.bind(&endpoint).map_err(|_| ())?;
return Ok(Async::Ready(Change::Insert(addr, service)));
},
Update::ChangeMetadata(addr, meta) => {
// Update metadata and continue polling `rx`.
self.update_metadata(addr, meta)?;
},
Update::Remove(addr) => {
// It's safe to drop the store handle here, even if
// the `Labeled` middleware using the watch handle
// still exists --- it will simply read the final
// value from the watch.
self.metric_labels.remove(&addr);
return Ok(Async::Ready(Change::Remove(addr)));
},
}

View File

@ -46,8 +46,7 @@ impl event::StreamResponseEnd {
};
let destination_meta = ctx.dst_labels()
.and_then(|b| b.borrow().clone())
.map(|d| tap_event::EndpointMeta {
.map(|ref d| tap_event::EndpointMeta {
labels: d.as_map().clone(),
});
@ -78,8 +77,7 @@ impl event::StreamResponseFail {
};
let destination_meta = ctx.dst_labels()
.and_then(|b| b.borrow().clone())
.map(|d| tap_event::EndpointMeta {
.map(|ref d| tap_event::EndpointMeta {
labels: d.as_map().clone(),
});
@ -110,8 +108,7 @@ impl event::StreamRequestFail {
};
let destination_meta = ctx.dst_labels()
.and_then(|b| b.borrow().clone())
.map(|d| tap_event::EndpointMeta {
.map(|ref d| tap_event::EndpointMeta {
labels: d.as_map().clone(),
});
@ -150,8 +147,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
};
let destination_meta = ctx.dst_labels()
.and_then(|b| b.borrow().clone())
.map(|d| tap_event::EndpointMeta {
.map(|ref d| tap_event::EndpointMeta {
labels: d.as_map().clone(),
});
@ -176,9 +172,8 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
http_status: u32::from(ctx.status.as_u16()),
};
let destination_meta = ctx.request.dst_labels()
.and_then(|b| b.borrow().clone())
.map(|d| tap_event::EndpointMeta {
let destination_meta = ctx.dst_labels()
.map(|ref d| tap_event::EndpointMeta {
labels: d.as_map().clone(),
});

View File

@ -1,9 +1,8 @@
use http;
use std::sync::Arc;
use control::destination::DstLabelsWatch;
use ctx;
use telemetry::metrics::DstLabels;
/// Describes a stream's request headers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -54,7 +53,7 @@ impl Request {
Arc::new(r)
}
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
pub fn dst_labels(&self) -> Option<&DstLabels> {
self.client.dst_labels.as_ref()
}
}
@ -68,4 +67,8 @@ impl Response {
Arc::new(r)
}
pub fn dst_labels(&self) -> Option<&DstLabels> {
self.request.dst_labels()
}
}

View File

@ -78,7 +78,6 @@ impl Proxy {
#[cfg(test)]
pub mod test_util {
use http;
use futures_watch;
use std::{
fmt,
net::SocketAddr,
@ -109,8 +108,8 @@ pub mod test_util {
L: IntoIterator<Item=(S, S)>,
S: fmt::Display,
{
let (labels_watch, _store) = futures_watch::Watch::new(DstLabels::new(labels));
ctx::transport::Client::new(&proxy, &addr(), Some(labels_watch))
let labels = DstLabels::new(labels);
ctx::transport::Client::new(&proxy, &addr(), labels)
}
pub fn request(

View File

@ -2,8 +2,8 @@ use std::{cmp, hash};
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use control::destination::DstLabelsWatch;
use ctx;
use telemetry::metrics::DstLabels;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Ctx {
@ -25,7 +25,7 @@ pub struct Server {
pub struct Client {
pub proxy: Arc<ctx::Proxy>,
pub remote: SocketAddr,
pub dst_labels: Option<DstLabelsWatch>,
pub dst_labels: Option<DstLabels>,
}
impl Ctx {
@ -82,7 +82,7 @@ impl Client {
pub fn new(
proxy: &Arc<ctx::Proxy>,
remote: &SocketAddr,
dst_labels: Option<DstLabelsWatch>,
dst_labels: Option<DstLabels>,
) -> Arc<Client> {
let c = Client {
proxy: Arc::clone(proxy),

View File

@ -10,7 +10,6 @@ extern crate deflate;
#[macro_use]
extern crate futures;
extern crate futures_mpsc_lossy;
extern crate futures_watch;
extern crate h2;
extern crate http;
extern crate httparse;

View File

@ -85,8 +85,7 @@ impl RequestLabels {
pub fn new(req: &ctx::http::Request) -> Self {
let direction = Direction::from_context(req.server.proxy.as_ref());
let outbound_labels = req.dst_labels()
.and_then(|b| b.borrow().clone());
let outbound_labels = req.dst_labels().cloned();
let authority = req.uri
.authority_part()

View File

@ -109,12 +109,7 @@ impl Match {
Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => {
match req.dst_labels() {
None => false,
Some(ref b) => {
match b.borrow().as_ref() {
None => false,
Some(ref labels) => label.matches(labels.as_map()),
}
}
Some(labels) => label.matches(labels.as_map()),
}
}
@ -123,12 +118,7 @@ impl Match {
Event::StreamResponseEnd(ref rsp, _) => {
match rsp.request.dst_labels() {
None => false,
Some(ref b) => {
match b.borrow().as_ref() {
None => false,
Some(ref labels) => label.matches(labels.as_map()),
}
}
Some(labels) => label.matches(labels.as_map()),
}
},