diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 74bf595e2..220b47453 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -2,6 +2,7 @@ use support::*; +use std::collections::VecDeque; use std::fmt; use std::net::IpAddr; use std::sync::{Arc, Mutex}; @@ -19,7 +20,7 @@ struct Destination(Box Option + Send>); #[derive(Debug)] pub struct Controller { - destinations: Vec<(String, Destination)>, + destinations: VecDeque<(String, Destination)>, reports: Option>, } @@ -32,7 +33,7 @@ pub struct Listening { impl Controller { pub fn new() -> Self { Controller { - destinations: Vec::new(), + destinations: VecDeque::new(), reports: None, } } @@ -46,7 +47,7 @@ impl Controller { F: Fn() -> Option + Send + 'static, { self.destinations - .push((dest.into(), Destination(Box::new(f)))); + .push_back((dest.into(), Destination(Box::new(f)))); self } @@ -67,7 +68,7 @@ impl Controller { } type Response = self::http::Response; -type Destinations = Arc>>; +type Destinations = Arc>>; const DESTINATION_GET: &str = "/conduit.proxy.destination.Destination/Get"; const TELEMETRY_REPORT: &str = "/conduit.proxy.telemetry.Telemetry/Report"; @@ -98,15 +99,18 @@ impl Svc { let destinations = self.destinations.clone(); Box::new(body.concat2().and_then(move |_bytes| { let update = { - let mut vec = destinations.lock().unwrap(); - //TODO: decode `_bytes` and compare with `.0` - if !vec.is_empty() { - let Destination(f) = vec.remove(0).1; - f() - } else { - None - } - }.unwrap_or_default(); + let next = { + let mut queue = destinations.lock().unwrap(); + queue.pop_front() + }; + // The test cases's entry may evaluate to `None` when it wants to close + // the connection. If there is no entry then that's equivalent to an + // implicit `destination_close()`. + // + // TODO: decode `_bytes` and compare with `_name` + next.and_then(|(_name, Destination(f))| f()) + .unwrap_or_default() + }; let len = update.encoded_len(); let mut buf = BytesMut::with_capacity(len + 5); buf.put(0u8);