Stop pushing telemetry reports from the proxy (#616)

Now that the controller does not depend on pushed telemetry reports, the
proxy need not depend on the telemetry API or maintain legacy sampling
logic.
Oliver Gould, 2018-04-12 17:39:29 -07:00, committed by GitHub
parent 37434d048a
commit efdfc93b50
13 changed files with 28 additions and 925 deletions
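
At a glance, the main API change is to the controller-client background task: Background::bind no longer takes a stream of telemetry reports or a report timeout. Because the hunks below interleave old and new lines without +/- markers, here is a minimal, self-compiling sketch of the before/after signatures. Every type here is a hypothetical stand-in rather than the crate's real definition, and the real method returns Box<Future<Item = (), Error = ()>>.

// Illustrative sketch only; stand-in types. The old generic bound was really
// S: Stream<Item = ReportRequest, Error = ()> + 'static.
use std::time::Duration;

pub struct HostAndPort;
pub struct DnsConfig;
pub struct Handle;
pub struct ReportRequest;
pub struct Background;

impl Background {
    // Before this commit: the background task consumed pushed telemetry reports,
    // so it needed the event stream and a timeout for in-flight reports.
    pub fn bind_before<S>(
        self,
        _events: S,
        _host_and_port: HostAndPort,
        _dns_config: DnsConfig,
        _report_timeout: Duration,
        _executor: &Handle,
    ) {
    }

    // After: only what service discovery needs.
    pub fn bind(self, _host_and_port: HostAndPort, _dns_config: DnsConfig, _executor: &Handle) {}
}

fn main() {}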

View File

@ -8,7 +8,6 @@ fn build_control() {
let client_files = &[
"../../proto/common/common.proto",
"../../proto/proxy/destination/destination.proto",
"../../proto/proxy/telemetry/telemetry.proto",
];
let server_files = &["../../proto/proxy/tap/tap.proto"];
let dirs = &["../../proto"];

View File

@ -32,10 +32,6 @@ mod gen {
pub mod tap {
include!(concat!(env!("OUT_DIR"), "/conduit.proxy.tap.rs"));
}
pub mod telemetry {
include!(concat!(env!("OUT_DIR"), "/conduit.proxy.telemetry.rs"));
}
}
/// Converts a Rust Duration to a Protobuf Duration.

View File

@ -53,12 +53,6 @@ pub struct Config {
/// Event queue capacity.
pub event_buffer_capacity: usize,
/// Interval after which to flush metrics.
pub metrics_flush_interval: Duration,
/// Timeout after which to cancel telemetry reports.
pub report_timeout: Duration,
/// Timeout after which to cancel binding a request.
pub bind_timeout: Duration,
@ -131,8 +125,6 @@ pub struct TestEnv {
// Environment variables to look at when loading the configuration
const ENV_EVENT_BUFFER_CAPACITY: &str = "CONDUIT_PROXY_EVENT_BUFFER_CAPACITY";
pub const ENV_METRICS_FLUSH_INTERVAL_SECS: &str = "CONDUIT_PROXY_METRICS_FLUSH_INTERVAL_SECS";
const ENV_REPORT_TIMEOUT_SECS: &str = "CONDUIT_PROXY_REPORT_TIMEOUT_SECS";
pub const ENV_PRIVATE_LISTENER: &str = "CONDUIT_PROXY_PRIVATE_LISTENER";
pub const ENV_PRIVATE_FORWARD: &str = "CONDUIT_PROXY_PRIVATE_FORWARD";
pub const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER";
@ -156,8 +148,6 @@ const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
// Default values for various configuration fields
const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 10_000; // FIXME
const DEFAULT_METRICS_FLUSH_INTERVAL_SECS: u64 = 10;
const DEFAULT_REPORT_TIMEOUT_SECS: u64 = 10; // TODO: is this a reasonable default?
const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140";
const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143";
const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190";
@ -196,9 +186,6 @@ impl<'a> TryFrom<&'a Strings> for Config {
let bind_timeout = parse(strings, ENV_BIND_TIMEOUT, parse_number);
let resolv_conf_path = strings.get(ENV_RESOLV_CONF);
let event_buffer_capacity = parse(strings, ENV_EVENT_BUFFER_CAPACITY, parse_number);
let metrics_flush_interval_secs =
parse(strings, ENV_METRICS_FLUSH_INTERVAL_SECS, parse_number);
let report_timeout = parse(strings, ENV_REPORT_TIMEOUT_SECS, parse_number);
let pod_name = strings.get(ENV_POD_NAME);
let pod_namespace = strings.get(ENV_POD_NAMESPACE).and_then(|maybe_value| {
// There cannot be a default pod namespace, and the pod namespace is required.
@ -255,11 +242,6 @@ impl<'a> TryFrom<&'a Strings> for Config {
control_host_and_port: control_host_and_port?,
event_buffer_capacity: event_buffer_capacity?.unwrap_or(DEFAULT_EVENT_BUFFER_CAPACITY),
metrics_flush_interval:
Duration::from_secs(metrics_flush_interval_secs?
.unwrap_or(DEFAULT_METRICS_FLUSH_INTERVAL_SECS)),
report_timeout:
Duration::from_secs(report_timeout?.unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS)),
bind_timeout:
Duration::from_millis(bind_timeout?.unwrap_or(DEFAULT_BIND_TIMEOUT_MS)),
pod_name: pod_name?,

View File

@ -3,7 +3,7 @@ use std::io;
use std::time::{Duration, Instant};
use bytes::Bytes;
use futures::{future, Async, Future, Poll, Stream};
use futures::{future, Async, Future, Poll};
use h2;
use http;
use tokio_core::reactor::{
@ -25,13 +25,10 @@ pub mod discovery;
mod fully_qualified_authority;
mod observe;
pub mod pb;
mod telemetry;
use self::discovery::{Background as DiscoBg, Discovery, Watch};
pub use self::discovery::Bind;
pub use self::observe::Observe;
use conduit_proxy_controller_grpc::telemetry::ReportRequest;
use self::telemetry::Telemetry;
pub struct Control {
disco: Discovery,
@ -67,17 +64,12 @@ impl Control {
// ===== impl Background =====
impl Background {
pub fn bind<S>(
pub fn bind(
self,
events: S,
host_and_port: HostAndPort,
dns_config: dns::Config,
report_timeout: Duration,
executor: &Handle,
) -> Box<Future<Item = (), Error = ()>>
where
S: Stream<Item = ReportRequest, Error = ()> + 'static,
{
) -> Box<Future<Item = (), Error = ()>> {
// Build up the Controller Client Stack
let mut client = {
let ctx = ("controller-client", format!("{:?}", host_and_port));
@ -96,7 +88,6 @@ impl Background {
::logging::context_executor(ctx, executor.clone()),
);
let reconnect = Reconnect::new(h2_client);
let log_errors = LogErrors::new(reconnect);
let backoff = Backoff::new(log_errors, Duration::from_secs(5), executor);
@ -105,11 +96,9 @@ impl Background {
};
let mut disco = self.disco.work(executor);
let mut telemetry = Telemetry::new(events, report_timeout, executor);
let fut = future::poll_fn(move || {
disco.poll_rpc(&mut client);
telemetry.poll_rpc(&mut client);
Ok(Async::NotReady)
});

View File

@ -1,109 +0,0 @@
use std::fmt;
use std::time::{Duration, Instant};
use futures::{Async, Future, Stream};
use tokio_core::reactor::Handle;
use tower_h2::{HttpService, BoxBody};
use tower_grpc as grpc;
use conduit_proxy_controller_grpc::telemetry::{ReportRequest, ReportResponse};
use conduit_proxy_controller_grpc::telemetry::client::Telemetry as TelemetrySvc;
use ::timeout::{Timeout, TimeoutFuture};
type TelemetryStream<F, B> = grpc::client::unary::ResponseFuture<
ReportResponse, TimeoutFuture<F>, B>;
#[derive(Debug)]
pub struct Telemetry<T, S: HttpService> {
reports: T,
in_flight: Option<(Instant, TelemetryStream<S::Future, S::ResponseBody>)>,
report_timeout: Duration,
handle: Handle,
}
impl<T, S> Telemetry<T, S>
where
S: HttpService<RequestBody = BoxBody, ResponseBody = ::tower_h2::RecvBody>,
S::Error: fmt::Debug,
T: Stream<Item = ReportRequest>,
T::Error: ::std::fmt::Debug,
{
pub fn new(reports: T, report_timeout: Duration, handle: &Handle) -> Self {
Telemetry {
reports,
in_flight: None,
report_timeout,
handle: handle.clone(),
}
}
pub fn poll_rpc(&mut self, client: &mut S)
{
let client = Timeout::new(client.lift_ref(), self.report_timeout, &self.handle);
let mut svc = TelemetrySvc::new(client);
//let _ctxt = ::logging::context("Telemetry.Report".into());
loop {
if let Some((t0, mut fut)) = self.in_flight.take() {
match fut.poll() {
Ok(Async::NotReady) => {
// TODO: can we just move this logging logic to `Timeout`?
trace!("report in flight to controller for {:?}", t0.elapsed());
self.in_flight = Some((t0, fut))
}
Ok(Async::Ready(_)) => {
trace!("report sent to controller in {:?}", t0.elapsed())
}
Err(err) => warn!("controller error: {:?}", err),
}
}
let controller_ready = self.in_flight.is_none() && match svc.poll_ready() {
Ok(Async::Ready(_)) => true,
Ok(Async::NotReady) => {
trace!("controller unavailable");
false
}
Err(err) => {
warn!("controller error: {:?}", err);
false
}
};
match self.reports.poll() {
Ok(Async::NotReady) => {
return;
}
Ok(Async::Ready(None)) => {
debug!("report stream complete");
return;
}
Err(err) => {
warn!("report stream error: {:?}", err);
}
Ok(Async::Ready(Some(report))) => {
// Attempt to send the report. Continue looping so that `reports` is
// polled until it's not ready.
if !controller_ready {
info!(
"report dropped; requests={} accepts={} connects={}",
report.requests.len(),
report.server_transports.len(),
report.client_transports.len(),
);
} else {
trace!(
"report sent; requests={} accepts={} connects={}",
report.requests.len(),
report.server_transports.len(),
report.client_transports.len(),
);
let rep = svc.report(grpc::Request::new(report));
self.in_flight = Some((Instant::now(), rep));
}
}
}
}
}
}

View File

@ -8,7 +8,6 @@
//! be stored in `http::Extensions`, for instance. Furthermore, because these contexts
//! will be sent to a telemetry processing thread, we want to avoid excessive cloning.
use config;
use conduit_proxy_controller_grpc::telemetry as proto;
use std::time::SystemTime;
use std::sync::Arc;
pub mod http;
@ -77,17 +76,6 @@ impl Process {
}
}
impl<'a> Into<proto::Process> for &'a Process {
fn into(self) -> proto::Process {
// TODO: can this be implemented without cloning Strings?
proto::Process {
node: self.node.clone(),
scheduled_instance: self.scheduled_instance.clone(),
scheduled_namespace: self.scheduled_namespace.clone(),
}
}
}
impl Proxy {
pub fn inbound(p: &Arc<Process>) -> Arc<Self> {
Arc::new(Proxy::Inbound(Arc::clone(p)))

View File

@ -201,7 +201,6 @@ where
let (sensors, telemetry) = telemetry::new(
&process_ctx,
config.event_buffer_capacity,
config.metrics_flush_interval,
);
let dns_config = dns::Config::from_file(&config.resolv_conf_path);
@ -262,7 +261,6 @@ where
let (_tx, controller_shutdown_signal) = futures::sync::oneshot::channel::<()>();
{
let report_timeout = config.report_timeout;
thread::Builder::new()
.name("controller-client".into())
.spawn(move || {
@ -288,15 +286,14 @@ where
.serve_metrics(metrics_listener);
let client = control_bg.bind(
telemetry,
control_host_and_port,
dns_config,
report_timeout,
&executor
);
let fut = client.join3(
let fut = client.join4(
server.map_err(|_| {}),
telemetry,
metrics_server.map_err(|_| {}),
).map(|_| {});
executor.spawn(::logging::context_future("controller-client", fut));
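
With report pushing gone, the telemetry task is no longer a Stream handed into control_bg.bind(..); it is an ordinary future joined alongside the server and metrics tasks, which is why join3 becomes join4 above. A rough, self-contained illustration of that wiring, using futures 0.1 and no-op placeholder tasks in place of the real ones:

extern crate futures; // assumes futures = "0.1"

use futures::{future, Future};

fn main() {
    // No-op placeholders standing in for the real tasks built in lib.rs.
    let client = future::ok::<(), ()>(());         // controller client (discovery only)
    let server = future::ok::<(), ()>(());         // proxy server
    let telemetry = future::ok::<(), ()>(());      // telemetry::Control, now itself a Future
    let metrics_server = future::ok::<(), ()>(()); // Prometheus scrape endpoint

    // Previously the report stream was passed to the controller client and the main
    // future was client.join3(server, metrics_server); now telemetry is a sibling task.
    let fut = client
        .join4(server, telemetry, metrics_server)
        .map(|_| {});

    fut.wait().expect("all tasks should complete");
}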

View File

@ -1,15 +1,13 @@
use std::{fmt, io};
use std::io;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use futures::{future, Async, Future, Poll, Stream};
use futures_mpsc_lossy::Receiver;
use tokio_core::reactor::{Handle, Timeout};
use tokio_core::reactor::Handle;
use super::event::Event;
use super::metrics::{prometheus, Metrics as PushMetrics};
use super::metrics::prometheus;
use super::tap::Taps;
use conduit_proxy_controller_grpc::telemetry::ReportRequest;
use connection;
use ctx;
@ -19,10 +17,6 @@ pub struct MakeControl {
/// Receives events.
rx: Receiver<Event>,
/// Limits the amount of time metrics may be buffered before being flushed to the
/// controller.
flush_interval: Duration,
process_ctx: Arc<ctx::Process>,
}
@ -37,12 +31,10 @@ pub struct MakeControl {
///
/// # TODO
/// Limit the amount of memory that may be consumed for metrics aggregation.
#[derive(Debug)]
pub struct Control {
/// Holds the current state of aggregated metrics.
push_metrics: Option<PushMetrics>,
/// Aggregates scrapable metrics.
metrics_work: prometheus::Aggregate,
metrics_aggregate: prometheus::Aggregate,
/// Serves scrapable metrics.
metrics_service: prometheus::Serve,
@ -53,14 +45,6 @@ pub struct Control {
/// Holds the current state of tap observations, as configured by an external source.
taps: Option<Arc<Mutex<Taps>>>,
/// Limits the amount of time metrics may be buffered before being flushed to the
/// controller.
flush_interval: Duration,
/// Ensures liveliness of telemetry by waking the stream to produce reports when
/// needed. This timeout is reset as reports are returned.
flush_timeout: Timeout,
handle: Handle,
}
@ -71,16 +55,13 @@ impl MakeControl {
///
/// # Arguments
/// - `rx`: the `Receiver` side of the channel on which events are sent.
/// - `flush_interval`: the maximum amount of time between sending reports to the
/// controller.
/// - `process_ctx`: runtime process metadata.
pub(super) fn new(
rx: Receiver<Event>,
flush_interval: Duration,
process_ctx: &Arc<ctx::Process>,
) -> Self {
Self {
rx,
flush_interval,
process_ctx: Arc::clone(process_ctx),
}
}
@ -95,21 +76,14 @@ impl MakeControl {
/// - `Ok(())` if the timeout was successfully created.
/// - `Err(io::Error)` if the timeout could not be created.
pub fn make_control(self, taps: &Arc<Mutex<Taps>>, handle: &Handle) -> io::Result<Control> {
trace!("telemetry control flush_interval={:?}", self.flush_interval);
let flush_timeout = Timeout::new(self.flush_interval, handle)?;
let (metrics_work, metrics_service) =
let (metrics_aggregate, metrics_service) =
prometheus::new(&self.process_ctx);
let push_metrics = Some(PushMetrics::new(self.process_ctx));
Ok(Control {
push_metrics,
metrics_work,
metrics_aggregate,
metrics_service,
rx: Some(self.rx),
taps: Some(taps.clone()),
flush_interval: self.flush_interval,
flush_timeout,
handle: handle.clone(),
})
}
@ -118,52 +92,13 @@ impl MakeControl {
// ===== impl Control =====
impl Control {
/// Returns true if the flush timeout has expired, false otherwise.
#[inline]
fn flush_timeout_expired(&mut self) -> bool {
self.flush_timeout
.poll()
.ok()
.map(|r| r.is_ready())
.unwrap_or(false)
}
/// Returns true if this `Control` should flush metrics.
///
/// Metrics should be flushed if either of the following conditions are true:
/// - we have aggregated `flush_bytes` bytes of data,
/// - we haven't sent a report in `flush_interval` seconds.
fn flush_report(&mut self) -> Option<ReportRequest> {
let metrics = if self.flush_timeout_expired() {
trace!("flush timeout expired");
self.push_metrics.as_mut()
} else {
None
};
metrics.map(Self::generate_report)
}
fn generate_report(m: &mut PushMetrics) -> ReportRequest {
let mut r = m.generate_report();
r.proxy = 0; // 0 = Inbound, 1 = Outbound
r
}
/// Reset the flush timeout.
fn reset_timeout(&mut self) {
trace!("flushing in {:?}", self.flush_interval);
self.flush_timeout
.reset(Instant::now() + self.flush_interval);
}
fn recv(&mut self) -> Async<Option<Event>> {
fn recv(&mut self) -> Poll<Option<Event>, ()> {
match self.rx.take() {
None => Async::Ready(None),
None => Ok(Async::Ready(None)),
Some(mut rx) => {
trace!("recv.poll({:?})", rx);
match rx.poll().expect("recv telemetry") {
Async::Ready(None) => Async::Ready(None),
match rx.poll() {
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
ev => {
self.rx = Some(rx);
ev
@ -198,82 +133,28 @@ impl Control {
}
impl Stream for Control {
type Item = ReportRequest;
impl Future for Control {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
trace!("poll");
loop {
let report = match self.recv() {
Async::NotReady => break,
Async::Ready(Some(ev)) => {
match try_ready!(self.recv()) {
Some(ev) => {
if let Some(taps) = self.taps.as_mut() {
if let Ok(mut t) = taps.lock() {
t.inspect(&ev);
}
}
// XXX Only inbound events are currently aggregated.
if ev.proxy().is_inbound() {
if let Some(metrics) = self.push_metrics.as_mut() {
metrics.record_event(&ev);
}
}
self.metrics_work.record_event(&ev);
self.flush_report()
self.metrics_aggregate.record_event(&ev);
}
Async::Ready(None) => {
None => {
warn!("events finished");
let report = self.push_metrics
.take()
.map(|mut m| Self::generate_report(&mut m));
if report.is_none() {
return Ok(Async::Ready(None));
}
report
return Ok(Async::Ready(()));
}
};
if let Some(report) = report {
self.reset_timeout();
return Ok(Async::Ready(Some(report)));
}
}
// There may be no new events, but the timeout fired; so check at least once
// explicitly:
if self.push_metrics.is_none() {
Ok(Async::Ready(None))
} else {
match self.flush_report() {
None => {
// Either `rx` isn't ready or the timeout isn't ready
Ok(Async::NotReady)
}
Some(report) => {
self.reset_timeout();
Ok(Async::Ready(Some(report)))
}
}
}
}
}
// NOTE: `flush_timeout` does not impl `Debug`.
impl fmt::Debug for Control {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Control")
.field("push_metrics", &self.push_metrics)
.field("rx", &self.rx)
.field("taps", &self.taps)
.field("flush_interval", &self.flush_interval)
.field(
"flush_timeout",
&format!("Timeout({:?})", &self.flush_interval),
)
.finish()
}
}
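
The net effect of the hunk above, condensed: telemetry::Control no longer implements Stream<Item = ReportRequest>; it implements Future<Item = ()>, draining events (feeding taps and the Prometheus aggregate) and completing when the event channel closes. A minimal runnable sketch of that shape with futures 0.1, using a plain unbounded channel as a stand-in for futures_mpsc_lossy::Receiver and stubbing out the event handling:

#[macro_use]
extern crate futures; // assumes futures = "0.1"

use futures::sync::mpsc;
use futures::{Async, Future, Poll, Stream};

struct Event; // stand-in for telemetry::event::Event

struct Control {
    rx: mpsc::UnboundedReceiver<Event>,
}

impl Future for Control {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            match try_ready!(self.rx.poll()) {
                // In the proxy, this is where taps.inspect(&ev) and
                // metrics_aggregate.record_event(&ev) happen.
                Some(_ev) => {}
                // The event stream ended: the telemetry task is complete.
                None => return Ok(Async::Ready(())),
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded();
    let _ = tx.unbounded_send(Event);
    drop(tx); // closing the channel lets the Control future finish
    Control { rx }.wait().expect("telemetry task");
}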

View File

@ -1,309 +1,2 @@
use std::net;
use std::sync::Arc;
use std::time::Duration;
use std::{u32, u64};
use http;
use indexmap::IndexMap;
use conduit_proxy_controller_grpc::common::{
TcpAddress,
Protocol,
};
use conduit_proxy_controller_grpc::telemetry::{
ClientTransport,
eos_ctx,
EosCtx,
EosScope,
ReportRequest,
RequestCtx,
RequestScope,
ResponseCtx,
ResponseScope,
ServerTransport,
TransportSummary,
};
use ctx;
use telemetry::event::Event;
mod latency;
pub mod prometheus;
#[derive(Debug)]
pub struct Metrics {
sources: IndexMap<net::IpAddr, TransportStats>,
destinations: IndexMap<net::SocketAddr, TransportStats>,
requests: IndexMap<RequestKey, RequestStats>,
process_ctx: Arc<ctx::Process>,
}
#[derive(Debug, Eq, PartialEq, Hash)]
struct RequestKey {
source: net::IpAddr,
destination: net::SocketAddr,
uri: http::Uri,
method: http::Method,
}
#[derive(Debug, Default)]
struct RequestStats {
count: u32,
responses: IndexMap<Option<http::StatusCode>, ResponseStats>,
}
#[derive(Debug, Default)]
struct ResponseStats {
ends: IndexMap<End, u32>,
/// Response latencies in tenths of a millisecond.
///
/// Observed latencies are mapped to a count of the times that
/// latency value was seen.
latencies: latency::Histogram,
}
#[derive(Debug, PartialEq, Eq, Hash)]
enum End {
Grpc(u32),
Reset(u32),
Other,
}
#[derive(Debug, Default)]
struct TransportStats {
protocol: Protocol,
connects: u32,
disconnects: Vec<TransportSummary>,
}
impl RequestKey {
fn from_ctx(ctx: &Arc<ctx::http::Request>) -> Self {
Self {
source: ctx.server.remote.ip(),
destination: ctx.client.remote,
uri: ctx.uri.clone(),
method: ctx.method.clone(),
}
}
}
impl Metrics {
pub fn new(process_ctx: Arc<ctx::Process>) -> Self {
Metrics {
sources: IndexMap::new(),
destinations: IndexMap::new(),
requests: IndexMap::new(),
process_ctx,
}
}
pub(super) fn record_event(&mut self, event: &Event) {
match *event {
Event::TransportOpen(ref transport) => {
self.transport(transport).connects += 1;
}
Event::TransportClose(ref transport, ref close) => {
self.transport(transport)
.disconnects
.push(TransportSummary {
duration_ms: dur_to_ms(close.duration),
bytes_sent: 0,
});
}
Event::StreamRequestOpen(ref req) => {
self.request(req).count += 1;
}
Event::StreamRequestFail(ref req, ref fail) => {
let stats = self.request(req)
.responses
.entry(None)
.or_insert_with(Default::default);
let ends = stats
.ends
.entry(End::Reset(fail.error.into()))
.or_insert_with(Default::default);
stats.latencies += fail.since_request_open;
*ends += 1;
}
Event::StreamRequestEnd(_, _) => {
// Do nothing, as the push metrics are slated for removal and
// we don't need to support this event.
}
Event::StreamResponseOpen(ref res, ref open) => {
self.response(res).latencies += open.since_request_open;
},
Event::StreamResponseFail(ref res, ref fail) => {
*self.response_end(res, End::Reset(fail.error.into()))
+= 1;
}
Event::StreamResponseEnd(ref res, ref end) => {
let e = end.grpc_status.map(End::Grpc).unwrap_or(End::Other);
*self.response_end(res, e) += 1;
}
}
}
fn request<'a>(&mut self, req: &'a Arc<ctx::http::Request>) -> &mut RequestStats {
self.requests
.entry(RequestKey::from_ctx(req))
.or_insert_with(RequestStats::default)
}
fn response<'a>(&mut self, res: &'a Arc<ctx::http::Response>) -> &mut ResponseStats {
let req = self.request(&res.request);
req.responses
.entry(Some(res.status))
.or_insert_with(Default::default)
}
fn response_end<'a>(
&mut self,
res: &'a Arc<ctx::http::Response>,
end: End,
) -> &mut u32 {
self.response(res)
.ends
.entry(end)
.or_insert_with(Default::default)
}
fn transport<'a>(&mut self, transport: &'a ctx::transport::Ctx) -> &mut TransportStats {
match *transport {
ctx::transport::Ctx::Server(ref s) => {
let source = s.remote.ip();
self.sources
.entry(source)
.or_insert_with(|| TransportStats {
protocol: s.protocol,
..TransportStats::default()
})
}
ctx::transport::Ctx::Client(ref c) => self.destinations
.entry(c.remote)
.or_insert_with(|| TransportStats {
protocol: c.protocol,
..TransportStats::default()
})
}
}
pub fn generate_report(&mut self) -> ReportRequest {
let histogram_bucket_bounds_tenth_ms: Vec<u32> =
latency::BUCKET_BOUNDS.iter()
.map(|&latency| latency.into())
.collect();
let mut server_transports = Vec::new();
let mut client_transports = Vec::new();
for (ip, stats) in self.sources.drain(..) {
server_transports.push(ServerTransport {
source_ip: Some(ip.into()),
connects: stats.connects,
disconnects: stats.disconnects,
protocol: stats.protocol as i32,
})
}
for (addr, stats) in self.destinations.drain(..) {
client_transports.push(ClientTransport {
target_addr: Some(TcpAddress {
ip: Some(addr.ip().into()),
port: u32::from(addr.port()),
}),
connects: stats.connects,
disconnects: stats.disconnects,
protocol: stats.protocol as i32,
});
}
let mut requests = Vec::with_capacity(self.requests.len());
for (req, stats) in self.requests.drain(..) {
let mut responses = Vec::with_capacity(stats.responses.len());
for (status_code, res_stats) in stats.responses {
let mut ends = Vec::with_capacity(res_stats.ends.len());
for (end, streams) in res_stats.ends {
ends.push(EosScope {
ctx: Some(EosCtx {
end: Some(match end {
End::Grpc(grpc) => eos_ctx::End::GrpcStatusCode(grpc),
End::Reset(reset) => eos_ctx::End::ResetErrorCode(reset),
End::Other => eos_ctx::End::Other(true),
}),
}),
streams,
});
}
responses.push(ResponseScope {
ctx: status_code.map(|code| {
ResponseCtx {
http_status_code: u32::from(code.as_u16()),
}
}),
ends: ends,
response_latency_counts: res_stats.latencies
.into_iter()
// NOTE: this potential truncation is unlikely to cause
// problems here, as the push metrics reports have
// different semantics from the scrapable Prometheus
// metrics. Push metrics are reset every time a report
// is generated, while the scrapable metrics last for
// the entire lifetime of the process.
//
// Furthermore, this code is slated for removal soon.
.map(|count| count as u32)
.collect(),
});
}
requests.push(RequestScope {
ctx: Some(RequestCtx {
authority: req.uri
.authority_part()
.map(|a| a.to_string())
.unwrap_or_else(String::new),
source_ip: Some(req.source.into()),
target_addr: Some(TcpAddress {
ip: Some(req.destination.ip().into()),
port: u32::from(req.destination.port()),
}),
}),
count: stats.count,
responses,
})
}
ReportRequest {
process: Some(self.process_ctx.as_ref().into()),
//TODO: store proxy in Metrics?
proxy: 0,
server_transports,
client_transports,
requests,
histogram_bucket_bounds_tenth_ms,
}
}
}
fn dur_to_ms(dur: Duration) -> u64 {
dur.as_secs()
// note that this could just be saturating addition if we didn't want
// to log if an overflow occurs...
.checked_mul(1_000)
.and_then(|as_millis| {
let subsec = u64::from(dur.subsec_nanos() / latency::MS_TO_NS);
as_millis.checked_add(subsec)
})
.unwrap_or_else(|| {
debug!("{:?} too large to convert to ms!", dur);
u64::MAX
})
}

View File

@ -1,7 +1,4 @@
//! Sensors and reports telemetry from the proxy.
use std::sync::Arc;
use std::time::Duration;
use futures_mpsc_lossy;
@ -23,22 +20,19 @@ pub use self::sensor::Sensors;
/// that support telemetry.
///
/// [`Control`] drives processing of all telemetry events for tapping as well as metrics
/// reporting.
/// aggregation.
///
/// # Arguments
/// - `capacity`: the number of events to aggregate.
/// - `flush_interval`: the length of time after which a metrics report should be sent,
/// regardless of how many events have been aggregated.
/// - `capacity`: the size of the event queue.
///
/// [`Sensors`]: struct.Sensors.html
/// [`Control`]: struct.Control.html
pub fn new(
process: &Arc<ctx::Process>,
capacity: usize,
flush_interval: Duration,
) -> (Sensors, MakeControl) {
let (tx, rx) = futures_mpsc_lossy::channel(capacity);
let s = Sensors::new(tx);
let c = MakeControl::new(rx, flush_interval, process);
let c = MakeControl::new(rx, process);
(s, c)
}

View File

@ -21,7 +21,6 @@ struct Destination(Box<Fn() -> Option<pb::destination::Update> + Send>);
#[derive(Debug)]
pub struct Controller {
destinations: VecDeque<(String, Destination)>,
reports: Option<mpsc::UnboundedSender<pb::telemetry::ReportRequest>>,
}
pub struct Listening {
@ -33,7 +32,6 @@ impl Controller {
pub fn new() -> Self {
Controller {
destinations: VecDeque::new(),
reports: None,
}
}
@ -71,12 +69,6 @@ impl Controller {
self.destination_fn(dest, || None)
}
pub fn reports(&mut self) -> mpsc::UnboundedReceiver<pb::telemetry::ReportRequest> {
let (tx, rx) = mpsc::unbounded();
self.reports = Some(tx);
rx
}
pub fn run(self) -> Listening {
run(self)
}
@ -86,7 +78,6 @@ type Response = self::http::Response<GrpcBody>;
type Destinations = Arc<Mutex<VecDeque<(String, Destination)>>>;
const DESTINATION_GET: &str = "/conduit.proxy.destination.Destination/Get";
const TELEMETRY_REPORT: &str = "/conduit.proxy.telemetry.Telemetry/Report";
impl fmt::Debug for Destination {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@ -97,7 +88,6 @@ impl fmt::Debug for Destination {
#[derive(Debug)]
struct Svc {
destinations: Destinations,
reports: Option<mpsc::UnboundedSender<pb::telemetry::ReportRequest>>,
}
impl Svc {
@ -136,18 +126,6 @@ impl Svc {
Ok(rsp)
}))
}
TELEMETRY_REPORT => {
let mut reports = self.reports.clone();
Box::new(body.concat2().and_then(move |mut bytes| {
if let Some(ref mut report) = reports {
let req = Message::decode(bytes.split_off(5)).unwrap();
let _ = report.unbounded_send(req);
}
let body = GrpcBody::new([0u8; 5][..].into());
let rsp = rsp.body(body).unwrap();
Ok(rsp)
}))
}
unknown => {
println!("unknown route: {:?}", unknown);
let body = GrpcBody::unimplemented();
@ -216,7 +194,6 @@ impl Body for GrpcBody {
#[derive(Debug)]
struct NewSvc {
destinations: Destinations,
reports: Option<mpsc::UnboundedSender<pb::telemetry::ReportRequest>>,
}
impl NewService for NewSvc {
type Request = Request<RecvBody>;
@ -229,7 +206,6 @@ impl NewService for NewSvc {
fn new_service(&self) -> Self::Future {
future::ok(Svc {
destinations: self.destinations.clone(),
reports: self.reports.clone(),
})
}
}
@ -246,7 +222,6 @@ fn run(controller: Controller) -> Listening {
let factory = NewSvc {
destinations: Arc::new(Mutex::new(controller.destinations)),
reports: controller.reports,
};
let h2 = tower_h2::Server::new(factory, Default::default(), reactor.clone());

View File

@ -13,7 +13,6 @@ pub struct Proxy {
inbound: Option<server::Listening>,
outbound: Option<server::Listening>,
metrics_flush_interval: Option<Duration>,
inbound_disable_ports_protocol_detection: Option<Vec<u16>>,
outbound_disable_ports_protocol_detection: Option<Vec<u16>>,
@ -39,7 +38,6 @@ impl Proxy {
inbound: None,
outbound: None,
metrics_flush_interval: None,
inbound_disable_ports_protocol_detection: None,
outbound_disable_ports_protocol_detection: None,
shutdown_signal: None,
@ -61,11 +59,6 @@ impl Proxy {
self
}
pub fn metrics_flush_interval(mut self, dur: Duration) -> Self {
self.metrics_flush_interval = Some(dur);
self
}
pub fn disable_inbound_ports_protocol_detection(mut self, ports: Vec<u16>) -> Self {
self.inbound_disable_ports_protocol_detection = Some(ports);
self
@ -172,14 +165,6 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
let mut config = config::Config::try_from(&env).unwrap();
// TODO: We currently can't use `config::ENV_METRICS_FLUSH_INTERVAL_SECS`
// because we need to be able to set the flush interval to a fraction of a
// second. We should change config::ENV_METRICS_FLUSH_INTERVAL_SECS so that
// it can support this.
if let Some(dur) = proxy.metrics_flush_interval {
config.metrics_flush_interval = dur;
}
let (running_tx, running_rx) = oneshot::channel();
let (tx, mut rx) = shutdown_signal();

View File

@ -5,262 +5,6 @@ extern crate log;
mod support;
use self::support::*;
#[test]
fn inbound_sends_telemetry() {
let _ = env_logger::try_init();
info!("running test server");
let srv = server::new().route("/hey", "hello").run();
let mut ctrl = controller::new();
let reports = ctrl.reports();
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
info!("awaiting report");
let report = reports.wait().next().unwrap().unwrap();
// proxy inbound
assert_eq!(report.proxy, 0);
// process
assert_eq!(report.process.as_ref().unwrap().node, "");
assert_eq!(report.process.as_ref().unwrap().scheduled_instance, "");
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "test");
// requests
assert_eq!(report.requests.len(), 1);
let req = &report.requests[0];
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
assert_eq!(req.count, 1);
assert_eq!(req.responses.len(), 1);
// responses
let res = &req.responses[0];
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
// response latencies should always have a length equal to the number
// of latency buckets in the latency histogram.
assert_eq!(
res.response_latency_counts.len(),
report.histogram_bucket_bounds_tenth_ms.len()
);
assert_eq!(res.ends.len(), 1);
// ends
let ends = &res.ends[0];
assert_eq!(ends.streams, 1);
}
#[test]
fn http1_inbound_sends_telemetry() {
let _ = env_logger::try_init();
info!("running test server");
let srv = server::http1().route("/hey", "hello").run();
let mut ctrl = controller::new();
let reports = ctrl.reports();
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::http1(proxy.inbound, "tele.test.svc.cluster.local");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
info!("awaiting report");
let report = reports.wait().next().unwrap().unwrap();
// proxy inbound
assert_eq!(report.proxy, 0);
// requests
assert_eq!(report.requests.len(), 1);
let req = &report.requests[0];
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
assert_eq!(req.count, 1);
assert_eq!(req.responses.len(), 1);
// responses
let res = &req.responses[0];
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
// response latencies should always have a length equal to the number
// of latency buckets in the latency histogram.
assert_eq!(
res.response_latency_counts.len(),
report.histogram_bucket_bounds_tenth_ms.len()
);
assert_eq!(res.ends.len(), 1);
// ends
let ends = &res.ends[0];
assert_eq!(ends.streams, 1);
}
#[test]
fn inbound_aggregates_telemetry_over_several_requests() {
let _ = env_logger::try_init();
info!("running test server");
let srv = server::new()
.route("/hey", "hello")
.route("/hi", "good morning")
.run();
let mut ctrl = controller::new();
let reports = ctrl.reports();
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");
assert_eq!(client.get("/hi"), "good morning");
info!("awaiting report");
let report = reports.wait().next().unwrap().unwrap();
// proxy inbound
assert_eq!(report.proxy, 0);
// requests -----------------------
assert_eq!(report.requests.len(), 2);
// -- first request -----------------
let req = &report.requests[0];
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
assert_eq!(req.count, 1);
assert_eq!(req.responses.len(), 1);
// ---- response --------------------
let res = &req.responses[0];
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
// response latencies should always have a length equal to the number
// of latency buckets in the latency histogram.
assert_eq!(
res.response_latency_counts.len(),
report.histogram_bucket_bounds_tenth_ms.len()
);
assert_eq!(res.ends.len(), 1);
// ------ ends ----------------------
let ends = &res.ends[0];
assert_eq!(ends.streams, 1);
// -- second request ----------------
let req = &report.requests[1];
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
// repeated twice
assert_eq!(req.count, 2);
assert_eq!(req.responses.len(), 1);
// ---- response -------------------
let res = &req.responses[0];
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
// response latencies should always have a length equal to the number
// of latency buckets in the latency histogram.
assert_eq!(
res.response_latency_counts.len(),
report.histogram_bucket_bounds_tenth_ms.len()
);
assert_eq!(res.ends.len(), 1);
// ------ ends ----------------------
let ends = &res.ends[0];
assert_eq!(ends.streams, 2);
}
// Ignore this test on CI, because our method of adding latency to requests
// (calling `thread::sleep`) is likely to be flakey on Travis.
// Eventually, we can add some kind of mock timer system for simulating latency
// more reliably, and re-enable this test.
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn records_latency_statistics() {
let _ = env_logger::try_init();
info!("running test server");
let srv = server::new()
.route_with_latency("/hey", "hello", Duration::from_millis(500))
.route_with_latency("/hi", "good morning", Duration::from_millis(40))
.run();
let mut ctrl = controller::new();
let reports = ctrl.reports();
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_secs(5))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");
assert_eq!(client.get("/hi"), "good morning");
info!("awaiting report");
let report = reports.wait().next().unwrap().unwrap();
// requests -----------------------
assert_eq!(report.requests.len(), 2);
// first request
let req = &report.requests[0];
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
let res = &req.responses[0];
// response latencies should always have a length equal to the number
// of latency buckets in the latency histogram.
assert_eq!(
res.response_latency_counts.len(),
report.histogram_bucket_bounds_tenth_ms.len()
);
for (idx, bucket) in res.response_latency_counts.iter().enumerate() {
// 500 ms of extra latency should put us in the 500-1000
// millisecond bucket (the 15th bucket)
if idx == 15 {
assert_eq!(*bucket, 1, "poorly bucketed latencies: {:?}", res.response_latency_counts);
} else {
assert_eq!(*bucket, 0, "poorly bucketed latencies: {:?}", res.response_latency_counts);
}
}
// second request
let req = &report.requests.get(1).expect("second report");
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
assert_eq!(req.count, 2);
assert_eq!(req.responses.len(), 1);
let res = req.responses.get(0).expect("responses[0]");
// response latencies should always have a length equal to the number
// of latency buckets in the latency histogram.
assert_eq!(
res.response_latency_counts.len(),
report.histogram_bucket_bounds_tenth_ms.len()
);
for (idx, bucket) in res.response_latency_counts.iter().enumerate() {
// 40 ms of extra latency should put us in the 40-50
// millisecond bucket (the 10th bucket)
if idx == 9 {
assert_eq!(*bucket, 2, "poorly bucketed latencies: {:?}", res.response_latency_counts);
} else {
assert_eq!(*bucket, 0, "poorly bucketed latencies: {:?}", res.response_latency_counts);
}
}
}
#[test]
fn telemetry_report_errors_are_ignored() {}
macro_rules! assert_contains {
($scrape:expr, $contains:expr) => {
assert_eventually!($scrape.contains($contains), "metrics scrape:\n{:8}\ndid not contain:\n{:8}", $scrape, $contains)
@ -278,7 +22,6 @@ fn metrics_endpoint_inbound_request_count() {
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -308,7 +51,6 @@ fn metrics_endpoint_outbound_request_count() {
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -388,7 +130,6 @@ mod response_classification {
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -419,7 +160,6 @@ mod response_classification {
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -460,7 +200,6 @@ fn metrics_endpoint_inbound_response_latency() {
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -546,7 +285,6 @@ fn metrics_endpoint_outbound_response_latency() {
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -624,7 +362,6 @@ fn metrics_endpoint_inbound_request_duration() {
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -660,7 +397,6 @@ fn metrics_endpoint_outbound_request_duration() {
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -722,7 +458,6 @@ mod outbound_dst_labels {
let proxy = proxy::new()
.controller(ctrl.run())
.outbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let metrics = client::http1(proxy.metrics, "localhost");
@ -921,7 +656,6 @@ fn metrics_have_no_double_commas() {
.controller(ctrl)
.inbound(inbound_srv)
.outbound(outbound_srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");
@ -942,5 +676,4 @@ fn metrics_have_no_double_commas() {
let scrape = metrics.get("/metrics");
assert!(!scrape.contains(",,"), "outbound metrics had double comma");
}