Stop cloning & hashing HTTP requests & related types. (#1058)
During code review of another change I noticed that a lot of types seem to derive `Hash` (and `Eq`, `PartialEq`) even though the types should never (for performance reasons) be used as keys of a hash table, and where it is kind of questionable what equality should mean for those types. Then I noticed that similarly many types implement `Clone` even though I expect we should never be cloning them, again because of our performance goals. Because these types derive these traits, then whenever we add a field to them, that field also has to implement these traits. That means we then have to expand the problem, deriving implementations of these traits for types that don't otherwise want/need to implement these traits. This makes review complicated, because, for example, we have to decide whether something should be compared case-insensitively or case-sensitively when really we don't want to compare those things at all. To prove that we can get by by doing less, to speed up code review (particularly related to some stuff related to TLS), stop deriving `Clone`, `Eq`, `PartialEq`, and `Hash` for these types. I believe that, in particular, the change to key the Tap hash table based on request ID, instead of the whole request, should speed up the tap feature since we don't hash and/or compare every field, recursively, of requests. Later more such cleanup of this sort should be done. Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
parent
73163ad019
commit
f9cc52e1c0
|
@ -55,7 +55,7 @@ fn request(
|
|||
uri: &str,
|
||||
server: &Arc<ctx::transport::Server>,
|
||||
client: &Arc<ctx::transport::Client>,
|
||||
id: usize
|
||||
id: ctx::http::RequestId,
|
||||
) -> (Arc<ctx::http::Request>, Arc<ctx::http::Response>) {
|
||||
let req = ctx::http::Request::new(
|
||||
&http::Request::get(uri).body(()).unwrap(),
|
||||
|
@ -82,7 +82,7 @@ fn record_response_end(b: &mut Bencher) {
|
|||
("pod", "klay"),
|
||||
]);
|
||||
|
||||
let (_, rsp) = request("http://buoyant.io", &server, &client, 1);
|
||||
let (_, rsp) = request("http://buoyant.io", &server, &client, ctx::http::RequestId::from(1));
|
||||
|
||||
let request_open_at = Instant::now();
|
||||
let response_open_at = request_open_at + Duration::from_millis(100);
|
||||
|
@ -114,7 +114,7 @@ fn record_one_conn_request(b: &mut Bencher) {
|
|||
("pod", "klay"),
|
||||
]);
|
||||
|
||||
let (req, rsp) = request("http://buoyant.io", &server, &client, 1);
|
||||
let (req, rsp) = request("http://buoyant.io", &server, &client, ctx::http::RequestId::from(1));
|
||||
|
||||
let server_transport = Arc::new(ctx::transport::Ctx::Server(server));
|
||||
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
|
||||
|
@ -191,7 +191,7 @@ fn record_many_dsts(b: &mut Bencher) {
|
|||
("pod".into(), format!("pod{}", n)),
|
||||
]);
|
||||
let uri = format!("http://test{}.local", n);
|
||||
let (req, rsp) = request(&uri, &server, &client, 1);
|
||||
let (req, rsp) = request(&uri, &server, &client, ctx::http::RequestId::from(1));
|
||||
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
|
||||
|
||||
events.push(TransportOpen(client_transport.clone()));
|
||||
|
|
|
@ -6,7 +6,7 @@ use super::{Metadata, TlsIdentity};
|
|||
/// An individual traffic target.
|
||||
///
|
||||
/// Equality, Ordering, and hashability is determined soley by the Endpoint's address.
|
||||
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct Endpoint {
|
||||
address: SocketAddr,
|
||||
metadata: Metadata,
|
||||
|
|
|
@ -87,7 +87,7 @@ pub struct Resolution<B> {
|
|||
}
|
||||
|
||||
/// Metadata describing an endpoint.
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct Metadata {
|
||||
/// A set of Prometheus metric labels describing the destination.
|
||||
dst_labels: Option<DstLabels>,
|
||||
|
@ -98,12 +98,10 @@ pub struct Metadata {
|
|||
|
||||
/// How to verify TLS for an endpoint.
|
||||
///
|
||||
/// NOTE: This currently derives `Hash`, `PartialEq`, and `Eq`, which is not
|
||||
/// entirely correct, as domain name equality ought to be case
|
||||
/// insensitive. However, `Metadata` must be `Hash` + `Eq`, so this is at
|
||||
/// least better than having `Metadata` ignore the TLS identity when
|
||||
/// checking for equality
|
||||
#[derive(Debug, Hash, PartialEq, Eq)]
|
||||
/// XXX: This currently derives `PartialEq and `Eq`, which is not entirely
|
||||
/// correct, as domain name equality ought to be case insensitive. However,
|
||||
/// `Metadata` must be `Eq`.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum TlsIdentity {
|
||||
K8sPodNamespace {
|
||||
controller_ns: String,
|
||||
|
|
|
@ -3,7 +3,7 @@ use bytes::{BytesMut};
|
|||
use transport::DnsNameAndPort;
|
||||
|
||||
/// A normalized `Authority`.
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct FullyQualifiedAuthority(String);
|
||||
|
||||
impl FullyQualifiedAuthority {
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
|||
use futures::{future, Poll, Stream};
|
||||
use futures_mpsc_lossy;
|
||||
use http::HeaderMap;
|
||||
use indexmap::IndexMap;
|
||||
use indexmap::{Equivalent, IndexSet};
|
||||
use tower_grpc::{self as grpc, Response};
|
||||
|
||||
use conduit_proxy_controller_grpc::common::TapEvent;
|
||||
|
@ -13,6 +13,7 @@ use convert::*;
|
|||
use ctx;
|
||||
use telemetry::Event;
|
||||
use telemetry::tap::{Tap, Taps};
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Observe {
|
||||
|
@ -24,11 +25,15 @@ pub struct Observe {
|
|||
pub struct TapEvents {
|
||||
rx: futures_mpsc_lossy::Receiver<Event>,
|
||||
remaining: usize,
|
||||
current: IndexMap<Arc<ctx::http::Request>, ()>,
|
||||
current: IndexSet<RequestById>,
|
||||
tap_id: usize,
|
||||
taps: Arc<Mutex<Taps>>,
|
||||
}
|
||||
|
||||
// `IndexSet<RequestById>` is equivalent to `IndexMap<RequestId, Request>` but
|
||||
// avoids storing the `RequestID` twice.
|
||||
struct RequestById(Arc<ctx::http::Request>);
|
||||
|
||||
impl Observe {
|
||||
pub fn new(tap_capacity: usize) -> (Arc<Mutex<Taps>>, Observe) {
|
||||
let taps = Arc::new(Mutex::new(Taps::default()));
|
||||
|
@ -85,7 +90,7 @@ impl server::Tap for Observe {
|
|||
let events = TapEvents {
|
||||
rx,
|
||||
tap_id,
|
||||
current: IndexMap::default(),
|
||||
current: IndexSet::default(),
|
||||
remaining: req.limit as usize,
|
||||
taps: self.taps.clone(),
|
||||
};
|
||||
|
@ -115,21 +120,21 @@ impl Stream for TapEvents {
|
|||
continue;
|
||||
}
|
||||
self.remaining -= 1;
|
||||
let _ = self.current.insert(req.clone(), ());
|
||||
let _ = self.current.insert(RequestById(req.clone()));
|
||||
}
|
||||
Event::StreamRequestFail(ref req, _) => {
|
||||
if self.current.remove(req).is_none() {
|
||||
if !self.current.remove(&req.id) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Event::StreamResponseOpen(ref rsp, _) => {
|
||||
if !self.current.contains_key(&rsp.request) {
|
||||
if !self.current.contains(&rsp.request.id) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Event::StreamResponseFail(ref rsp, _) |
|
||||
Event::StreamResponseEnd(ref rsp, _) => {
|
||||
if self.current.remove(&rsp.request).is_none() {
|
||||
if !self.current.remove(&rsp.request.id) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -156,3 +161,24 @@ impl Drop for TapEvents {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for RequestById {}
|
||||
|
||||
impl PartialEq for RequestById {
|
||||
fn eq(&self, other: &RequestById) -> bool {
|
||||
self.0.id.eq(&other.0.id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Hash for RequestById {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.0.id.hash(state)
|
||||
}
|
||||
}
|
||||
|
||||
impl Equivalent<RequestById> for ctx::http::RequestId {
|
||||
/// Compare self to `key` and return `true` if they are equal.
|
||||
fn equivalent(&self, key: &RequestById) -> bool {
|
||||
*self == key.0.id
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ impl event::StreamResponseEnd {
|
|||
let end = tap_event::http::ResponseEnd {
|
||||
id: Some(tap_event::http::StreamId {
|
||||
base: 0, // TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
stream: ctx.id.into(),
|
||||
}),
|
||||
since_request_init: Some(pb_elapsed(self.request_open_at, self.response_end_at)),
|
||||
since_response_init: Some(pb_elapsed(self.response_open_at, self.response_end_at)),
|
||||
|
@ -68,7 +68,7 @@ impl event::StreamResponseFail {
|
|||
let end = tap_event::http::ResponseEnd {
|
||||
id: Some(tap_event::http::StreamId {
|
||||
base: 0, // TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
stream: ctx.id.into(),
|
||||
}),
|
||||
since_request_init: Some(pb_elapsed(self.request_open_at, self.response_fail_at)),
|
||||
since_response_init: Some(pb_elapsed(self.response_open_at, self.response_fail_at)),
|
||||
|
@ -99,7 +99,7 @@ impl event::StreamRequestFail {
|
|||
let end = tap_event::http::ResponseEnd {
|
||||
id: Some(tap_event::http::StreamId {
|
||||
base: 0, // TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
stream: ctx.id.into(),
|
||||
}),
|
||||
since_request_init: Some(pb_elapsed(self.request_open_at, self.request_fail_at)),
|
||||
since_response_init: None,
|
||||
|
@ -134,7 +134,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
|
|||
id: Some(tap_event::http::StreamId {
|
||||
base: 0,
|
||||
// TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
stream: ctx.id.into(),
|
||||
}),
|
||||
method: Some((&ctx.method).into()),
|
||||
scheme: ctx.uri.scheme_part().map(common::Scheme::from),
|
||||
|
@ -166,7 +166,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
|
|||
id: Some(tap_event::http::StreamId {
|
||||
base: 0,
|
||||
// TODO FIXME
|
||||
stream: ctx.request.id as u64,
|
||||
stream: ctx.request.id.into(),
|
||||
}),
|
||||
since_request_init: Some(pb_elapsed(rsp.request_open_at, rsp.response_open_at)),
|
||||
http_status: u32::from(ctx.status.as_u16()),
|
||||
|
|
|
@ -5,11 +5,14 @@ use ctx;
|
|||
use control::destination;
|
||||
use telemetry::metrics::DstLabels;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
|
||||
pub struct RequestId(usize);
|
||||
|
||||
/// Describes a stream's request headers.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug)]
|
||||
pub struct Request {
|
||||
// A numeric ID useful for debugging & correlation.
|
||||
pub id: usize,
|
||||
pub id: RequestId,
|
||||
|
||||
pub uri: http::Uri,
|
||||
pub method: http::Method,
|
||||
|
@ -22,7 +25,7 @@ pub struct Request {
|
|||
}
|
||||
|
||||
/// Describes a stream's response headers.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug)]
|
||||
pub struct Response {
|
||||
pub request: Arc<Request>,
|
||||
|
||||
|
@ -36,12 +39,24 @@ pub struct Response {
|
|||
// pub h2_error_code: Option<u32>,
|
||||
//}
|
||||
|
||||
impl From<usize> for RequestId {
|
||||
fn from(value: usize) -> Self {
|
||||
RequestId(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<u64> for RequestId {
|
||||
fn into(self) -> u64 {
|
||||
self.0 as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl Request {
|
||||
pub fn new<B>(
|
||||
request: &http::Request<B>,
|
||||
server: &Arc<ctx::transport::Server>,
|
||||
client: &Arc<ctx::transport::Client>,
|
||||
id: usize,
|
||||
id: RequestId,
|
||||
) -> Arc<Self> {
|
||||
let r = Self {
|
||||
id,
|
||||
|
|
|
@ -14,7 +14,7 @@ pub mod http;
|
|||
pub mod transport;
|
||||
|
||||
/// Describes a single running proxy instance.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct Process {
|
||||
/// Identifies the Kubernetes namespace in which this proxy is process.
|
||||
pub scheduled_namespace: String,
|
||||
|
@ -29,7 +29,7 @@ pub struct Process {
|
|||
/// local instance.
|
||||
/// - The _outbound_ proxy receives traffic from the local instance and forwards it to a
|
||||
/// remove service.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum Proxy {
|
||||
Inbound(Arc<Process>),
|
||||
Outbound(Arc<Process>),
|
||||
|
@ -117,7 +117,7 @@ pub mod test_util {
|
|||
uri: &str,
|
||||
server: &Arc<ctx::transport::Server>,
|
||||
client: &Arc<ctx::transport::Client>,
|
||||
id: usize
|
||||
id: ctx::http::RequestId,
|
||||
) -> (Arc<ctx::http::Request>, Arc<ctx::http::Response>) {
|
||||
let req = ctx::http::Request::new(
|
||||
&http::Request::get(uri).body(()).unwrap(),
|
||||
|
|
|
@ -5,14 +5,14 @@ use ctx;
|
|||
use control::destination;
|
||||
use telemetry::metrics::DstLabels;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug)]
|
||||
pub enum Ctx {
|
||||
Client(Arc<Client>),
|
||||
Server(Arc<Server>),
|
||||
}
|
||||
|
||||
/// Identifies a connection from another process to a proxy listener.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug)]
|
||||
pub struct Server {
|
||||
pub proxy: Arc<ctx::Proxy>,
|
||||
pub remote: SocketAddr,
|
||||
|
@ -21,7 +21,7 @@ pub struct Server {
|
|||
}
|
||||
|
||||
/// Identifies a connection from the proxy to another process.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug)]
|
||||
pub struct Client {
|
||||
pub proxy: Arc<ctx::Proxy>,
|
||||
pub remote: SocketAddr,
|
||||
|
|
|
@ -35,7 +35,7 @@ pub struct Histogram<V: Into<u64>> {
|
|||
_p: PhantomData<V>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)]
|
||||
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
|
||||
pub enum Bucket {
|
||||
Le(u64),
|
||||
Inf,
|
||||
|
|
|
@ -286,7 +286,8 @@ mod tests {
|
|||
team: &str
|
||||
) {
|
||||
let client = client(&proxy, vec![("team", team)]);
|
||||
let (req, rsp) = request("http://nba.com", &server, &client, 1);
|
||||
let (req, rsp) = request("http://nba.com", &server, &client,
|
||||
ctx::http::RequestId::from(1));
|
||||
|
||||
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
|
||||
let transport = TransportLabels::new(&client_transport);
|
||||
|
|
|
@ -108,7 +108,8 @@ mod test {
|
|||
("pod", "klay"),
|
||||
]);
|
||||
|
||||
let (_, rsp) = request("http://buoyant.io", &server, &client, 1);
|
||||
let (_, rsp) = request("http://buoyant.io", &server, &client,
|
||||
ctx::http::RequestId::from(1));
|
||||
|
||||
let request_open_at = Instant::now();
|
||||
let response_open_at = request_open_at + Duration::from_millis(100);
|
||||
|
@ -168,7 +169,8 @@ mod test {
|
|||
("pod", "klay"),
|
||||
]);
|
||||
|
||||
let (req, rsp) = request("http://buoyant.io", &server, &client, 1);
|
||||
let (req, rsp) = request("http://buoyant.io", &server, &client,
|
||||
ctx::http::RequestId::from(1));
|
||||
let server_transport =
|
||||
Arc::new(ctx::transport::Ctx::Server(server.clone()));
|
||||
let client_transport =
|
||||
|
|
|
@ -228,7 +228,7 @@ where
|
|||
);
|
||||
let (inner, body_inner) = match metadata {
|
||||
(Some(ctx), Some(RequestOpen(request_open_at))) => {
|
||||
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
|
||||
let id = ctx::http::RequestId::from(self.next_id.fetch_add(1, Ordering::SeqCst));
|
||||
let ctx = ctx::http::Request::new(&req, &ctx, &self.client_ctx, id);
|
||||
|
||||
self.handle
|
||||
|
|
Loading…
Reference in New Issue