From 51e230017e031c75929c6f4c9cbe364c692a02ee Mon Sep 17 00:00:00 2001 From: Brian Smith Date: Tue, 5 Jun 2018 08:58:18 -1000 Subject: [PATCH] Stop cloning & hashing HTTP requests & related types. (#1058) During code review of another change I noticed that a lot of types seem to derive `Hash` (and `Eq`, `PartialEq`) even though the types should never (for performance reasons) be used as keys of a hash table, and where it is kind of questionable what equality should mean for those types. Then I noticed that similarly many types implement `Clone` even though I expect we should never be cloning them, again because of our performance goals. Because these types derive these traits, then whenever we add a field to them, that field also has to implement these traits. That means we then have to expand the problem, deriving implementations of these traits for types that don't otherwise want/need to implement these traits. This makes review complicated, because, for example, we have to decide whether something should be compared case-insensitively or case-sensitively when really we don't want to compare those things at all. To prove that we can get by by doing less, to speed up code review (particularly related to some stuff related to TLS), stop deriving `Clone`, `Eq`, `PartialEq`, and `Hash` for these types. I believe that, in particular, the change to key the Tap hash table based on request ID, instead of the whole request, should speed up the tap feature since we don't hash and/or compare every field, recursively, of requests. Later more such cleanup of this sort should be done. Signed-off-by: Brian Smith --- proxy/benches/record.rs | 8 ++-- proxy/controller-grpc/src/lib.rs | 12 +----- proxy/src/control/destination/endpoint.rs | 2 +- proxy/src/control/destination/mod.rs | 12 +++--- .../src/control/fully_qualified_authority.rs | 2 +- proxy/src/control/observe.rs | 40 +++++++++++++++---- proxy/src/control/pb.rs | 10 ++--- proxy/src/ctx/http.rs | 23 +++++++++-- proxy/src/ctx/mod.rs | 6 +-- proxy/src/ctx/transport.rs | 6 +-- proxy/src/telemetry/metrics/histogram.rs | 2 +- proxy/src/telemetry/metrics/mod.rs | 3 +- proxy/src/telemetry/metrics/record.rs | 6 ++- proxy/src/telemetry/sensor/http.rs | 2 +- 14 files changed, 83 insertions(+), 51 deletions(-) diff --git a/proxy/benches/record.rs b/proxy/benches/record.rs index 8c3e929c9..ce2db85ba 100644 --- a/proxy/benches/record.rs +++ b/proxy/benches/record.rs @@ -55,7 +55,7 @@ fn request( uri: &str, server: &Arc, client: &Arc, - id: usize + id: ctx::http::RequestId, ) -> (Arc, Arc) { let req = ctx::http::Request::new( &http::Request::get(uri).body(()).unwrap(), @@ -82,7 +82,7 @@ fn record_response_end(b: &mut Bencher) { ("pod", "klay"), ]); - let (_, rsp) = request("http://buoyant.io", &server, &client, 1); + let (_, rsp) = request("http://buoyant.io", &server, &client, ctx::http::RequestId::from(1)); let request_open_at = Instant::now(); let response_open_at = request_open_at + Duration::from_millis(100); @@ -114,7 +114,7 @@ fn record_one_conn_request(b: &mut Bencher) { ("pod", "klay"), ]); - let (req, rsp) = request("http://buoyant.io", &server, &client, 1); + let (req, rsp) = request("http://buoyant.io", &server, &client, ctx::http::RequestId::from(1)); let server_transport = Arc::new(ctx::transport::Ctx::Server(server)); let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); @@ -191,7 +191,7 @@ fn record_many_dsts(b: &mut Bencher) { ("pod".into(), format!("pod{}", n)), ]); let uri = format!("http://test{}.local", n); - let (req, rsp) = request(&uri, &server, &client, 1); + let (req, rsp) = request(&uri, &server, &client, ctx::http::RequestId::from(1)); let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); events.push(TransportOpen(client_transport.clone())); diff --git a/proxy/controller-grpc/src/lib.rs b/proxy/controller-grpc/src/lib.rs index a93716106..4833654a6 100644 --- a/proxy/controller-grpc/src/lib.rs +++ b/proxy/controller-grpc/src/lib.rs @@ -10,7 +10,7 @@ extern crate quickcheck; extern crate tower_grpc; use convert::{TryFrom, TryInto}; -use std::{fmt, hash}; +use std::fmt; use std::error::Error; #[cfg(feature = "arbitrary")] @@ -191,16 +191,6 @@ impl<'a> From<&'a ::std::net::SocketAddr> for common::TcpAddress { } } -// ===== impl common::Protocol ===== - -impl hash::Hash for common::Protocol { - // it's necessary to implement Hash for Protocol as it's a field on - // ctx::Transport, which derives Hash. - fn hash(&self, state: &mut H) { - (*self as i32).hash(state) - } -} - // ===== impl common::scheme::Type ===== impl<'a> TryInto for &'a common::scheme::Type { diff --git a/proxy/src/control/destination/endpoint.rs b/proxy/src/control/destination/endpoint.rs index 1b9e746a0..112cac706 100644 --- a/proxy/src/control/destination/endpoint.rs +++ b/proxy/src/control/destination/endpoint.rs @@ -6,7 +6,7 @@ use super::{Metadata, TlsIdentity}; /// An individual traffic target. /// /// Equality, Ordering, and hashability is determined soley by the Endpoint's address. -#[derive(Clone, Debug, Hash, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Endpoint { address: SocketAddr, metadata: Metadata, diff --git a/proxy/src/control/destination/mod.rs b/proxy/src/control/destination/mod.rs index 366766892..bb4053075 100644 --- a/proxy/src/control/destination/mod.rs +++ b/proxy/src/control/destination/mod.rs @@ -87,7 +87,7 @@ pub struct Resolution { } /// Metadata describing an endpoint. -#[derive(Clone, Debug, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Metadata { /// A set of Prometheus metric labels describing the destination. dst_labels: Option, @@ -98,12 +98,10 @@ pub struct Metadata { /// How to verify TLS for an endpoint. /// -/// NOTE: This currently derives `Hash`, `PartialEq`, and `Eq`, which is not -/// entirely correct, as domain name equality ought to be case -/// insensitive. However, `Metadata` must be `Hash` + `Eq`, so this is at -/// least better than having `Metadata` ignore the TLS identity when -/// checking for equality -#[derive(Debug, Hash, PartialEq, Eq)] +/// XXX: This currently derives `PartialEq and `Eq`, which is not entirely +/// correct, as domain name equality ought to be case insensitive. However, +/// `Metadata` must be `Eq`. +#[derive(Debug, PartialEq, Eq)] pub enum TlsIdentity { K8sPodNamespace { controller_ns: String, diff --git a/proxy/src/control/fully_qualified_authority.rs b/proxy/src/control/fully_qualified_authority.rs index 52bd85030..9e767932d 100644 --- a/proxy/src/control/fully_qualified_authority.rs +++ b/proxy/src/control/fully_qualified_authority.rs @@ -3,7 +3,7 @@ use bytes::{BytesMut}; use transport::DnsNameAndPort; /// A normalized `Authority`. -#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct FullyQualifiedAuthority(String); impl FullyQualifiedAuthority { diff --git a/proxy/src/control/observe.rs b/proxy/src/control/observe.rs index 47437259b..0e7b00d50 100644 --- a/proxy/src/control/observe.rs +++ b/proxy/src/control/observe.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use futures::{future, Poll, Stream}; use futures_mpsc_lossy; use http::HeaderMap; -use indexmap::IndexMap; +use indexmap::{Equivalent, IndexSet}; use tower_grpc::{self as grpc, Response}; use conduit_proxy_controller_grpc::common::TapEvent; @@ -13,6 +13,7 @@ use convert::*; use ctx; use telemetry::Event; use telemetry::tap::{Tap, Taps}; +use std::hash::{Hash, Hasher}; #[derive(Clone, Debug)] pub struct Observe { @@ -24,11 +25,15 @@ pub struct Observe { pub struct TapEvents { rx: futures_mpsc_lossy::Receiver, remaining: usize, - current: IndexMap, ()>, + current: IndexSet, tap_id: usize, taps: Arc>, } +// `IndexSet` is equivalent to `IndexMap` but +// avoids storing the `RequestID` twice. +struct RequestById(Arc); + impl Observe { pub fn new(tap_capacity: usize) -> (Arc>, Observe) { let taps = Arc::new(Mutex::new(Taps::default())); @@ -85,7 +90,7 @@ impl server::Tap for Observe { let events = TapEvents { rx, tap_id, - current: IndexMap::default(), + current: IndexSet::default(), remaining: req.limit as usize, taps: self.taps.clone(), }; @@ -115,21 +120,21 @@ impl Stream for TapEvents { continue; } self.remaining -= 1; - let _ = self.current.insert(req.clone(), ()); + let _ = self.current.insert(RequestById(req.clone())); } Event::StreamRequestFail(ref req, _) => { - if self.current.remove(req).is_none() { + if !self.current.remove(&req.id) { continue; } } Event::StreamResponseOpen(ref rsp, _) => { - if !self.current.contains_key(&rsp.request) { + if !self.current.contains(&rsp.request.id) { continue; } } Event::StreamResponseFail(ref rsp, _) | Event::StreamResponseEnd(ref rsp, _) => { - if self.current.remove(&rsp.request).is_none() { + if !self.current.remove(&rsp.request.id) { continue; } } @@ -156,3 +161,24 @@ impl Drop for TapEvents { } } } + +impl Eq for RequestById {} + +impl PartialEq for RequestById { + fn eq(&self, other: &RequestById) -> bool { + self.0.id.eq(&other.0.id) + } +} + +impl Hash for RequestById { + fn hash(&self, state: &mut H) { + self.0.id.hash(state) + } +} + +impl Equivalent for ctx::http::RequestId { + /// Compare self to `key` and return `true` if they are equal. + fn equivalent(&self, key: &RequestById) -> bool { + *self == key.0.id + } +} diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index bd8f891d0..ca694c4f4 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -37,7 +37,7 @@ impl event::StreamResponseEnd { let end = tap_event::http::ResponseEnd { id: Some(tap_event::http::StreamId { base: 0, // TODO FIXME - stream: ctx.id as u64, + stream: ctx.id.into(), }), since_request_init: Some(pb_elapsed(self.request_open_at, self.response_end_at)), since_response_init: Some(pb_elapsed(self.response_open_at, self.response_end_at)), @@ -68,7 +68,7 @@ impl event::StreamResponseFail { let end = tap_event::http::ResponseEnd { id: Some(tap_event::http::StreamId { base: 0, // TODO FIXME - stream: ctx.id as u64, + stream: ctx.id.into(), }), since_request_init: Some(pb_elapsed(self.request_open_at, self.response_fail_at)), since_response_init: Some(pb_elapsed(self.response_open_at, self.response_fail_at)), @@ -99,7 +99,7 @@ impl event::StreamRequestFail { let end = tap_event::http::ResponseEnd { id: Some(tap_event::http::StreamId { base: 0, // TODO FIXME - stream: ctx.id as u64, + stream: ctx.id.into(), }), since_request_init: Some(pb_elapsed(self.request_open_at, self.request_fail_at)), since_response_init: None, @@ -134,7 +134,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { id: Some(tap_event::http::StreamId { base: 0, // TODO FIXME - stream: ctx.id as u64, + stream: ctx.id.into(), }), method: Some((&ctx.method).into()), scheme: ctx.uri.scheme_part().map(common::Scheme::from), @@ -166,7 +166,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { id: Some(tap_event::http::StreamId { base: 0, // TODO FIXME - stream: ctx.request.id as u64, + stream: ctx.request.id.into(), }), since_request_init: Some(pb_elapsed(rsp.request_open_at, rsp.response_open_at)), http_status: u32::from(ctx.status.as_u16()), diff --git a/proxy/src/ctx/http.rs b/proxy/src/ctx/http.rs index 8d8c7ae28..fe2495ac4 100644 --- a/proxy/src/ctx/http.rs +++ b/proxy/src/ctx/http.rs @@ -5,11 +5,14 @@ use ctx; use control::destination; use telemetry::metrics::DstLabels; +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub struct RequestId(usize); + /// Describes a stream's request headers. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Debug)] pub struct Request { // A numeric ID useful for debugging & correlation. - pub id: usize, + pub id: RequestId, pub uri: http::Uri, pub method: http::Method, @@ -22,7 +25,7 @@ pub struct Request { } /// Describes a stream's response headers. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Debug)] pub struct Response { pub request: Arc, @@ -36,12 +39,24 @@ pub struct Response { // pub h2_error_code: Option, //} +impl From for RequestId { + fn from(value: usize) -> Self { + RequestId(value) + } +} + +impl Into for RequestId { + fn into(self) -> u64 { + self.0 as u64 + } +} + impl Request { pub fn new( request: &http::Request, server: &Arc, client: &Arc, - id: usize, + id: RequestId, ) -> Arc { let r = Self { id, diff --git a/proxy/src/ctx/mod.rs b/proxy/src/ctx/mod.rs index d663518d3..66646df6f 100644 --- a/proxy/src/ctx/mod.rs +++ b/proxy/src/ctx/mod.rs @@ -14,7 +14,7 @@ pub mod http; pub mod transport; /// Describes a single running proxy instance. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Process { /// Identifies the Kubernetes namespace in which this proxy is process. pub scheduled_namespace: String, @@ -29,7 +29,7 @@ pub struct Process { /// local instance. /// - The _outbound_ proxy receives traffic from the local instance and forwards it to a /// remove service. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Proxy { Inbound(Arc), Outbound(Arc), @@ -117,7 +117,7 @@ pub mod test_util { uri: &str, server: &Arc, client: &Arc, - id: usize + id: ctx::http::RequestId, ) -> (Arc, Arc) { let req = ctx::http::Request::new( &http::Request::get(uri).body(()).unwrap(), diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index 0333c7cea..a080061be 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -5,14 +5,14 @@ use ctx; use control::destination; use telemetry::metrics::DstLabels; -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Debug)] pub enum Ctx { Client(Arc), Server(Arc), } /// Identifies a connection from another process to a proxy listener. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Debug)] pub struct Server { pub proxy: Arc, pub remote: SocketAddr, @@ -21,7 +21,7 @@ pub struct Server { } /// Identifies a connection from the proxy to another process. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Debug)] pub struct Client { pub proxy: Arc, pub remote: SocketAddr, diff --git a/proxy/src/telemetry/metrics/histogram.rs b/proxy/src/telemetry/metrics/histogram.rs index 55f214c37..f14ae5356 100644 --- a/proxy/src/telemetry/metrics/histogram.rs +++ b/proxy/src/telemetry/metrics/histogram.rs @@ -35,7 +35,7 @@ pub struct Histogram> { _p: PhantomData, } -#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)] +#[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum Bucket { Le(u64), Inf, diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index 20ef2026e..ffdebdbfd 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -286,7 +286,8 @@ mod tests { team: &str ) { let client = client(&proxy, vec![("team", team)]); - let (req, rsp) = request("http://nba.com", &server, &client, 1); + let (req, rsp) = request("http://nba.com", &server, &client, + ctx::http::RequestId::from(1)); let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); let transport = TransportLabels::new(&client_transport); diff --git a/proxy/src/telemetry/metrics/record.rs b/proxy/src/telemetry/metrics/record.rs index a8301c3a9..ad6e4bf6d 100644 --- a/proxy/src/telemetry/metrics/record.rs +++ b/proxy/src/telemetry/metrics/record.rs @@ -108,7 +108,8 @@ mod test { ("pod", "klay"), ]); - let (_, rsp) = request("http://buoyant.io", &server, &client, 1); + let (_, rsp) = request("http://buoyant.io", &server, &client, + ctx::http::RequestId::from(1)); let request_open_at = Instant::now(); let response_open_at = request_open_at + Duration::from_millis(100); @@ -168,7 +169,8 @@ mod test { ("pod", "klay"), ]); - let (req, rsp) = request("http://buoyant.io", &server, &client, 1); + let (req, rsp) = request("http://buoyant.io", &server, &client, + ctx::http::RequestId::from(1)); let server_transport = Arc::new(ctx::transport::Ctx::Server(server.clone())); let client_transport = diff --git a/proxy/src/telemetry/sensor/http.rs b/proxy/src/telemetry/sensor/http.rs index c9177938b..d83a61e12 100644 --- a/proxy/src/telemetry/sensor/http.rs +++ b/proxy/src/telemetry/sensor/http.rs @@ -228,7 +228,7 @@ where ); let (inner, body_inner) = match metadata { (Some(ctx), Some(RequestOpen(request_open_at))) => { - let id = self.next_id.fetch_add(1, Ordering::SeqCst); + let id = ctx::http::RequestId::from(self.next_id.fetch_add(1, Ordering::SeqCst)); let ctx = ctx::http::Request::new(&req, &ctx, &self.client_ctx, id); self.handle