mirror of https://github.com/linkerd/linkerd2.git
Proxy: Clarify destination test support code queue handling (#617)
Use `VecDeqeue` to make the queue structure clear. Follow good practice by minimizing the amount of time the lock is held. Clarify how defaulting logic works. Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
parent
006360aa90
commit
7247ffeee3
|
@ -2,6 +2,7 @@
|
|||
|
||||
use support::*;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
@ -19,7 +20,7 @@ struct Destination(Box<Fn() -> Option<pb::destination::Update> + Send>);
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct Controller {
|
||||
destinations: Vec<(String, Destination)>,
|
||||
destinations: VecDeque<(String, Destination)>,
|
||||
reports: Option<mpsc::UnboundedSender<pb::telemetry::ReportRequest>>,
|
||||
}
|
||||
|
||||
|
@ -32,7 +33,7 @@ pub struct Listening {
|
|||
impl Controller {
|
||||
pub fn new() -> Self {
|
||||
Controller {
|
||||
destinations: Vec::new(),
|
||||
destinations: VecDeque::new(),
|
||||
reports: None,
|
||||
}
|
||||
}
|
||||
|
@ -46,7 +47,7 @@ impl Controller {
|
|||
F: Fn() -> Option<pb::destination::Update> + Send + 'static,
|
||||
{
|
||||
self.destinations
|
||||
.push((dest.into(), Destination(Box::new(f))));
|
||||
.push_back((dest.into(), Destination(Box::new(f))));
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -67,7 +68,7 @@ impl Controller {
|
|||
}
|
||||
|
||||
type Response = self::http::Response<GrpcBody>;
|
||||
type Destinations = Arc<Mutex<Vec<(String, Destination)>>>;
|
||||
type Destinations = Arc<Mutex<VecDeque<(String, Destination)>>>;
|
||||
|
||||
const DESTINATION_GET: &str = "/conduit.proxy.destination.Destination/Get";
|
||||
const TELEMETRY_REPORT: &str = "/conduit.proxy.telemetry.Telemetry/Report";
|
||||
|
@ -98,15 +99,18 @@ impl Svc {
|
|||
let destinations = self.destinations.clone();
|
||||
Box::new(body.concat2().and_then(move |_bytes| {
|
||||
let update = {
|
||||
let mut vec = destinations.lock().unwrap();
|
||||
//TODO: decode `_bytes` and compare with `.0`
|
||||
if !vec.is_empty() {
|
||||
let Destination(f) = vec.remove(0).1;
|
||||
f()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}.unwrap_or_default();
|
||||
let next = {
|
||||
let mut queue = destinations.lock().unwrap();
|
||||
queue.pop_front()
|
||||
};
|
||||
// The test cases's entry may evaluate to `None` when it wants to close
|
||||
// the connection. If there is no entry then that's equivalent to an
|
||||
// implicit `destination_close()`.
|
||||
//
|
||||
// TODO: decode `_bytes` and compare with `_name`
|
||||
next.and_then(|(_name, Destination(f))| f())
|
||||
.unwrap_or_default()
|
||||
};
|
||||
let len = update.encoded_len();
|
||||
let mut buf = BytesMut::with_capacity(len + 5);
|
||||
buf.put(0u8);
|
||||
|
|
Loading…
Reference in New Issue