From 491fae7cc4b01aa5aaceb4e75a2fa84fe01d284b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 19 Apr 2018 11:01:10 -0700 Subject: [PATCH] proxy: Rewrite mock controller to accept a stream of dst updates (#808) Currently, the mock controller, which is used in tests, takes all of its updates a priori, which makes it hard to control when an update occurs within a test. Now, the controller exposes a `DstSender`, which wraps an unbounded channel of destination updates. This allows tests to trigger updates at a specific point in the test. In order to accomplish this, the controller's hand-rolled gRPC server implementation has been discarded in favor of a real gRPC destination service. This requires that the `controller-grpc` project now builds both clients and servers for the destination service. Additionally, we now build a tap client as well (assuming that we'll want to write tests against our tap server). --- proxy/controller-grpc/build.rs | 14 +- proxy/tests/discovery.rs | 78 ++++----- proxy/tests/support/controller.rs | 277 ++++++++++-------------------- proxy/tests/support/mod.rs | 7 +- proxy/tests/telemetry.rs | 141 ++++++++------- proxy/tests/transparency.rs | 2 +- 6 files changed, 208 insertions(+), 311 deletions(-) diff --git a/proxy/controller-grpc/build.rs b/proxy/controller-grpc/build.rs index 892120d28..eac393797 100644 --- a/proxy/controller-grpc/build.rs +++ b/proxy/controller-grpc/build.rs @@ -5,27 +5,21 @@ fn main() { } fn build_control() { - let client_files = &[ + let iface_files = &[ "../../proto/common/common.proto", "../../proto/proxy/destination/destination.proto", + "../../proto/proxy/tap/tap.proto" ]; - let server_files = &["../../proto/proxy/tap/tap.proto"]; let dirs = &["../../proto"]; tower_grpc_build::Config::new() .enable_client(true) - .enable_server(false) - .build(client_files, dirs) - .unwrap(); - - tower_grpc_build::Config::new() - .enable_client(false) .enable_server(true) - .build(server_files, dirs) + .build(iface_files, dirs) .unwrap(); // recompile protobufs only if any of the proto files changes. - for file in client_files.iter().chain(server_files) { + for file in iface_files { println!("cargo:rerun-if-changed={}", file); } } diff --git a/proxy/tests/discovery.rs b/proxy/tests/discovery.rs index 3948c1208..c446d15a0 100644 --- a/proxy/tests/discovery.rs +++ b/proxy/tests/discovery.rs @@ -10,10 +10,11 @@ macro_rules! generate_tests { fn outbound_asks_controller_api() { let _ = env_logger::try_init(); let srv = $make_server().route("/", "hello").route("/bye", "bye").run(); + let ctrl = controller::new() - .destination("disco.test.svc.cluster.local", srv.addr) - .run(); - let proxy = proxy::new().controller(ctrl).outbound(srv).run(); + .destination_and_close("disco.test.svc.cluster.local", srv.addr); + + let proxy = proxy::new().controller(ctrl.run()).outbound(srv).run(); let client = $make_client(proxy.outbound, "disco.test.svc.cluster.local"); assert_eq!(client.get("/"), "hello"); @@ -25,11 +26,12 @@ macro_rules! generate_tests { let _ = env_logger::try_init(); let srv = $make_server().route("/recon", "nect").run(); + let ctrl = controller::new() .destination_close("disco.test.svc.cluster.local") - .destination("disco.test.svc.cluster.local", srv.addr) - .run(); - let proxy = proxy::new().controller(ctrl).outbound(srv).run(); + .destination_and_close("disco.test.svc.cluster.local", srv.addr); + + let proxy = proxy::new().controller(ctrl.run()).outbound(srv).run(); let client = $make_client(proxy.outbound, "disco.test.svc.cluster.local"); assert_eq!(client.get("/recon"), "nect"); @@ -70,14 +72,15 @@ macro_rules! generate_tests { env.put(config::ENV_BIND_TIMEOUT, "100".to_owned()); let srv = $make_server().route("/", "hello").run(); - let ctrl = controller::new() - .destination("initially-exists.ns.svc.cluster.local", srv.addr) - .destination_close("trigger-close.ns.svc.cluster.local") - .destination_fn("initially-exists.ns.svc.cluster.local", f) - .run(); + let ctrl = controller::new(); + + let dst_tx0 = ctrl.destination_tx("initially-exists.ns.svc.cluster.local"); + dst_tx0.send_addr(srv.addr); + + let dst_tx1 = ctrl.destination_tx("initially-exists.ns.svc.cluster.local"); let proxy = proxy::new() - .controller(ctrl) + .controller(ctrl.run()) .outbound(srv) .run_with_test_env(env); @@ -85,15 +88,10 @@ macro_rules! generate_tests { $make_client(proxy.outbound, "initially-exists.ns.svc.cluster.local"); assert_eq!(initially_exists.get("/"), "hello"); - // Try to access a different server which will trigger the `destination_close()` - // above. - { - let trigger_close = - $make_client(proxy.outbound, "trigger-close.ns.svc.cluster.local"); - let mut req = trigger_close.request_builder("/"); - let rsp = trigger_close.request(req.method("GET")); - // the request should time out - assert_eq!(rsp.status(), http::StatusCode::INTERNAL_SERVER_ERROR); + drop(dst_tx0); // trigger reconnect + match f() { + None => drop(dst_tx1), + Some(up) => dst_tx1.send(up), } // Wait for the reconnect to happen. TODO: Replace this flaky logic. @@ -109,9 +107,6 @@ macro_rules! generate_tests { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_times_out() { - use std::collections::HashMap; - use std::thread; - let _ = env_logger::try_init(); let mut env = config::TestEnv::new(); @@ -119,22 +114,13 @@ macro_rules! generate_tests { env.put(config::ENV_BIND_TIMEOUT, "100".to_owned()); let srv = $make_server().route("/hi", "hello").run(); - let addr = srv.addr.clone(); - let ctrl = controller::new() - // when the proxy requests the destination, sleep for 500 ms, and then - // return the correct destination - .destination_fn("disco.test.svc.cluster.local", move || { - thread::sleep(Duration::from_millis(500)); - Some(controller::destination_update( - addr, - HashMap::new(), - HashMap::new(), - )) - }) - .run(); + let ctrl = controller::new(); + + // when the proxy requests the destination, don't respond. + let _dst_tx = ctrl.destination_tx("disco.test.svc.cluster.local"); let proxy = proxy::new() - .controller(ctrl) + .controller(ctrl.run()) .outbound(srv) .run_with_test_env(env); @@ -153,11 +139,12 @@ macro_rules! generate_tests { .route("/", "hello") .route("/bye", "bye") .run(); + let ctrl = controller::new() - .destination("disco.test.svc.cluster.local", srv.addr) - .run(); + .destination_and_close("disco.test.svc.cluster.local", srv.addr); + let proxy = proxy::new() - .controller(ctrl) + .controller(ctrl.run()) // don't set srv as outbound(), so that SO_ORIGINAL_DST isn't // used as a backup .run(); @@ -201,10 +188,15 @@ fn outbound_updates_newer_services() { //TODO: when the support server can listen on both http1 and http2 //at the same time, do that here let srv = server::http1().route("/h1", "hello h1").run(); + let ctrl = controller::new() - .destination("disco.test.svc.cluster.local", srv.addr) + .destination_and_close("disco.test.svc.cluster.local", srv.addr); + + let proxy = proxy::new() + .controller(ctrl.run()) + .outbound(srv) .run(); - let proxy = proxy::new().controller(ctrl).outbound(srv).run(); + // the HTTP2 service starts watching first, receiving an addr // from the controller let client1 = client::http2(proxy.outbound, "disco.test.svc.cluster.local"); diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 865394a4c..cc2bb7309 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -3,23 +3,27 @@ use support::*; use std::collections::{HashMap, VecDeque}; -use std::fmt; use std::net::IpAddr; use std::sync::{Arc, Mutex}; -use conduit_proxy_controller_grpc as pb; -use self::bytes::BufMut; -use self::prost::Message; +use conduit_proxy_controller_grpc::common::{self, Destination}; +use conduit_proxy_controller_grpc::destination as pb; pub fn new() -> Controller { Controller::new() } -struct Destination(Box Option + Send>); +pub type Labels = HashMap; #[derive(Debug)] +pub struct DstReceiver(sync::mpsc::UnboundedReceiver); + +#[derive(Clone, Debug)] +pub struct DstSender(sync::mpsc::UnboundedSender); + +#[derive(Clone, Debug, Default)] pub struct Controller { - destinations: VecDeque<(String, Destination)>, + expect_dst_calls: Arc>>, } pub struct Listening { @@ -29,43 +33,30 @@ pub struct Listening { impl Controller { pub fn new() -> Self { - Controller { - destinations: VecDeque::new(), - } + Self::default() } - pub fn destination(self, dest: &str, addr: SocketAddr) -> Self { - self.destination_fn(dest, move || Some(destination_update( - addr, - HashMap::new(), - HashMap::new(), - ))) + pub fn destination_tx(&self, dest: &str) -> DstSender { + let (tx, rx) = sync::mpsc::unbounded(); + let dst = common::Destination { + scheme: "k8s".into(), + path: dest.into(), + }; + self.expect_dst_calls + .lock() + .unwrap() + .push_back((dst, DstReceiver(rx))); + DstSender(tx) } - pub fn labeled_destination(self, dest: &str, addr: SocketAddr, - addr_labels: HashMap, - set_labels:HashMap) - -> Self - { - self.destination_fn(dest, move || Some(destination_update( - addr, - addr_labels.clone(), - set_labels.clone(), - ))) - } - - pub fn destination_fn(mut self, dest: &str, f: F) -> Self - where - F: Fn() -> Option + Send + 'static, - { - self.destinations - .push_back((dest.into(), Destination(Box::new(f)))); + pub fn destination_and_close(self, dest: &str, addr: SocketAddr) -> Self { + self.destination_tx(dest).send_addr(addr); self } - pub fn destination_close(self, dest: &str) -> Self { - self.destination_fn(dest, || None) + drop(self.destination_tx(dest)); + self } pub fn run(self) -> Listening { @@ -73,139 +64,44 @@ impl Controller { } } -type Response = self::http::Response; -type Destinations = Arc>>; - -const DESTINATION_GET: &str = "/conduit.proxy.destination.Destination/Get"; - -impl fmt::Debug for Destination { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Destination") +impl Stream for DstReceiver { + type Item = pb::Update; + type Error = grpc::Error; + fn poll(&mut self) -> Poll, Self::Error> { + self.0.poll().map_err(|_| grpc::Error::Grpc(grpc::Status::INTERNAL)) } } -#[derive(Debug)] -struct Svc { - destinations: Destinations, +impl DstSender { + pub fn send(&self, up: pb::Update) { + self.0.unbounded_send(up).expect("send dst update") + } + + pub fn send_addr(&self, addr: SocketAddr) { + self.send(destination_add(addr)) + } + + pub fn send_labeled(&self, addr: SocketAddr, addr_labels: Labels, parent_labels: Labels) { + self.send(destination_add_labeled(addr, addr_labels, parent_labels)); + } } -impl Svc { - fn route( - &self, - path: &str, - body: RecvBodyStream, - ) -> Box> { - let mut rsp = http::Response::builder(); - rsp.version(http::Version::HTTP_2); +impl pb::server::Destination for Controller { + type GetStream = DstReceiver; + type GetFuture = future::FutureResult, grpc::Error>; - match path { - DESTINATION_GET => { - let destinations = self.destinations.clone(); - Box::new(body.concat2().and_then(move |_bytes| { - let update = { - let next = { - let mut queue = destinations.lock().unwrap(); - queue.pop_front() - }; - // The test cases's entry may evaluate to `None` when it wants to close - // the connection. If there is no entry then that's equivalent to an - // implicit `destination_close()`. - // - // TODO: decode `_bytes` and compare with `_name` - next.and_then(|(_name, Destination(f))| f()) - .unwrap_or_default() - }; - let len = update.encoded_len(); - let mut buf = BytesMut::with_capacity(len + 5); - buf.put(0u8); - buf.put_u32::(len as u32); - update.encode(&mut buf).unwrap(); - let body = GrpcBody::new(buf.freeze()); - let rsp = rsp.body(body).unwrap(); - Ok(rsp) - })) - } - unknown => { - println!("unknown route: {:?}", unknown); - let body = GrpcBody::unimplemented(); - let rsp = rsp.body(body).unwrap(); - Box::new(future::ok(rsp)) + fn get(&mut self, req: grpc::Request) -> Self::GetFuture { + if let Ok(mut calls) = self.expect_dst_calls.lock() { + if let Some((dst, updates)) = calls.pop_front() { + if &dst == req.get_ref() { + return future::ok(grpc::Response::new(updates)); + } + + calls.push_front((dst, updates)); } } - } -} -impl Service for Svc { - type Request = Request; - type Response = Response; - type Error = h2::Error; - type Future = Box>; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - let (head, body) = req.into_parts(); - self.route(head.uri.path(), RecvBodyStream(body)) - } -} - -struct GrpcBody { - message: Bytes, - status: &'static str, -} - -impl GrpcBody { - fn new(body: Bytes) -> Self { - GrpcBody { - message: body, - status: "0", - } - } - - fn unimplemented() -> Self { - GrpcBody { - message: Bytes::new(), - status: "12", - } - } -} - - -impl Body for GrpcBody { - type Data = Bytes; - - fn poll_data(&mut self) -> Poll, self::h2::Error> { - let data = self.message.split_off(0); - let data = if data.is_empty() { None } else { Some(data) }; - - Ok(Async::Ready(data)) - } - - fn poll_trailers(&mut self) -> Poll, self::h2::Error> { - let mut map = HeaderMap::new(); - map.insert("grpc-status", HeaderValue::from_static(self.status)); - Ok(Async::Ready(Some(map))) - } -} - -#[derive(Debug)] -struct NewSvc { - destinations: Destinations, -} -impl NewService for NewSvc { - type Request = Request; - type Response = Response; - type Error = h2::Error; - type InitError = ::std::io::Error; - type Service = Svc; - type Future = future::FutureResult; - - fn new_service(&self) -> Self::Future { - future::ok(Svc { - destinations: self.destinations.clone(), - }) + future::err(grpc::Error::Grpc(grpc::Status::INTERNAL)) } } @@ -219,10 +115,8 @@ fn run(controller: Controller) -> Listening { let mut core = Core::new().unwrap(); let reactor = core.handle(); - let factory = NewSvc { - destinations: Arc::new(Mutex::new(controller.destinations)), - }; - let h2 = tower_h2::Server::new(factory, Default::default(), reactor.clone()); + let new = pb::server::DestinationServer::new(controller); + let h2 = tower_h2::Server::new(new, Default::default(), reactor.clone()); let addr = ([127, 0, 0, 1], 0).into(); let bind = TcpListener::bind(&addr, &reactor).expect("bind"); @@ -259,17 +153,22 @@ fn run(controller: Controller) -> Listening { } } -pub fn destination_update(addr: SocketAddr, - set_labels: HashMap, - addr_labels: HashMap) - -> pb::destination::Update +pub fn destination_add(addr: SocketAddr) -> pb::Update { + destination_add_labeled(addr, HashMap::new(), HashMap::new()) +} + +pub fn destination_add_labeled( + addr: SocketAddr, + set_labels: HashMap, + addr_labels: HashMap) + -> pb::Update { - pb::destination::Update { - update: Some(pb::destination::update::Update::Add( - pb::destination::WeightedAddrSet { + pb::Update { + update: Some(pb::update::Update::Add( + pb::WeightedAddrSet { addrs: vec![ - pb::destination::WeightedAddr { - addr: Some(pb::common::TcpAddress { + pb::WeightedAddr { + addr: Some(common::TcpAddress { ip: Some(ip_conv(addr.ip())), port: u32::from(addr.port()), }), @@ -283,10 +182,10 @@ pub fn destination_update(addr: SocketAddr, } } -pub fn destination_add_none() -> pb::destination::Update { - pb::destination::Update { - update: Some(pb::destination::update::Update::Add( - pb::destination::WeightedAddrSet { +pub fn destination_add_none() -> pb::Update { + pb::Update { + update: Some(pb::update::Update::Add( + pb::WeightedAddrSet { addrs: Vec::new(), ..Default::default() }, @@ -294,10 +193,10 @@ pub fn destination_add_none() -> pb::destination::Update { } } -pub fn destination_remove_none() -> pb::destination::Update { - pb::destination::Update { - update: Some(pb::destination::update::Update::Remove( - pb::destination::AddrSet { +pub fn destination_remove_none() -> pb::Update { + pb::Update { + update: Some(pb::update::Update::Remove( + pb::AddrSet { addrs: Vec::new(), ..Default::default() }, @@ -305,23 +204,23 @@ pub fn destination_remove_none() -> pb::destination::Update { } } -pub fn destination_exists_with_no_endpoints() -> pb::destination::Update { - pb::destination::Update { - update: Some(pb::destination::update::Update::NoEndpoints ( - pb::destination::NoEndpoints { exists: true } +pub fn destination_exists_with_no_endpoints() -> pb::Update { + pb::Update { + update: Some(pb::update::Update::NoEndpoints( + pb::NoEndpoints { exists: true } )), } } -fn ip_conv(ip: IpAddr) -> pb::common::IpAddress { +fn ip_conv(ip: IpAddr) -> common::IpAddress { match ip { - IpAddr::V4(v4) => pb::common::IpAddress { - ip: Some(pb::common::ip_address::Ip::Ipv4(v4.into())), + IpAddr::V4(v4) => common::IpAddress { + ip: Some(common::ip_address::Ip::Ipv4(v4.into())), }, IpAddr::V6(v6) => { let (first, last) = octets_to_u64s(v6.octets()); - pb::common::IpAddress { - ip: Some(pb::common::ip_address::Ip::Ipv6(pb::common::IPv6 { + common::IpAddress { + ip: Some(common::ip_address::Ip::Ipv6(common::IPv6 { first, last, })), diff --git a/proxy/tests/support/mod.rs b/proxy/tests/support/mod.rs index 48e967cba..6bdc5f9e9 100644 --- a/proxy/tests/support/mod.rs +++ b/proxy/tests/support/mod.rs @@ -19,24 +19,25 @@ extern crate tokio_core; pub extern crate tokio_io; extern crate tower; extern crate tower_h2; +extern crate tower_grpc; extern crate log; pub extern crate env_logger; -use std::net::SocketAddr; +pub use std::collections::HashMap; +pub use std::net::SocketAddr; pub use std::time::Duration; -use self::bytes::{BigEndian, BytesMut}; pub use self::bytes::Bytes; pub use self::conduit_proxy::*; pub use self::futures::*; use self::futures::sync::oneshot; pub use self::http::{HeaderMap, Request, Response, StatusCode}; -use self::http::header::HeaderValue; use self::tokio_connect::Connect; use self::tokio_core::net::{TcpListener, TcpStream}; use self::tokio_core::reactor::{Core, Handle}; use self::tower::{NewService, Service}; use self::tower_h2::{Body, RecvBody}; +use self::tower_grpc as grpc; /// Environment variable for overriding the test patience. pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS"; diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs index 42d9f075d..bf15956e6 100644 --- a/proxy/tests/telemetry.rs +++ b/proxy/tests/telemetry.rs @@ -50,7 +50,7 @@ impl Fixture { fn outbound_with_server(srv: server::Listening) -> Self { let ctrl = controller::new() - .destination("tele.test.svc.cluster.local", srv.addr) + .destination_and_close("tele.test.svc.cluster.local", srv.addr) .run(); let proxy = proxy::new() .controller(ctrl) @@ -400,37 +400,18 @@ fn metrics_endpoint_outbound_request_duration() { mod outbound_dst_labels { use super::support::*; use super::Fixture; + use controller::DstSender; - use std::collections::HashMap; - use std::iter::FromIterator; - - fn fixture(addr_labels: A, set_labels: B) -> Fixture - where - A: IntoIterator, - B: IntoIterator, - { - fixture_with_updates(vec![(addr_labels, set_labels)]) - } - - fn fixture_with_updates(updates: Vec<(A, B)>) -> Fixture - where - A: IntoIterator, - B: IntoIterator, - { + fn fixture(dest: &str) -> (Fixture, SocketAddr, DstSender) { info!("running test server"); let srv = server::new() .route("/", "hello") .run(); - let mut ctrl = controller::new(); - for (addr_labels, set_labels) in updates { - ctrl = ctrl.labeled_destination( - "labeled.test.svc.cluster.local", - srv.addr, - HashMap::from_iter(addr_labels), - HashMap::from_iter(set_labels), - ); - } + let addr = srv.addr; + + let ctrl = controller::new(); + let dst_tx = ctrl.destination_tx(dest); let proxy = proxy::new() .controller(ctrl.run()) @@ -440,22 +421,26 @@ mod outbound_dst_labels { let client = client::new( proxy.outbound, - "labeled.test.svc.cluster.local" + dest, ); - Fixture { client, metrics, proxy } + let f = Fixture { client, metrics, proxy }; + + (f, addr, dst_tx) } #[test] fn multiple_addr_labels() { let _ = env_logger::try_init(); - let Fixture { client, metrics, proxy: _proxy } = fixture ( - vec![ - (String::from("addr_label2"), String::from("bar")), - (String::from("addr_label1"), String::from("foo")), - ], - Vec::new(), - ); + let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = + fixture("labeled.test.svc.cluster.local"); + + { + let mut labels = HashMap::new(); + labels.insert("addr_label1".to_owned(), "foo".to_owned()); + labels.insert("addr_label2".to_owned(), "bar".to_owned()); + dst_tx.send_labeled(addr, labels, HashMap::new()); + } info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); @@ -471,13 +456,16 @@ mod outbound_dst_labels { #[test] fn multiple_addrset_labels() { let _ = env_logger::try_init(); - let Fixture { client, metrics, proxy: _proxy } = fixture ( - Vec::new(), - vec![ - (String::from("set_label1"), String::from("foo")), - (String::from("set_label2"), String::from("bar")), - ] - ); + let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = + fixture("labeled.test.svc.cluster.local"); + + { + let mut labels = HashMap::new(); + labels.insert("set_label1".to_owned(), "foo".to_owned()); + labels.insert("set_label2".to_owned(), "bar".to_owned()); + dst_tx.send_labeled(addr, HashMap::new(), labels); + } + info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); @@ -493,10 +481,16 @@ mod outbound_dst_labels { #[test] fn labeled_addr_and_addrset() { let _ = env_logger::try_init(); - let Fixture { client, metrics, proxy: _proxy } = fixture( - vec![(String::from("addr_label"), String::from("foo"))], - vec![(String::from("set_label"), String::from("bar"))], - ); + let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = + fixture("labeled.test.svc.cluster.local"); + + { + let mut alabels = HashMap::new(); + alabels.insert("addr_label".to_owned(), "foo".to_owned()); + let mut slabels = HashMap::new(); + slabels.insert("set_label".to_owned(), "bar".to_owned()); + dst_tx.send_labeled(addr, alabels, slabels); + } info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); @@ -519,19 +513,17 @@ mod outbound_dst_labels { fn controller_updates_addr_labels() { let _ = env_logger::try_init(); info!("running test server"); - let Fixture { client, metrics, proxy: _proxy } = - // the controller will update the value of `addr_label`. the value - // of `set_label` will remain unchanged throughout the test. - fixture_with_updates(vec![ - ( - vec![(String::from("addr_label"), String::from("foo"))], - vec![(String::from("set_label"), String::from("unchanged"))] - ), - ( - vec![(String::from("addr_label"), String::from("bar"))], - vec![(String::from("set_label"), String::from("unchanged"))] - ), - ]); + + let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = + fixture("labeled.test.svc.cluster.local"); + + { + let mut alabels = HashMap::new(); + alabels.insert("addr_label".to_owned(), "foo".to_owned()); + let mut slabels = HashMap::new(); + slabels.insert("set_label".to_owned(), "unchanged".to_owned()); + dst_tx.send_labeled(addr, alabels, slabels); + } info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); @@ -545,6 +537,14 @@ mod outbound_dst_labels { assert_contains!(metrics.get("/metrics"), "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); + { + let mut alabels = HashMap::new(); + alabels.insert("addr_label".to_owned(), "bar".to_owned()); + let mut slabels = HashMap::new(); + slabels.insert("set_label".to_owned(), "unchanged".to_owned()); + dst_tx.send_labeled(addr, alabels, slabels); + } + info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); // the second request should increment stats labeled with `dst_addr_label="bar"` @@ -576,11 +576,15 @@ mod outbound_dst_labels { fn controller_updates_set_labels() { let _ = env_logger::try_init(); info!("running test server"); - let Fixture { client, metrics, proxy: _proxy } = - fixture_with_updates(vec![ - (vec![], vec![(String::from("set_label"), String::from("foo"))]), - (vec![], vec![(String::from("set_label"), String::from("bar"))]), - ]); + let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = + fixture("labeled.test.svc.cluster.local"); + + { + let alabels = HashMap::new(); + let mut slabels = HashMap::new(); + slabels.insert("set_label".to_owned(), "foo".to_owned()); + dst_tx.send_labeled(addr, alabels, slabels); + } info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); @@ -594,6 +598,13 @@ mod outbound_dst_labels { assert_contains!(metrics.get("/metrics"), "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1"); + { + let alabels = HashMap::new(); + let mut slabels = HashMap::new(); + slabels.insert("set_label".to_owned(), "bar".to_owned()); + dst_tx.send_labeled(addr, alabels, slabels); + } + info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); // the second request should increment stats labeled with `dst_addr_label="bar"` @@ -627,7 +638,7 @@ fn metrics_have_no_double_commas() { let outbound_srv = server::new().route("/hey", "hello").run(); let ctrl = controller::new() - .destination("tele.test.svc.cluster.local", outbound_srv.addr) + .destination_and_close("tele.test.svc.cluster.local", outbound_srv.addr) .run(); let proxy = proxy::new() .controller(ctrl) diff --git a/proxy/tests/transparency.rs b/proxy/tests/transparency.rs index 461a95adc..4aa4ee603 100644 --- a/proxy/tests/transparency.rs +++ b/proxy/tests/transparency.rs @@ -8,7 +8,7 @@ fn outbound_http1() { let srv = server::http1().route("/", "hello h1").run(); let ctrl = controller::new() - .destination("transparency.test.svc.cluster.local", srv.addr) + .destination_and_close("transparency.test.svc.cluster.local", srv.addr) .run(); let proxy = proxy::new().controller(ctrl).outbound(srv).run(); let client = client::http1(proxy.outbound, "transparency.test.svc.cluster.local");