proxy: Rewrite mock controller to accept a stream of dst updates (#808)

Currently, the mock controller, which is used in tests, takes all of its
updates a priori, which makes it hard to control when an update occurs within a
test.

Now, the controller exposes a `DstSender`, which wraps an unbounded channel of
destination updates. This allows tests to trigger updates at a specific point
in the test.

In order to accomplish this, the controller's hand-rolled gRPC server
implementation has been discarded in favor of a real gRPC destination service.
This requires that the `controller-grpc` project now builds both clients
and servers for the destination service. Additionally, we now build a tap
client as well (assuming that we'll want to write tests against our tap
server).
This commit is contained in:
Oliver Gould 2018-04-19 11:01:10 -07:00 committed by GitHub
parent 926c4cf323
commit 491fae7cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 208 additions and 311 deletions

View File

@ -5,27 +5,21 @@ fn main() {
} }
fn build_control() { fn build_control() {
let client_files = &[ let iface_files = &[
"../../proto/common/common.proto", "../../proto/common/common.proto",
"../../proto/proxy/destination/destination.proto", "../../proto/proxy/destination/destination.proto",
"../../proto/proxy/tap/tap.proto"
]; ];
let server_files = &["../../proto/proxy/tap/tap.proto"];
let dirs = &["../../proto"]; let dirs = &["../../proto"];
tower_grpc_build::Config::new() tower_grpc_build::Config::new()
.enable_client(true) .enable_client(true)
.enable_server(false)
.build(client_files, dirs)
.unwrap();
tower_grpc_build::Config::new()
.enable_client(false)
.enable_server(true) .enable_server(true)
.build(server_files, dirs) .build(iface_files, dirs)
.unwrap(); .unwrap();
// recompile protobufs only if any of the proto files changes. // recompile protobufs only if any of the proto files changes.
for file in client_files.iter().chain(server_files) { for file in iface_files {
println!("cargo:rerun-if-changed={}", file); println!("cargo:rerun-if-changed={}", file);
} }
} }

View File

@ -10,10 +10,11 @@ macro_rules! generate_tests {
fn outbound_asks_controller_api() { fn outbound_asks_controller_api() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let srv = $make_server().route("/", "hello").route("/bye", "bye").run(); let srv = $make_server().route("/", "hello").route("/bye", "bye").run();
let ctrl = controller::new() let ctrl = controller::new()
.destination("disco.test.svc.cluster.local", srv.addr) .destination_and_close("disco.test.svc.cluster.local", srv.addr);
.run();
let proxy = proxy::new().controller(ctrl).outbound(srv).run(); let proxy = proxy::new().controller(ctrl.run()).outbound(srv).run();
let client = $make_client(proxy.outbound, "disco.test.svc.cluster.local"); let client = $make_client(proxy.outbound, "disco.test.svc.cluster.local");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
@ -25,11 +26,12 @@ macro_rules! generate_tests {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let srv = $make_server().route("/recon", "nect").run(); let srv = $make_server().route("/recon", "nect").run();
let ctrl = controller::new() let ctrl = controller::new()
.destination_close("disco.test.svc.cluster.local") .destination_close("disco.test.svc.cluster.local")
.destination("disco.test.svc.cluster.local", srv.addr) .destination_and_close("disco.test.svc.cluster.local", srv.addr);
.run();
let proxy = proxy::new().controller(ctrl).outbound(srv).run(); let proxy = proxy::new().controller(ctrl.run()).outbound(srv).run();
let client = $make_client(proxy.outbound, "disco.test.svc.cluster.local"); let client = $make_client(proxy.outbound, "disco.test.svc.cluster.local");
assert_eq!(client.get("/recon"), "nect"); assert_eq!(client.get("/recon"), "nect");
@ -70,14 +72,15 @@ macro_rules! generate_tests {
env.put(config::ENV_BIND_TIMEOUT, "100".to_owned()); env.put(config::ENV_BIND_TIMEOUT, "100".to_owned());
let srv = $make_server().route("/", "hello").run(); let srv = $make_server().route("/", "hello").run();
let ctrl = controller::new() let ctrl = controller::new();
.destination("initially-exists.ns.svc.cluster.local", srv.addr)
.destination_close("trigger-close.ns.svc.cluster.local") let dst_tx0 = ctrl.destination_tx("initially-exists.ns.svc.cluster.local");
.destination_fn("initially-exists.ns.svc.cluster.local", f) dst_tx0.send_addr(srv.addr);
.run();
let dst_tx1 = ctrl.destination_tx("initially-exists.ns.svc.cluster.local");
let proxy = proxy::new() let proxy = proxy::new()
.controller(ctrl) .controller(ctrl.run())
.outbound(srv) .outbound(srv)
.run_with_test_env(env); .run_with_test_env(env);
@ -85,15 +88,10 @@ macro_rules! generate_tests {
$make_client(proxy.outbound, "initially-exists.ns.svc.cluster.local"); $make_client(proxy.outbound, "initially-exists.ns.svc.cluster.local");
assert_eq!(initially_exists.get("/"), "hello"); assert_eq!(initially_exists.get("/"), "hello");
// Try to access a different server which will trigger the `destination_close()` drop(dst_tx0); // trigger reconnect
// above. match f() {
{ None => drop(dst_tx1),
let trigger_close = Some(up) => dst_tx1.send(up),
$make_client(proxy.outbound, "trigger-close.ns.svc.cluster.local");
let mut req = trigger_close.request_builder("/");
let rsp = trigger_close.request(req.method("GET"));
// the request should time out
assert_eq!(rsp.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
} }
// Wait for the reconnect to happen. TODO: Replace this flaky logic. // Wait for the reconnect to happen. TODO: Replace this flaky logic.
@ -109,9 +107,6 @@ macro_rules! generate_tests {
#[test] #[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)] #[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_times_out() { fn outbound_times_out() {
use std::collections::HashMap;
use std::thread;
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let mut env = config::TestEnv::new(); let mut env = config::TestEnv::new();
@ -119,22 +114,13 @@ macro_rules! generate_tests {
env.put(config::ENV_BIND_TIMEOUT, "100".to_owned()); env.put(config::ENV_BIND_TIMEOUT, "100".to_owned());
let srv = $make_server().route("/hi", "hello").run(); let srv = $make_server().route("/hi", "hello").run();
let addr = srv.addr.clone(); let ctrl = controller::new();
let ctrl = controller::new()
// when the proxy requests the destination, sleep for 500 ms, and then // when the proxy requests the destination, don't respond.
// return the correct destination let _dst_tx = ctrl.destination_tx("disco.test.svc.cluster.local");
.destination_fn("disco.test.svc.cluster.local", move || {
thread::sleep(Duration::from_millis(500));
Some(controller::destination_update(
addr,
HashMap::new(),
HashMap::new(),
))
})
.run();
let proxy = proxy::new() let proxy = proxy::new()
.controller(ctrl) .controller(ctrl.run())
.outbound(srv) .outbound(srv)
.run_with_test_env(env); .run_with_test_env(env);
@ -153,11 +139,12 @@ macro_rules! generate_tests {
.route("/", "hello") .route("/", "hello")
.route("/bye", "bye") .route("/bye", "bye")
.run(); .run();
let ctrl = controller::new() let ctrl = controller::new()
.destination("disco.test.svc.cluster.local", srv.addr) .destination_and_close("disco.test.svc.cluster.local", srv.addr);
.run();
let proxy = proxy::new() let proxy = proxy::new()
.controller(ctrl) .controller(ctrl.run())
// don't set srv as outbound(), so that SO_ORIGINAL_DST isn't // don't set srv as outbound(), so that SO_ORIGINAL_DST isn't
// used as a backup // used as a backup
.run(); .run();
@ -201,10 +188,15 @@ fn outbound_updates_newer_services() {
//TODO: when the support server can listen on both http1 and http2 //TODO: when the support server can listen on both http1 and http2
//at the same time, do that here //at the same time, do that here
let srv = server::http1().route("/h1", "hello h1").run(); let srv = server::http1().route("/h1", "hello h1").run();
let ctrl = controller::new() let ctrl = controller::new()
.destination("disco.test.svc.cluster.local", srv.addr) .destination_and_close("disco.test.svc.cluster.local", srv.addr);
let proxy = proxy::new()
.controller(ctrl.run())
.outbound(srv)
.run(); .run();
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
// the HTTP2 service starts watching first, receiving an addr // the HTTP2 service starts watching first, receiving an addr
// from the controller // from the controller
let client1 = client::http2(proxy.outbound, "disco.test.svc.cluster.local"); let client1 = client::http2(proxy.outbound, "disco.test.svc.cluster.local");

View File

@ -3,23 +3,27 @@
use support::*; use support::*;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::net::IpAddr; use std::net::IpAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use conduit_proxy_controller_grpc as pb; use conduit_proxy_controller_grpc::common::{self, Destination};
use self::bytes::BufMut; use conduit_proxy_controller_grpc::destination as pb;
use self::prost::Message;
pub fn new() -> Controller { pub fn new() -> Controller {
Controller::new() Controller::new()
} }
struct Destination(Box<Fn() -> Option<pb::destination::Update> + Send>); pub type Labels = HashMap<String, String>;
#[derive(Debug)] #[derive(Debug)]
pub struct DstReceiver(sync::mpsc::UnboundedReceiver<pb::Update>);
#[derive(Clone, Debug)]
pub struct DstSender(sync::mpsc::UnboundedSender<pb::Update>);
#[derive(Clone, Debug, Default)]
pub struct Controller { pub struct Controller {
destinations: VecDeque<(String, Destination)>, expect_dst_calls: Arc<Mutex<VecDeque<(Destination, DstReceiver)>>>,
} }
pub struct Listening { pub struct Listening {
@ -29,43 +33,30 @@ pub struct Listening {
impl Controller { impl Controller {
pub fn new() -> Self { pub fn new() -> Self {
Controller { Self::default()
destinations: VecDeque::new(),
}
} }
pub fn destination(self, dest: &str, addr: SocketAddr) -> Self { pub fn destination_tx(&self, dest: &str) -> DstSender {
self.destination_fn(dest, move || Some(destination_update( let (tx, rx) = sync::mpsc::unbounded();
addr, let dst = common::Destination {
HashMap::new(), scheme: "k8s".into(),
HashMap::new(), path: dest.into(),
))) };
self.expect_dst_calls
.lock()
.unwrap()
.push_back((dst, DstReceiver(rx)));
DstSender(tx)
} }
pub fn labeled_destination(self, dest: &str, addr: SocketAddr, pub fn destination_and_close(self, dest: &str, addr: SocketAddr) -> Self {
addr_labels: HashMap<String, String>, self.destination_tx(dest).send_addr(addr);
set_labels:HashMap<String, String>)
-> Self
{
self.destination_fn(dest, move || Some(destination_update(
addr,
addr_labels.clone(),
set_labels.clone(),
)))
}
pub fn destination_fn<F>(mut self, dest: &str, f: F) -> Self
where
F: Fn() -> Option<pb::destination::Update> + Send + 'static,
{
self.destinations
.push_back((dest.into(), Destination(Box::new(f))));
self self
} }
pub fn destination_close(self, dest: &str) -> Self { pub fn destination_close(self, dest: &str) -> Self {
self.destination_fn(dest, || None) drop(self.destination_tx(dest));
self
} }
pub fn run(self) -> Listening { pub fn run(self) -> Listening {
@ -73,139 +64,44 @@ impl Controller {
} }
} }
type Response = self::http::Response<GrpcBody>; impl Stream for DstReceiver {
type Destinations = Arc<Mutex<VecDeque<(String, Destination)>>>; type Item = pb::Update;
type Error = grpc::Error;
const DESTINATION_GET: &str = "/conduit.proxy.destination.Destination/Get"; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll().map_err(|_| grpc::Error::Grpc(grpc::Status::INTERNAL))
impl fmt::Debug for Destination {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Destination")
} }
} }
#[derive(Debug)] impl DstSender {
struct Svc { pub fn send(&self, up: pb::Update) {
destinations: Destinations, self.0.unbounded_send(up).expect("send dst update")
} }
impl Svc { pub fn send_addr(&self, addr: SocketAddr) {
fn route( self.send(destination_add(addr))
&self, }
path: &str,
body: RecvBodyStream,
) -> Box<Future<Item = Response, Error = h2::Error>> {
let mut rsp = http::Response::builder();
rsp.version(http::Version::HTTP_2);
match path { pub fn send_labeled(&self, addr: SocketAddr, addr_labels: Labels, parent_labels: Labels) {
DESTINATION_GET => { self.send(destination_add_labeled(addr, addr_labels, parent_labels));
let destinations = self.destinations.clone();
Box::new(body.concat2().and_then(move |_bytes| {
let update = {
let next = {
let mut queue = destinations.lock().unwrap();
queue.pop_front()
};
// The test cases's entry may evaluate to `None` when it wants to close
// the connection. If there is no entry then that's equivalent to an
// implicit `destination_close()`.
//
// TODO: decode `_bytes` and compare with `_name`
next.and_then(|(_name, Destination(f))| f())
.unwrap_or_default()
};
let len = update.encoded_len();
let mut buf = BytesMut::with_capacity(len + 5);
buf.put(0u8);
buf.put_u32::<BigEndian>(len as u32);
update.encode(&mut buf).unwrap();
let body = GrpcBody::new(buf.freeze());
let rsp = rsp.body(body).unwrap();
Ok(rsp)
}))
}
unknown => {
println!("unknown route: {:?}", unknown);
let body = GrpcBody::unimplemented();
let rsp = rsp.body(body).unwrap();
Box::new(future::ok(rsp))
}
}
} }
} }
impl Service for Svc { impl pb::server::Destination for Controller {
type Request = Request<RecvBody>; type GetStream = DstReceiver;
type Response = Response; type GetFuture = future::FutureResult<grpc::Response<Self::GetStream>, grpc::Error>;
type Error = h2::Error;
type Future = Box<Future<Item = Response, Error = h2::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn get(&mut self, req: grpc::Request<Destination>) -> Self::GetFuture {
Ok(Async::Ready(())) if let Ok(mut calls) = self.expect_dst_calls.lock() {
if let Some((dst, updates)) = calls.pop_front() {
if &dst == req.get_ref() {
return future::ok(grpc::Response::new(updates));
} }
fn call(&mut self, req: Request<RecvBody>) -> Self::Future { calls.push_front((dst, updates));
let (head, body) = req.into_parts();
self.route(head.uri.path(), RecvBodyStream(body))
}
}
struct GrpcBody {
message: Bytes,
status: &'static str,
}
impl GrpcBody {
fn new(body: Bytes) -> Self {
GrpcBody {
message: body,
status: "0",
} }
} }
fn unimplemented() -> Self { future::err(grpc::Error::Grpc(grpc::Status::INTERNAL))
GrpcBody {
message: Bytes::new(),
status: "12",
}
}
}
impl Body for GrpcBody {
type Data = Bytes;
fn poll_data(&mut self) -> Poll<Option<Bytes>, self::h2::Error> {
let data = self.message.split_off(0);
let data = if data.is_empty() { None } else { Some(data) };
Ok(Async::Ready(data))
}
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, self::h2::Error> {
let mut map = HeaderMap::new();
map.insert("grpc-status", HeaderValue::from_static(self.status));
Ok(Async::Ready(Some(map)))
}
}
#[derive(Debug)]
struct NewSvc {
destinations: Destinations,
}
impl NewService for NewSvc {
type Request = Request<RecvBody>;
type Response = Response;
type Error = h2::Error;
type InitError = ::std::io::Error;
type Service = Svc;
type Future = future::FutureResult<Svc, Self::InitError>;
fn new_service(&self) -> Self::Future {
future::ok(Svc {
destinations: self.destinations.clone(),
})
} }
} }
@ -219,10 +115,8 @@ fn run(controller: Controller) -> Listening {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let reactor = core.handle(); let reactor = core.handle();
let factory = NewSvc { let new = pb::server::DestinationServer::new(controller);
destinations: Arc::new(Mutex::new(controller.destinations)), let h2 = tower_h2::Server::new(new, Default::default(), reactor.clone());
};
let h2 = tower_h2::Server::new(factory, Default::default(), reactor.clone());
let addr = ([127, 0, 0, 1], 0).into(); let addr = ([127, 0, 0, 1], 0).into();
let bind = TcpListener::bind(&addr, &reactor).expect("bind"); let bind = TcpListener::bind(&addr, &reactor).expect("bind");
@ -259,17 +153,22 @@ fn run(controller: Controller) -> Listening {
} }
} }
pub fn destination_update(addr: SocketAddr, pub fn destination_add(addr: SocketAddr) -> pb::Update {
destination_add_labeled(addr, HashMap::new(), HashMap::new())
}
pub fn destination_add_labeled(
addr: SocketAddr,
set_labels: HashMap<String, String>, set_labels: HashMap<String, String>,
addr_labels: HashMap<String, String>) addr_labels: HashMap<String, String>)
-> pb::destination::Update -> pb::Update
{ {
pb::destination::Update { pb::Update {
update: Some(pb::destination::update::Update::Add( update: Some(pb::update::Update::Add(
pb::destination::WeightedAddrSet { pb::WeightedAddrSet {
addrs: vec![ addrs: vec![
pb::destination::WeightedAddr { pb::WeightedAddr {
addr: Some(pb::common::TcpAddress { addr: Some(common::TcpAddress {
ip: Some(ip_conv(addr.ip())), ip: Some(ip_conv(addr.ip())),
port: u32::from(addr.port()), port: u32::from(addr.port()),
}), }),
@ -283,10 +182,10 @@ pub fn destination_update(addr: SocketAddr,
} }
} }
pub fn destination_add_none() -> pb::destination::Update { pub fn destination_add_none() -> pb::Update {
pb::destination::Update { pb::Update {
update: Some(pb::destination::update::Update::Add( update: Some(pb::update::Update::Add(
pb::destination::WeightedAddrSet { pb::WeightedAddrSet {
addrs: Vec::new(), addrs: Vec::new(),
..Default::default() ..Default::default()
}, },
@ -294,10 +193,10 @@ pub fn destination_add_none() -> pb::destination::Update {
} }
} }
pub fn destination_remove_none() -> pb::destination::Update { pub fn destination_remove_none() -> pb::Update {
pb::destination::Update { pb::Update {
update: Some(pb::destination::update::Update::Remove( update: Some(pb::update::Update::Remove(
pb::destination::AddrSet { pb::AddrSet {
addrs: Vec::new(), addrs: Vec::new(),
..Default::default() ..Default::default()
}, },
@ -305,23 +204,23 @@ pub fn destination_remove_none() -> pb::destination::Update {
} }
} }
pub fn destination_exists_with_no_endpoints() -> pb::destination::Update { pub fn destination_exists_with_no_endpoints() -> pb::Update {
pb::destination::Update { pb::Update {
update: Some(pb::destination::update::Update::NoEndpoints ( update: Some(pb::update::Update::NoEndpoints(
pb::destination::NoEndpoints { exists: true } pb::NoEndpoints { exists: true }
)), )),
} }
} }
fn ip_conv(ip: IpAddr) -> pb::common::IpAddress { fn ip_conv(ip: IpAddr) -> common::IpAddress {
match ip { match ip {
IpAddr::V4(v4) => pb::common::IpAddress { IpAddr::V4(v4) => common::IpAddress {
ip: Some(pb::common::ip_address::Ip::Ipv4(v4.into())), ip: Some(common::ip_address::Ip::Ipv4(v4.into())),
}, },
IpAddr::V6(v6) => { IpAddr::V6(v6) => {
let (first, last) = octets_to_u64s(v6.octets()); let (first, last) = octets_to_u64s(v6.octets());
pb::common::IpAddress { common::IpAddress {
ip: Some(pb::common::ip_address::Ip::Ipv6(pb::common::IPv6 { ip: Some(common::ip_address::Ip::Ipv6(common::IPv6 {
first, first,
last, last,
})), })),

View File

@ -19,24 +19,25 @@ extern crate tokio_core;
pub extern crate tokio_io; pub extern crate tokio_io;
extern crate tower; extern crate tower;
extern crate tower_h2; extern crate tower_h2;
extern crate tower_grpc;
extern crate log; extern crate log;
pub extern crate env_logger; pub extern crate env_logger;
use std::net::SocketAddr; pub use std::collections::HashMap;
pub use std::net::SocketAddr;
pub use std::time::Duration; pub use std::time::Duration;
use self::bytes::{BigEndian, BytesMut};
pub use self::bytes::Bytes; pub use self::bytes::Bytes;
pub use self::conduit_proxy::*; pub use self::conduit_proxy::*;
pub use self::futures::*; pub use self::futures::*;
use self::futures::sync::oneshot; use self::futures::sync::oneshot;
pub use self::http::{HeaderMap, Request, Response, StatusCode}; pub use self::http::{HeaderMap, Request, Response, StatusCode};
use self::http::header::HeaderValue;
use self::tokio_connect::Connect; use self::tokio_connect::Connect;
use self::tokio_core::net::{TcpListener, TcpStream}; use self::tokio_core::net::{TcpListener, TcpStream};
use self::tokio_core::reactor::{Core, Handle}; use self::tokio_core::reactor::{Core, Handle};
use self::tower::{NewService, Service}; use self::tower::{NewService, Service};
use self::tower_h2::{Body, RecvBody}; use self::tower_h2::{Body, RecvBody};
use self::tower_grpc as grpc;
/// Environment variable for overriding the test patience. /// Environment variable for overriding the test patience.
pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS"; pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS";

View File

@ -50,7 +50,7 @@ impl Fixture {
fn outbound_with_server(srv: server::Listening) -> Self { fn outbound_with_server(srv: server::Listening) -> Self {
let ctrl = controller::new() let ctrl = controller::new()
.destination("tele.test.svc.cluster.local", srv.addr) .destination_and_close("tele.test.svc.cluster.local", srv.addr)
.run(); .run();
let proxy = proxy::new() let proxy = proxy::new()
.controller(ctrl) .controller(ctrl)
@ -400,37 +400,18 @@ fn metrics_endpoint_outbound_request_duration() {
mod outbound_dst_labels { mod outbound_dst_labels {
use super::support::*; use super::support::*;
use super::Fixture; use super::Fixture;
use controller::DstSender;
use std::collections::HashMap; fn fixture(dest: &str) -> (Fixture, SocketAddr, DstSender) {
use std::iter::FromIterator;
fn fixture<A, B>(addr_labels: A, set_labels: B) -> Fixture
where
A: IntoIterator<Item=(String, String)>,
B: IntoIterator<Item=(String, String)>,
{
fixture_with_updates(vec![(addr_labels, set_labels)])
}
fn fixture_with_updates<A, B>(updates: Vec<(A, B)>) -> Fixture
where
A: IntoIterator<Item=(String, String)>,
B: IntoIterator<Item=(String, String)>,
{
info!("running test server"); info!("running test server");
let srv = server::new() let srv = server::new()
.route("/", "hello") .route("/", "hello")
.run(); .run();
let mut ctrl = controller::new(); let addr = srv.addr;
for (addr_labels, set_labels) in updates {
ctrl = ctrl.labeled_destination( let ctrl = controller::new();
"labeled.test.svc.cluster.local", let dst_tx = ctrl.destination_tx(dest);
srv.addr,
HashMap::from_iter(addr_labels),
HashMap::from_iter(set_labels),
);
}
let proxy = proxy::new() let proxy = proxy::new()
.controller(ctrl.run()) .controller(ctrl.run())
@ -440,22 +421,26 @@ mod outbound_dst_labels {
let client = client::new( let client = client::new(
proxy.outbound, proxy.outbound,
"labeled.test.svc.cluster.local" dest,
); );
Fixture { client, metrics, proxy }
let f = Fixture { client, metrics, proxy };
(f, addr, dst_tx)
} }
#[test] #[test]
fn multiple_addr_labels() { fn multiple_addr_labels() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let Fixture { client, metrics, proxy: _proxy } = fixture ( let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) =
vec![ fixture("labeled.test.svc.cluster.local");
(String::from("addr_label2"), String::from("bar")),
(String::from("addr_label1"), String::from("foo")), {
], let mut labels = HashMap::new();
Vec::new(), labels.insert("addr_label1".to_owned(), "foo".to_owned());
); labels.insert("addr_label2".to_owned(), "bar".to_owned());
dst_tx.send_labeled(addr, labels, HashMap::new());
}
info!("client.get(/)"); info!("client.get(/)");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
@ -471,13 +456,16 @@ mod outbound_dst_labels {
#[test] #[test]
fn multiple_addrset_labels() { fn multiple_addrset_labels() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let Fixture { client, metrics, proxy: _proxy } = fixture ( let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) =
Vec::new(), fixture("labeled.test.svc.cluster.local");
vec![
(String::from("set_label1"), String::from("foo")), {
(String::from("set_label2"), String::from("bar")), let mut labels = HashMap::new();
] labels.insert("set_label1".to_owned(), "foo".to_owned());
); labels.insert("set_label2".to_owned(), "bar".to_owned());
dst_tx.send_labeled(addr, HashMap::new(), labels);
}
info!("client.get(/)"); info!("client.get(/)");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
@ -493,10 +481,16 @@ mod outbound_dst_labels {
#[test] #[test]
fn labeled_addr_and_addrset() { fn labeled_addr_and_addrset() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let Fixture { client, metrics, proxy: _proxy } = fixture( let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) =
vec![(String::from("addr_label"), String::from("foo"))], fixture("labeled.test.svc.cluster.local");
vec![(String::from("set_label"), String::from("bar"))],
); {
let mut alabels = HashMap::new();
alabels.insert("addr_label".to_owned(), "foo".to_owned());
let mut slabels = HashMap::new();
slabels.insert("set_label".to_owned(), "bar".to_owned());
dst_tx.send_labeled(addr, alabels, slabels);
}
info!("client.get(/)"); info!("client.get(/)");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
@ -519,19 +513,17 @@ mod outbound_dst_labels {
fn controller_updates_addr_labels() { fn controller_updates_addr_labels() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
info!("running test server"); info!("running test server");
let Fixture { client, metrics, proxy: _proxy } =
// the controller will update the value of `addr_label`. the value let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) =
// of `set_label` will remain unchanged throughout the test. fixture("labeled.test.svc.cluster.local");
fixture_with_updates(vec![
( {
vec![(String::from("addr_label"), String::from("foo"))], let mut alabels = HashMap::new();
vec![(String::from("set_label"), String::from("unchanged"))] alabels.insert("addr_label".to_owned(), "foo".to_owned());
), let mut slabels = HashMap::new();
( slabels.insert("set_label".to_owned(), "unchanged".to_owned());
vec![(String::from("addr_label"), String::from("bar"))], dst_tx.send_labeled(addr, alabels, slabels);
vec![(String::from("set_label"), String::from("unchanged"))] }
),
]);
info!("client.get(/)"); info!("client.get(/)");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
@ -545,6 +537,14 @@ mod outbound_dst_labels {
assert_contains!(metrics.get("/metrics"), assert_contains!(metrics.get("/metrics"),
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1");
{
let mut alabels = HashMap::new();
alabels.insert("addr_label".to_owned(), "bar".to_owned());
let mut slabels = HashMap::new();
slabels.insert("set_label".to_owned(), "unchanged".to_owned());
dst_tx.send_labeled(addr, alabels, slabels);
}
info!("client.get(/)"); info!("client.get(/)");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
// the second request should increment stats labeled with `dst_addr_label="bar"` // the second request should increment stats labeled with `dst_addr_label="bar"`
@ -576,11 +576,15 @@ mod outbound_dst_labels {
fn controller_updates_set_labels() { fn controller_updates_set_labels() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
info!("running test server"); info!("running test server");
let Fixture { client, metrics, proxy: _proxy } = let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) =
fixture_with_updates(vec![ fixture("labeled.test.svc.cluster.local");
(vec![], vec![(String::from("set_label"), String::from("foo"))]),
(vec![], vec![(String::from("set_label"), String::from("bar"))]), {
]); let alabels = HashMap::new();
let mut slabels = HashMap::new();
slabels.insert("set_label".to_owned(), "foo".to_owned());
dst_tx.send_labeled(addr, alabels, slabels);
}
info!("client.get(/)"); info!("client.get(/)");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
@ -594,6 +598,13 @@ mod outbound_dst_labels {
assert_contains!(metrics.get("/metrics"), assert_contains!(metrics.get("/metrics"),
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1"); "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1");
{
let alabels = HashMap::new();
let mut slabels = HashMap::new();
slabels.insert("set_label".to_owned(), "bar".to_owned());
dst_tx.send_labeled(addr, alabels, slabels);
}
info!("client.get(/)"); info!("client.get(/)");
assert_eq!(client.get("/"), "hello"); assert_eq!(client.get("/"), "hello");
// the second request should increment stats labeled with `dst_addr_label="bar"` // the second request should increment stats labeled with `dst_addr_label="bar"`
@ -627,7 +638,7 @@ fn metrics_have_no_double_commas() {
let outbound_srv = server::new().route("/hey", "hello").run(); let outbound_srv = server::new().route("/hey", "hello").run();
let ctrl = controller::new() let ctrl = controller::new()
.destination("tele.test.svc.cluster.local", outbound_srv.addr) .destination_and_close("tele.test.svc.cluster.local", outbound_srv.addr)
.run(); .run();
let proxy = proxy::new() let proxy = proxy::new()
.controller(ctrl) .controller(ctrl)

View File

@ -8,7 +8,7 @@ fn outbound_http1() {
let srv = server::http1().route("/", "hello h1").run(); let srv = server::http1().route("/", "hello h1").run();
let ctrl = controller::new() let ctrl = controller::new()
.destination("transparency.test.svc.cluster.local", srv.addr) .destination_and_close("transparency.test.svc.cluster.local", srv.addr)
.run(); .run();
let proxy = proxy::new().controller(ctrl).outbound(srv).run(); let proxy = proxy::new().controller(ctrl).outbound(srv).run();
let client = client::http1(proxy.outbound, "transparency.test.svc.cluster.local"); let client = client::http1(proxy.outbound, "transparency.test.svc.cluster.local");