From efdfc93b50726a9f843a0ac618575cc1651c1b53 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 12 Apr 2018 17:39:29 -0700 Subject: [PATCH] Stop pushing telemetry reports from the proxy (#616) Now that the controller does not depend on pushed telemetry reports, the proxy need not depend on the telemetry API or maintain legacy sampling logic. --- proxy/controller-grpc/build.rs | 1 - proxy/controller-grpc/src/lib.rs | 4 - proxy/src/config.rs | 18 -- proxy/src/control/mod.rs | 17 +- proxy/src/control/telemetry.rs | 109 ---------- proxy/src/ctx/mod.rs | 12 -- proxy/src/lib.rs | 7 +- proxy/src/telemetry/control.rs | 159 ++------------- proxy/src/telemetry/metrics/mod.rs | 307 ----------------------------- proxy/src/telemetry/mod.rs | 12 +- proxy/tests/support/controller.rs | 25 --- proxy/tests/support/proxy.rs | 15 -- proxy/tests/telemetry.rs | 267 ------------------------- 13 files changed, 28 insertions(+), 925 deletions(-) delete mode 100644 proxy/src/control/telemetry.rs diff --git a/proxy/controller-grpc/build.rs b/proxy/controller-grpc/build.rs index d2cd83a0f..892120d28 100644 --- a/proxy/controller-grpc/build.rs +++ b/proxy/controller-grpc/build.rs @@ -8,7 +8,6 @@ fn build_control() { let client_files = &[ "../../proto/common/common.proto", "../../proto/proxy/destination/destination.proto", - "../../proto/proxy/telemetry/telemetry.proto", ]; let server_files = &["../../proto/proxy/tap/tap.proto"]; let dirs = &["../../proto"]; diff --git a/proxy/controller-grpc/src/lib.rs b/proxy/controller-grpc/src/lib.rs index 1845d0858..4114298c6 100644 --- a/proxy/controller-grpc/src/lib.rs +++ b/proxy/controller-grpc/src/lib.rs @@ -32,10 +32,6 @@ mod gen { pub mod tap { include!(concat!(env!("OUT_DIR"), "/conduit.proxy.tap.rs")); } - - pub mod telemetry { - include!(concat!(env!("OUT_DIR"), "/conduit.proxy.telemetry.rs")); - } } /// Converts a Rust Duration to a Protobuf Duration. diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 2ccdd5607..9f9fcf5ef 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -53,12 +53,6 @@ pub struct Config { /// Event queue capacity. pub event_buffer_capacity: usize, - /// Interval after which to flush metrics. - pub metrics_flush_interval: Duration, - - /// Timeout after which to cancel telemetry reports. - pub report_timeout: Duration, - /// Timeout after which to cancel binding a request. pub bind_timeout: Duration, @@ -131,8 +125,6 @@ pub struct TestEnv { // Environment variables to look at when loading the configuration const ENV_EVENT_BUFFER_CAPACITY: &str = "CONDUIT_PROXY_EVENT_BUFFER_CAPACITY"; -pub const ENV_METRICS_FLUSH_INTERVAL_SECS: &str = "CONDUIT_PROXY_METRICS_FLUSH_INTERVAL_SECS"; -const ENV_REPORT_TIMEOUT_SECS: &str = "CONDUIT_PROXY_REPORT_TIMEOUT_SECS"; pub const ENV_PRIVATE_LISTENER: &str = "CONDUIT_PROXY_PRIVATE_LISTENER"; pub const ENV_PRIVATE_FORWARD: &str = "CONDUIT_PROXY_PRIVATE_FORWARD"; pub const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER"; @@ -156,8 +148,6 @@ const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF"; // Default values for various configuration fields const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 10_000; // FIXME -const DEFAULT_METRICS_FLUSH_INTERVAL_SECS: u64 = 10; -const DEFAULT_REPORT_TIMEOUT_SECS: u64 = 10; // TODO: is this a reasonable default? 
const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140"; const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143"; const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190"; @@ -196,9 +186,6 @@ impl<'a> TryFrom<&'a Strings> for Config { let bind_timeout = parse(strings, ENV_BIND_TIMEOUT, parse_number); let resolv_conf_path = strings.get(ENV_RESOLV_CONF); let event_buffer_capacity = parse(strings, ENV_EVENT_BUFFER_CAPACITY, parse_number); - let metrics_flush_interval_secs = - parse(strings, ENV_METRICS_FLUSH_INTERVAL_SECS, parse_number); - let report_timeout = parse(strings, ENV_REPORT_TIMEOUT_SECS, parse_number); let pod_name = strings.get(ENV_POD_NAME); let pod_namespace = strings.get(ENV_POD_NAMESPACE).and_then(|maybe_value| { // There cannot be a default pod namespace, and the pod namespace is required. @@ -255,11 +242,6 @@ impl<'a> TryFrom<&'a Strings> for Config { control_host_and_port: control_host_and_port?, event_buffer_capacity: event_buffer_capacity?.unwrap_or(DEFAULT_EVENT_BUFFER_CAPACITY), - metrics_flush_interval: - Duration::from_secs(metrics_flush_interval_secs? - .unwrap_or(DEFAULT_METRICS_FLUSH_INTERVAL_SECS)), - report_timeout: - Duration::from_secs(report_timeout?.unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS)), bind_timeout: Duration::from_millis(bind_timeout?.unwrap_or(DEFAULT_BIND_TIMEOUT_MS)), pod_name: pod_name?, diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 5051f99c5..ab3b34d09 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -3,7 +3,7 @@ use std::io; use std::time::{Duration, Instant}; use bytes::Bytes; -use futures::{future, Async, Future, Poll, Stream}; +use futures::{future, Async, Future, Poll}; use h2; use http; use tokio_core::reactor::{ @@ -25,13 +25,10 @@ pub mod discovery; mod fully_qualified_authority; mod observe; pub mod pb; -mod telemetry; use self::discovery::{Background as DiscoBg, Discovery, Watch}; pub use self::discovery::Bind; pub use self::observe::Observe; -use conduit_proxy_controller_grpc::telemetry::ReportRequest; -use self::telemetry::Telemetry; pub struct Control { disco: Discovery, @@ -67,17 +64,12 @@ impl Control { // ===== impl Background ===== impl Background { - pub fn bind( + pub fn bind( self, - events: S, host_and_port: HostAndPort, dns_config: dns::Config, - report_timeout: Duration, executor: &Handle, - ) -> Box> - where - S: Stream + 'static, - { + ) -> Box> { // Build up the Controller Client Stack let mut client = { let ctx = ("controller-client", format!("{:?}", host_and_port)); @@ -96,7 +88,6 @@ impl Background { ::logging::context_executor(ctx, executor.clone()), ); - let reconnect = Reconnect::new(h2_client); let log_errors = LogErrors::new(reconnect); let backoff = Backoff::new(log_errors, Duration::from_secs(5), executor); @@ -105,11 +96,9 @@ impl Background { }; let mut disco = self.disco.work(executor); - let mut telemetry = Telemetry::new(events, report_timeout, executor); let fut = future::poll_fn(move || { disco.poll_rpc(&mut client); - telemetry.poll_rpc(&mut client); Ok(Async::NotReady) }); diff --git a/proxy/src/control/telemetry.rs b/proxy/src/control/telemetry.rs deleted file mode 100644 index 434b65203..000000000 --- a/proxy/src/control/telemetry.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::fmt; -use std::time::{Duration, Instant}; - -use futures::{Async, Future, Stream}; -use tokio_core::reactor::Handle; -use tower_h2::{HttpService, BoxBody}; -use tower_grpc as grpc; - -use conduit_proxy_controller_grpc::telemetry::{ReportRequest, ReportResponse}; -use 
conduit_proxy_controller_grpc::telemetry::client::Telemetry as TelemetrySvc; -use ::timeout::{Timeout, TimeoutFuture}; - -type TelemetryStream = grpc::client::unary::ResponseFuture< - ReportResponse, TimeoutFuture, B>; - -#[derive(Debug)] -pub struct Telemetry { - reports: T, - in_flight: Option<(Instant, TelemetryStream)>, - report_timeout: Duration, - handle: Handle, -} - -impl Telemetry -where - S: HttpService, - S::Error: fmt::Debug, - T: Stream, - T::Error: ::std::fmt::Debug, -{ - pub fn new(reports: T, report_timeout: Duration, handle: &Handle) -> Self { - Telemetry { - reports, - in_flight: None, - report_timeout, - handle: handle.clone(), - } - } - - pub fn poll_rpc(&mut self, client: &mut S) - { - let client = Timeout::new(client.lift_ref(), self.report_timeout, &self.handle); - let mut svc = TelemetrySvc::new(client); - - //let _ctxt = ::logging::context("Telemetry.Report".into()); - - loop { - if let Some((t0, mut fut)) = self.in_flight.take() { - match fut.poll() { - Ok(Async::NotReady) => { - // TODO: can we just move this logging logic to `Timeout`? - trace!("report in flight to controller for {:?}", t0.elapsed()); - self.in_flight = Some((t0, fut)) - } - Ok(Async::Ready(_)) => { - trace!("report sent to controller in {:?}", t0.elapsed()) - } - Err(err) => warn!("controller error: {:?}", err), - } - } - - let controller_ready = self.in_flight.is_none() && match svc.poll_ready() { - Ok(Async::Ready(_)) => true, - Ok(Async::NotReady) => { - trace!("controller unavailable"); - false - } - Err(err) => { - warn!("controller error: {:?}", err); - false - } - }; - - match self.reports.poll() { - Ok(Async::NotReady) => { - return; - } - Ok(Async::Ready(None)) => { - debug!("report stream complete"); - return; - } - Err(err) => { - warn!("report stream error: {:?}", err); - } - Ok(Async::Ready(Some(report))) => { - // Attempt to send the report. Continue looping so that `reports` is - // polled until it's not ready. - if !controller_ready { - info!( - "report dropped; requests={} accepts={} connects={}", - report.requests.len(), - report.server_transports.len(), - report.client_transports.len(), - ); - } else { - trace!( - "report sent; requests={} accepts={} connects={}", - report.requests.len(), - report.server_transports.len(), - report.client_transports.len(), - ); - let rep = svc.report(grpc::Request::new(report)); - self.in_flight = Some((Instant::now(), rep)); - } - } - } - } - } -} diff --git a/proxy/src/ctx/mod.rs b/proxy/src/ctx/mod.rs index 55f5fc724..ac2a0aeaa 100644 --- a/proxy/src/ctx/mod.rs +++ b/proxy/src/ctx/mod.rs @@ -8,7 +8,6 @@ //! be stored in `http::Extensions`, for instance. Furthermore, because these contexts //! will be sent to a telemetry processing thread, we want to avoid excessive cloning. use config; -use conduit_proxy_controller_grpc::telemetry as proto; use std::time::SystemTime; use std::sync::Arc; pub mod http; @@ -77,17 +76,6 @@ impl Process { } } -impl<'a> Into for &'a Process { - fn into(self) -> proto::Process { - // TODO: can this be implemented without cloning Strings? 
- proto::Process { - node: self.node.clone(), - scheduled_instance: self.scheduled_instance.clone(), - scheduled_namespace: self.scheduled_namespace.clone(), - } - } -} - impl Proxy { pub fn inbound(p: &Arc) -> Arc { Arc::new(Proxy::Inbound(Arc::clone(p))) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8646e547e..52fc59d81 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -201,7 +201,6 @@ where let (sensors, telemetry) = telemetry::new( &process_ctx, config.event_buffer_capacity, - config.metrics_flush_interval, ); let dns_config = dns::Config::from_file(&config.resolv_conf_path); @@ -262,7 +261,6 @@ where let (_tx, controller_shutdown_signal) = futures::sync::oneshot::channel::<()>(); { - let report_timeout = config.report_timeout; thread::Builder::new() .name("controller-client".into()) .spawn(move || { @@ -288,15 +286,14 @@ where .serve_metrics(metrics_listener); let client = control_bg.bind( - telemetry, control_host_and_port, dns_config, - report_timeout, &executor ); - let fut = client.join3( + let fut = client.join4( server.map_err(|_| {}), + telemetry, metrics_server.map_err(|_| {}), ).map(|_| {}); executor.spawn(::logging::context_future("controller-client", fut)); diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index 3e7a95aba..a9cea2d51 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -1,15 +1,13 @@ -use std::{fmt, io}; +use std::io; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; use futures::{future, Async, Future, Poll, Stream}; use futures_mpsc_lossy::Receiver; -use tokio_core::reactor::{Handle, Timeout}; +use tokio_core::reactor::Handle; use super::event::Event; -use super::metrics::{prometheus, Metrics as PushMetrics}; +use super::metrics::prometheus; use super::tap::Taps; -use conduit_proxy_controller_grpc::telemetry::ReportRequest; use connection; use ctx; @@ -19,10 +17,6 @@ pub struct MakeControl { /// Receives events. rx: Receiver, - /// Limits the amount of time metrics may be buffered before being flushed to the - /// controller. - flush_interval: Duration, - process_ctx: Arc, } @@ -37,12 +31,10 @@ pub struct MakeControl { /// /// # TODO /// Limit the amount of memory that may be consumed for metrics aggregation. +#[derive(Debug)] pub struct Control { - /// Holds the current state of aggregated metrics. - push_metrics: Option, - /// Aggregates scrapable metrics. - metrics_work: prometheus::Aggregate, + metrics_aggregate: prometheus::Aggregate, /// Serves scrapable metrics. metrics_service: prometheus::Serve, @@ -53,14 +45,6 @@ pub struct Control { /// Holds the current state of tap observations, as configured by an external source. taps: Option>>, - /// Limits the amount of time metrics may be buffered before being flushed to the - /// controller. - flush_interval: Duration, - - /// Ensures liveliness of telemetry by waking the stream to produce reports when - /// needed. This timeout is reset as reports are returned. - flush_timeout: Timeout, - handle: Handle, } @@ -71,16 +55,13 @@ impl MakeControl { /// /// # Arguments /// - `rx`: the `Receiver` side of the channel on which events are sent. - /// - `flush_interval`: the maximum amount of time between sending reports to the - /// controller. + /// - `process_ctx`: runtime process metadata. 
pub(super) fn new( rx: Receiver, - flush_interval: Duration, process_ctx: &Arc, ) -> Self { Self { rx, - flush_interval, process_ctx: Arc::clone(process_ctx), } } @@ -95,21 +76,14 @@ impl MakeControl { /// - `Ok(())` if the timeout was successfully created. /// - `Err(io::Error)` if the timeout could not be created. pub fn make_control(self, taps: &Arc>, handle: &Handle) -> io::Result { - trace!("telemetry control flush_interval={:?}", self.flush_interval); - - let flush_timeout = Timeout::new(self.flush_interval, handle)?; - let (metrics_work, metrics_service) = + let (metrics_aggregate, metrics_service) = prometheus::new(&self.process_ctx); - let push_metrics = Some(PushMetrics::new(self.process_ctx)); Ok(Control { - push_metrics, - metrics_work, + metrics_aggregate, metrics_service, rx: Some(self.rx), taps: Some(taps.clone()), - flush_interval: self.flush_interval, - flush_timeout, handle: handle.clone(), }) } @@ -118,52 +92,13 @@ impl MakeControl { // ===== impl Control ===== impl Control { - /// Returns true if the flush timeout has expired, false otherwise. - #[inline] - fn flush_timeout_expired(&mut self) -> bool { - self.flush_timeout - .poll() - .ok() - .map(|r| r.is_ready()) - .unwrap_or(false) - } - - /// Returns true if this `Control` should flush metrics. - /// - /// Metrics should be flushed if either of the following conditions are true: - /// - we have aggregated `flush_bytes` bytes of data, - /// - we haven't sent a report in `flush_interval` seconds. - fn flush_report(&mut self) -> Option { - let metrics = if self.flush_timeout_expired() { - trace!("flush timeout expired"); - self.push_metrics.as_mut() - } else { - None - }; - - metrics.map(Self::generate_report) - } - - fn generate_report(m: &mut PushMetrics) -> ReportRequest { - let mut r = m.generate_report(); - r.proxy = 0; // 0 = Inbound, 1 = Outbound - r - } - - /// Reset the flush timeout. - fn reset_timeout(&mut self) { - trace!("flushing in {:?}", self.flush_interval); - self.flush_timeout - .reset(Instant::now() + self.flush_interval); - } - - fn recv(&mut self) -> Async> { + fn recv(&mut self) -> Poll, ()> { match self.rx.take() { - None => Async::Ready(None), + None => Ok(Async::Ready(None)), Some(mut rx) => { trace!("recv.poll({:?})", rx); - match rx.poll().expect("recv telemetry") { - Async::Ready(None) => Async::Ready(None), + match rx.poll() { + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), ev => { self.rx = Some(rx); ev @@ -198,82 +133,28 @@ impl Control { } -impl Stream for Control { - type Item = ReportRequest; +impl Future for Control { + type Item = (); type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { + fn poll(&mut self) -> Poll { trace!("poll"); loop { - let report = match self.recv() { - Async::NotReady => break, - Async::Ready(Some(ev)) => { + match try_ready!(self.recv()) { + Some(ev) => { if let Some(taps) = self.taps.as_mut() { if let Ok(mut t) = taps.lock() { t.inspect(&ev); } } - // XXX Only inbound events are currently aggregated. 
- if ev.proxy().is_inbound() { - if let Some(metrics) = self.push_metrics.as_mut() { - metrics.record_event(&ev); - } - } - - self.metrics_work.record_event(&ev); - - self.flush_report() + self.metrics_aggregate.record_event(&ev); } - Async::Ready(None) => { + None => { warn!("events finished"); - let report = self.push_metrics - .take() - .map(|mut m| Self::generate_report(&mut m)); - if report.is_none() { - return Ok(Async::Ready(None)); - } - report + return Ok(Async::Ready(())); } }; - - if let Some(report) = report { - self.reset_timeout(); - return Ok(Async::Ready(Some(report))); - } - } - - // There may be no new events, but the timeout fired; so check at least once - // explicitly: - if self.push_metrics.is_none() { - Ok(Async::Ready(None)) - } else { - match self.flush_report() { - None => { - // Either `rx` isn't ready or the timeout isn't ready - Ok(Async::NotReady) - } - Some(report) => { - self.reset_timeout(); - Ok(Async::Ready(Some(report))) - } - } } } } - -// NOTE: `flush_timeout` does not impl `Debug`. -impl fmt::Debug for Control { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Control") - .field("push_metrics", &self.push_metrics) - .field("rx", &self.rx) - .field("taps", &self.taps) - .field("flush_interval", &self.flush_interval) - .field( - "flush_timeout", - &format!("Timeout({:?})", &self.flush_interval), - ) - .finish() - } -} diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index b7a7b2e4a..c7f963295 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -1,309 +1,2 @@ -use std::net; -use std::sync::Arc; -use std::time::Duration; -use std::{u32, u64}; - -use http; -use indexmap::IndexMap; - -use conduit_proxy_controller_grpc::common::{ - TcpAddress, - Protocol, -}; -use conduit_proxy_controller_grpc::telemetry::{ - ClientTransport, - eos_ctx, - EosCtx, - EosScope, - ReportRequest, - RequestCtx, - RequestScope, - ResponseCtx, - ResponseScope, - ServerTransport, - TransportSummary, -}; -use ctx; -use telemetry::event::Event; - mod latency; pub mod prometheus; - -#[derive(Debug)] -pub struct Metrics { - sources: IndexMap, - destinations: IndexMap, - requests: IndexMap, - process_ctx: Arc, -} - -#[derive(Debug, Eq, PartialEq, Hash)] -struct RequestKey { - source: net::IpAddr, - destination: net::SocketAddr, - uri: http::Uri, - method: http::Method, -} - -#[derive(Debug, Default)] -struct RequestStats { - count: u32, - responses: IndexMap, ResponseStats>, -} - -#[derive(Debug, Default)] -struct ResponseStats { - ends: IndexMap, - /// Response latencies in tenths of a millisecond. - /// - /// Observed latencies are mapped to a count of the times that - /// latency value was seen. 
- latencies: latency::Histogram, -} - -#[derive(Debug, PartialEq, Eq, Hash)] -enum End { - Grpc(u32), - Reset(u32), - Other, -} - -#[derive(Debug, Default)] -struct TransportStats { - protocol: Protocol, - connects: u32, - disconnects: Vec, -} - -impl RequestKey { - fn from_ctx(ctx: &Arc) -> Self { - Self { - source: ctx.server.remote.ip(), - destination: ctx.client.remote, - uri: ctx.uri.clone(), - method: ctx.method.clone(), - } - } -} - -impl Metrics { - pub fn new(process_ctx: Arc) -> Self { - Metrics { - sources: IndexMap::new(), - destinations: IndexMap::new(), - requests: IndexMap::new(), - process_ctx, - } - } - - pub(super) fn record_event(&mut self, event: &Event) { - match *event { - Event::TransportOpen(ref transport) => { - self.transport(transport).connects += 1; - } - Event::TransportClose(ref transport, ref close) => { - self.transport(transport) - .disconnects - .push(TransportSummary { - duration_ms: dur_to_ms(close.duration), - bytes_sent: 0, - }); - } - - Event::StreamRequestOpen(ref req) => { - self.request(req).count += 1; - } - Event::StreamRequestFail(ref req, ref fail) => { - let stats = self.request(req) - .responses - .entry(None) - .or_insert_with(Default::default); - - let ends = stats - .ends - .entry(End::Reset(fail.error.into())) - .or_insert_with(Default::default); - - stats.latencies += fail.since_request_open; - *ends += 1; - } - Event::StreamRequestEnd(_, _) => { - // Do nothing, as the push metrics are slated for removal and - // we don't need to support this event. - } - Event::StreamResponseOpen(ref res, ref open) => { - self.response(res).latencies += open.since_request_open; - }, - Event::StreamResponseFail(ref res, ref fail) => { - *self.response_end(res, End::Reset(fail.error.into())) - += 1; - } - Event::StreamResponseEnd(ref res, ref end) => { - let e = end.grpc_status.map(End::Grpc).unwrap_or(End::Other); - *self.response_end(res, e) += 1; - } - } - } - - fn request<'a>(&mut self, req: &'a Arc) -> &mut RequestStats { - self.requests - .entry(RequestKey::from_ctx(req)) - .or_insert_with(RequestStats::default) - } - - fn response<'a>(&mut self, res: &'a Arc) -> &mut ResponseStats { - let req = self.request(&res.request); - req.responses - .entry(Some(res.status)) - .or_insert_with(Default::default) - } - - fn response_end<'a>( - &mut self, - res: &'a Arc, - end: End, - ) -> &mut u32 { - self.response(res) - .ends - .entry(end) - .or_insert_with(Default::default) - } - - fn transport<'a>(&mut self, transport: &'a ctx::transport::Ctx) -> &mut TransportStats { - match *transport { - ctx::transport::Ctx::Server(ref s) => { - let source = s.remote.ip(); - self.sources - .entry(source) - .or_insert_with(|| TransportStats { - protocol: s.protocol, - ..TransportStats::default() - }) - } - ctx::transport::Ctx::Client(ref c) => self.destinations - .entry(c.remote) - .or_insert_with(|| TransportStats { - protocol: c.protocol, - ..TransportStats::default() - }) - } - } - - pub fn generate_report(&mut self) -> ReportRequest { - let histogram_bucket_bounds_tenth_ms: Vec = - latency::BUCKET_BOUNDS.iter() - .map(|&latency| latency.into()) - .collect(); - - let mut server_transports = Vec::new(); - let mut client_transports = Vec::new(); - - for (ip, stats) in self.sources.drain(..) { - server_transports.push(ServerTransport { - source_ip: Some(ip.into()), - connects: stats.connects, - disconnects: stats.disconnects, - protocol: stats.protocol as i32, - }) - } - - for (addr, stats) in self.destinations.drain(..) 
{ - client_transports.push(ClientTransport { - target_addr: Some(TcpAddress { - ip: Some(addr.ip().into()), - port: u32::from(addr.port()), - }), - connects: stats.connects, - disconnects: stats.disconnects, - protocol: stats.protocol as i32, - }); - } - - let mut requests = Vec::with_capacity(self.requests.len()); - - for (req, stats) in self.requests.drain(..) { - let mut responses = Vec::with_capacity(stats.responses.len()); - - for (status_code, res_stats) in stats.responses { - let mut ends = Vec::with_capacity(res_stats.ends.len()); - - for (end, streams) in res_stats.ends { - - ends.push(EosScope { - ctx: Some(EosCtx { - end: Some(match end { - End::Grpc(grpc) => eos_ctx::End::GrpcStatusCode(grpc), - End::Reset(reset) => eos_ctx::End::ResetErrorCode(reset), - End::Other => eos_ctx::End::Other(true), - }), - }), - streams, - }); - } - - - responses.push(ResponseScope { - ctx: status_code.map(|code| { - ResponseCtx { - http_status_code: u32::from(code.as_u16()), - } - }), - ends: ends, - response_latency_counts: res_stats.latencies - .into_iter() - // NOTE: this potential truncation is unlikely to cause - // problems here, as the push metrics reports have - // different semantics from the scrapable Prometheus - // metrics. Push metrics are reset every time a report - // is generated, while the scrapable metrics last for - // the entire lifetime of the process. - // - // Furthermore, this code is slated for removal soon. - .map(|count| count as u32) - .collect(), - }); - } - - requests.push(RequestScope { - ctx: Some(RequestCtx { - authority: req.uri - .authority_part() - .map(|a| a.to_string()) - .unwrap_or_else(String::new), - source_ip: Some(req.source.into()), - target_addr: Some(TcpAddress { - ip: Some(req.destination.ip().into()), - port: u32::from(req.destination.port()), - }), - }), - count: stats.count, - responses, - }) - } - - ReportRequest { - process: Some(self.process_ctx.as_ref().into()), - //TODO: store proxy in Metrics? - proxy: 0, - server_transports, - client_transports, - requests, - histogram_bucket_bounds_tenth_ms, - } - } -} - -fn dur_to_ms(dur: Duration) -> u64 { - dur.as_secs() - // note that this could just be saturating addition if we didn't want - // to log if an overflow occurs... - .checked_mul(1_000) - .and_then(|as_millis| { - let subsec = u64::from(dur.subsec_nanos() / latency::MS_TO_NS); - as_millis.checked_add(subsec) - }) - .unwrap_or_else(|| { - debug!("{:?} too large to convert to ms!", dur); - u64::MAX - }) -} diff --git a/proxy/src/telemetry/mod.rs b/proxy/src/telemetry/mod.rs index 4b7949186..5056e7ffd 100644 --- a/proxy/src/telemetry/mod.rs +++ b/proxy/src/telemetry/mod.rs @@ -1,7 +1,4 @@ -//! Sensors and reports telemetry from the proxy. - use std::sync::Arc; -use std::time::Duration; use futures_mpsc_lossy; @@ -23,22 +20,19 @@ pub use self::sensor::Sensors; /// that support telemetry. /// /// [`Control`] drives processing of all telemetry events for tapping as well as metrics -/// reporting. +/// aggregation. /// /// # Arguments -/// - `capacity`: the number of events to aggregate. -/// - `flush_interval`: the length of time after which a metrics report should be sent, -/// regardless of how many events have been aggregated. +/// - `capacity`: the size of the event queue. 
/// /// [`Sensors`]: struct.Sensors.html /// [`Control`]: struct.Control.html pub fn new( process: &Arc, capacity: usize, - flush_interval: Duration, ) -> (Sensors, MakeControl) { let (tx, rx) = futures_mpsc_lossy::channel(capacity); let s = Sensors::new(tx); - let c = MakeControl::new(rx, flush_interval, process); + let c = MakeControl::new(rx, process); (s, c) } diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 4076e6f54..cdc8237a5 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -21,7 +21,6 @@ struct Destination(Box Option + Send>); #[derive(Debug)] pub struct Controller { destinations: VecDeque<(String, Destination)>, - reports: Option>, } pub struct Listening { @@ -33,7 +32,6 @@ impl Controller { pub fn new() -> Self { Controller { destinations: VecDeque::new(), - reports: None, } } @@ -71,12 +69,6 @@ impl Controller { self.destination_fn(dest, || None) } - pub fn reports(&mut self) -> mpsc::UnboundedReceiver { - let (tx, rx) = mpsc::unbounded(); - self.reports = Some(tx); - rx - } - pub fn run(self) -> Listening { run(self) } @@ -86,7 +78,6 @@ type Response = self::http::Response; type Destinations = Arc>>; const DESTINATION_GET: &str = "/conduit.proxy.destination.Destination/Get"; -const TELEMETRY_REPORT: &str = "/conduit.proxy.telemetry.Telemetry/Report"; impl fmt::Debug for Destination { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -97,7 +88,6 @@ impl fmt::Debug for Destination { #[derive(Debug)] struct Svc { destinations: Destinations, - reports: Option>, } impl Svc { @@ -136,18 +126,6 @@ impl Svc { Ok(rsp) })) } - TELEMETRY_REPORT => { - let mut reports = self.reports.clone(); - Box::new(body.concat2().and_then(move |mut bytes| { - if let Some(ref mut report) = reports { - let req = Message::decode(bytes.split_off(5)).unwrap(); - let _ = report.unbounded_send(req); - } - let body = GrpcBody::new([0u8; 5][..].into()); - let rsp = rsp.body(body).unwrap(); - Ok(rsp) - })) - } unknown => { println!("unknown route: {:?}", unknown); let body = GrpcBody::unimplemented(); @@ -216,7 +194,6 @@ impl Body for GrpcBody { #[derive(Debug)] struct NewSvc { destinations: Destinations, - reports: Option>, } impl NewService for NewSvc { type Request = Request; @@ -229,7 +206,6 @@ impl NewService for NewSvc { fn new_service(&self) -> Self::Future { future::ok(Svc { destinations: self.destinations.clone(), - reports: self.reports.clone(), }) } } @@ -246,7 +222,6 @@ fn run(controller: Controller) -> Listening { let factory = NewSvc { destinations: Arc::new(Mutex::new(controller.destinations)), - reports: controller.reports, }; let h2 = tower_h2::Server::new(factory, Default::default(), reactor.clone()); diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index 28ce62518..34b792552 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -13,7 +13,6 @@ pub struct Proxy { inbound: Option, outbound: Option, - metrics_flush_interval: Option, inbound_disable_ports_protocol_detection: Option>, outbound_disable_ports_protocol_detection: Option>, @@ -39,7 +38,6 @@ impl Proxy { inbound: None, outbound: None, - metrics_flush_interval: None, inbound_disable_ports_protocol_detection: None, outbound_disable_ports_protocol_detection: None, shutdown_signal: None, @@ -61,11 +59,6 @@ impl Proxy { self } - pub fn metrics_flush_interval(mut self, dur: Duration) -> Self { - self.metrics_flush_interval = Some(dur); - self - } - pub fn disable_inbound_ports_protocol_detection(mut self, 
ports: Vec) -> Self { self.inbound_disable_ports_protocol_detection = Some(ports); self @@ -172,14 +165,6 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { let mut config = config::Config::try_from(&env).unwrap(); - // TODO: We currently can't use `config::ENV_METRICS_FLUSH_INTERVAL_SECS` - // because we need to be able to set the flush interval to a fraction of a - // second. We should change config::ENV_METRICS_FLUSH_INTERVAL_SECS so that - // it can support this. - if let Some(dur) = proxy.metrics_flush_interval { - config.metrics_flush_interval = dur; - } - let (running_tx, running_rx) = oneshot::channel(); let (tx, mut rx) = shutdown_signal(); diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs index e5a333884..4d0902ce3 100644 --- a/proxy/tests/telemetry.rs +++ b/proxy/tests/telemetry.rs @@ -5,262 +5,6 @@ extern crate log; mod support; use self::support::*; -#[test] -fn inbound_sends_telemetry() { - let _ = env_logger::try_init(); - - info!("running test server"); - let srv = server::new().route("/hey", "hello").run(); - - let mut ctrl = controller::new(); - let reports = ctrl.reports(); - let proxy = proxy::new() - .controller(ctrl.run()) - .inbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) - .run(); - let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); - - info!("client.get(/hey)"); - assert_eq!(client.get("/hey"), "hello"); - - info!("awaiting report"); - let report = reports.wait().next().unwrap().unwrap(); - // proxy inbound - assert_eq!(report.proxy, 0); - // process - assert_eq!(report.process.as_ref().unwrap().node, ""); - assert_eq!(report.process.as_ref().unwrap().scheduled_instance, ""); - assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "test"); - // requests - assert_eq!(report.requests.len(), 1); - let req = &report.requests[0]; - assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local"); - //assert_eq!(req.ctx.as_ref().unwrap().method, GET); - assert_eq!(req.count, 1); - assert_eq!(req.responses.len(), 1); - // responses - let res = &req.responses[0]; - assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200); - // response latencies should always have a length equal to the number - // of latency buckets in the latency histogram. 
- assert_eq!( - res.response_latency_counts.len(), - report.histogram_bucket_bounds_tenth_ms.len() - ); - assert_eq!(res.ends.len(), 1); - // ends - let ends = &res.ends[0]; - assert_eq!(ends.streams, 1); -} - - -#[test] -fn http1_inbound_sends_telemetry() { - let _ = env_logger::try_init(); - - info!("running test server"); - let srv = server::http1().route("/hey", "hello").run(); - - let mut ctrl = controller::new(); - let reports = ctrl.reports(); - let proxy = proxy::new() - .controller(ctrl.run()) - .inbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) - .run(); - let client = client::http1(proxy.inbound, "tele.test.svc.cluster.local"); - - info!("client.get(/hey)"); - assert_eq!(client.get("/hey"), "hello"); - - info!("awaiting report"); - let report = reports.wait().next().unwrap().unwrap(); - // proxy inbound - assert_eq!(report.proxy, 0); - // requests - assert_eq!(report.requests.len(), 1); - let req = &report.requests[0]; - assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local"); - //assert_eq!(req.ctx.as_ref().unwrap().method, GET); - assert_eq!(req.count, 1); - assert_eq!(req.responses.len(), 1); - // responses - let res = &req.responses[0]; - assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200); - // response latencies should always have a length equal to the number - // of latency buckets in the latency histogram. - assert_eq!( - res.response_latency_counts.len(), - report.histogram_bucket_bounds_tenth_ms.len() - ); - assert_eq!(res.ends.len(), 1); - // ends - let ends = &res.ends[0]; - assert_eq!(ends.streams, 1); -} - - -#[test] -fn inbound_aggregates_telemetry_over_several_requests() { - let _ = env_logger::try_init(); - - info!("running test server"); - let srv = server::new() - .route("/hey", "hello") - .route("/hi", "good morning") - .run(); - - let mut ctrl = controller::new(); - let reports = ctrl.reports(); - let proxy = proxy::new() - .controller(ctrl.run()) - .inbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) - .run(); - let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); - - info!("client.get(/hey)"); - assert_eq!(client.get("/hey"), "hello"); - - info!("client.get(/hi)"); - assert_eq!(client.get("/hi"), "good morning"); - assert_eq!(client.get("/hi"), "good morning"); - - info!("awaiting report"); - let report = reports.wait().next().unwrap().unwrap(); - // proxy inbound - assert_eq!(report.proxy, 0); - - // requests ----------------------- - assert_eq!(report.requests.len(), 2); - - // -- first request ----------------- - let req = &report.requests[0]; - assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local"); - assert_eq!(req.count, 1); - assert_eq!(req.responses.len(), 1); - // ---- response -------------------- - let res = &req.responses[0]; - assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200); - // response latencies should always have a length equal to the number - // of latency buckets in the latency histogram. 
- assert_eq!( - res.response_latency_counts.len(), - report.histogram_bucket_bounds_tenth_ms.len() - ); - assert_eq!(res.ends.len(), 1); - - // ------ ends ---------------------- - let ends = &res.ends[0]; - assert_eq!(ends.streams, 1); - - // -- second request ---------------- - let req = &report.requests[1]; - assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local"); - // repeated twice - assert_eq!(req.count, 2); - assert_eq!(req.responses.len(), 1); - // ---- response ------------------- - let res = &req.responses[0]; - assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200); - // response latencies should always have a length equal to the number - // of latency buckets in the latency histogram. - assert_eq!( - res.response_latency_counts.len(), - report.histogram_bucket_bounds_tenth_ms.len() - ); - assert_eq!(res.ends.len(), 1); - - // ------ ends ---------------------- - let ends = &res.ends[0]; - assert_eq!(ends.streams, 2); - -} - -// Ignore this test on CI, because our method of adding latency to requests -// (calling `thread::sleep`) is likely to be flakey on Travis. -// Eventually, we can add some kind of mock timer system for simulating latency -// more reliably, and re-enable this test. -#[test] -#[cfg_attr(not(feature = "flaky_tests"), ignore)] -fn records_latency_statistics() { - let _ = env_logger::try_init(); - - info!("running test server"); - let srv = server::new() - .route_with_latency("/hey", "hello", Duration::from_millis(500)) - .route_with_latency("/hi", "good morning", Duration::from_millis(40)) - .run(); - - let mut ctrl = controller::new(); - let reports = ctrl.reports(); - let proxy = proxy::new() - .controller(ctrl.run()) - .inbound(srv) - .metrics_flush_interval(Duration::from_secs(5)) - .run(); - let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); - - info!("client.get(/hey)"); - assert_eq!(client.get("/hey"), "hello"); - - info!("client.get(/hi)"); - assert_eq!(client.get("/hi"), "good morning"); - assert_eq!(client.get("/hi"), "good morning"); - - info!("awaiting report"); - let report = reports.wait().next().unwrap().unwrap(); - - // requests ----------------------- - assert_eq!(report.requests.len(), 2); - // first request - let req = &report.requests[0]; - assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local"); - let res = &req.responses[0]; - // response latencies should always have a length equal to the number - // of latency buckets in the latency histogram. - assert_eq!( - res.response_latency_counts.len(), - report.histogram_bucket_bounds_tenth_ms.len() - ); - for (idx, bucket) in res.response_latency_counts.iter().enumerate() { - // 500 ms of extra latency should put us in the 500-1000 - // millisecond bucket (the 15th bucket) - if idx == 15 { - assert_eq!(*bucket, 1, "poorly bucketed latencies: {:?}", res.response_latency_counts); - } else { - assert_eq!(*bucket, 0, "poorly bucketed latencies: {:?}", res.response_latency_counts); - } - } - - // second request - let req = &report.requests.get(1).expect("second report"); - assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local"); - assert_eq!(req.count, 2); - assert_eq!(req.responses.len(), 1); - let res = req.responses.get(0).expect("responses[0]"); - // response latencies should always have a length equal to the number - // of latency buckets in the latency histogram. 
- assert_eq!( - res.response_latency_counts.len(), - report.histogram_bucket_bounds_tenth_ms.len() - ); - for (idx, bucket) in res.response_latency_counts.iter().enumerate() { - // 40 ms of extra latency should put us in the 40-50 - // millisecond bucket (the 10th bucket) - if idx == 9 { - assert_eq!(*bucket, 2, "poorly bucketed latencies: {:?}", res.response_latency_counts); - } else { - assert_eq!(*bucket, 0, "poorly bucketed latencies: {:?}", res.response_latency_counts); - } - } - -} - -#[test] -fn telemetry_report_errors_are_ignored() {} - macro_rules! assert_contains { ($scrape:expr, $contains:expr) => { assert_eventually!($scrape.contains($contains), "metrics scrape:\n{:8}\ndid not contain:\n{:8}", $scrape, $contains) @@ -278,7 +22,6 @@ fn metrics_endpoint_inbound_request_count() { let proxy = proxy::new() .controller(ctrl.run()) .inbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -308,7 +51,6 @@ fn metrics_endpoint_outbound_request_count() { let proxy = proxy::new() .controller(ctrl) .outbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.outbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -388,7 +130,6 @@ mod response_classification { let proxy = proxy::new() .controller(ctrl.run()) .inbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -419,7 +160,6 @@ mod response_classification { let proxy = proxy::new() .controller(ctrl) .outbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.outbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -460,7 +200,6 @@ fn metrics_endpoint_inbound_response_latency() { let proxy = proxy::new() .controller(ctrl.run()) .inbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -546,7 +285,6 @@ fn metrics_endpoint_outbound_response_latency() { let proxy = proxy::new() .controller(ctrl) .outbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.outbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -624,7 +362,6 @@ fn metrics_endpoint_inbound_request_duration() { let proxy = proxy::new() .controller(ctrl.run()) .inbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -660,7 +397,6 @@ fn metrics_endpoint_outbound_request_duration() { let proxy = proxy::new() .controller(ctrl) .outbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.outbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -722,7 +458,6 @@ mod outbound_dst_labels { let proxy = proxy::new() .controller(ctrl.run()) .outbound(srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let metrics = client::http1(proxy.metrics, "localhost"); @@ -921,7 +656,6 @@ fn metrics_have_no_double_commas() { 
.controller(ctrl) .inbound(inbound_srv) .outbound(outbound_srv) - .metrics_flush_interval(Duration::from_millis(500)) .run(); let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); let metrics = client::http1(proxy.metrics, "localhost"); @@ -942,5 +676,4 @@ fn metrics_have_no_double_commas() { let scrape = metrics.get("/metrics"); assert!(!scrape.contains(",,"), "outbound metrics had double comma"); - }
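
Editor's note (not part of the commit): after this patch the telemetry `Control` is no longer a `Stream` of `ReportRequest`s with flush timers; it is a plain future that drains the event channel into the scrape-side aggregate and resolves when all event senders are gone. The following is a rough, self-contained sketch of that shape using the futures 0.1 API the proxy used at the time. `Event` and `Aggregate` here are stand-ins for the proxy's real `telemetry::event::Event` and Prometheus aggregation types, and the wiring is simplified for illustration.

    extern crate futures;

    use futures::{Async, Future, Poll, Stream};
    use futures::sync::mpsc;

    // Stand-in for telemetry::event::Event.
    struct Event(&'static str);

    // Stand-in for the scrapable (Prometheus-side) aggregation state.
    #[derive(Default)]
    struct Aggregate {
        events: usize,
    }

    impl Aggregate {
        fn record_event(&mut self, _ev: &Event) {
            self.events += 1;
        }
    }

    // Sketch of the post-patch Control: a Future, not a Stream of reports.
    // It only records events into the aggregate and completes once the
    // event senders have been dropped; nothing is pushed to the controller.
    struct Control {
        rx: mpsc::UnboundedReceiver<Event>,
        aggregate: Aggregate,
    }

    impl Future for Control {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            loop {
                match self.rx.poll() {
                    Ok(Async::Ready(Some(ev))) => self.aggregate.record_event(&ev),
                    Ok(Async::Ready(None)) => {
                        println!("events finished; aggregated {}", self.aggregate.events);
                        return Ok(Async::Ready(()));
                    }
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    Err(()) => return Err(()),
                }
            }
        }
    }

    fn main() {
        let (tx, rx) = mpsc::unbounded();
        tx.unbounded_send(Event("stream_request_open")).unwrap();
        tx.unbounded_send(Event("stream_response_end")).unwrap();
        drop(tx);

        let control = Control { rx, aggregate: Aggregate::default() };
        control.wait().unwrap();
    }

In the real proxy this future is joined with the destination-discovery client, the tap server, and the metrics server on the controller-client thread (the `join4` call in proxy/src/lib.rs above), so dropping the report stream removes the flush-interval and report-timeout configuration entirely.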