diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index 60b4216a2..e13b703fd 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -6,10 +6,11 @@ use std::{fmt, hash}; use std::sync::Arc; use http; +use h2; use convert::*; use ctx; -use telemetry::Event; +use telemetry::{event, Event}; // re-export proxy here since we dont care about the other dirs pub use self::proxy::*; @@ -33,35 +34,6 @@ pub mod proxy { } } -fn pb_response_end( - ctx: &Arc, - since_request_init: ::std::time::Duration, - since_response_init: Option<::std::time::Duration>, - response_bytes: u64, - grpc_status: u32, -) -> common::TapEvent { - use self::common::tap_event; - - let end = tap_event::http::ResponseEnd { - id: Some(tap_event::http::StreamId { - base: 0, // TODO FIXME - stream: ctx.id as u64, - }), - since_request_init: Some(pb_duration(&since_request_init)), - since_response_init: since_response_init.as_ref().map(pb_duration), - response_bytes, - grpc_status, - }; - - common::TapEvent { - source: Some((&ctx.server.remote).into()), - target: Some((&ctx.client.remote).into()), - event: Some(tap_event::Event::Http(tap_event::Http { - event: Some(tap_event::http::Event::ResponseEnd(end)), - })), - } -} - #[derive(Debug, Clone)] // TODO: do we want to carry the string if there is one? pub struct InvalidMethod; @@ -111,6 +83,84 @@ impl Error for UnknownEvent { } } +impl event::StreamResponseEnd { + fn to_tap_event(&self, ctx: &Arc) -> common::TapEvent { + use self::common::{tap_event, Eos}; + + let eos = self.grpc_status + .map(Eos::from_grpc_status) + ; + + let end = tap_event::http::ResponseEnd { + id: Some(tap_event::http::StreamId { + base: 0, // TODO FIXME + stream: ctx.id as u64, + }), + since_request_init: Some(pb_duration(&self.since_request_open)), + since_response_init: Some(pb_duration(&self.since_response_open)), + response_bytes: self.bytes_sent, + eos, + }; + + common::TapEvent { + source: Some((&ctx.server.remote).into()), + target: Some((&ctx.client.remote).into()), + event: Some(tap_event::Event::Http(tap_event::Http { + event: Some(tap_event::http::Event::ResponseEnd(end)), + })), + } + } +} + +impl event::StreamResponseFail { + fn to_tap_event(&self, ctx: &Arc) -> common::TapEvent { + use self::common::tap_event; + + let end = tap_event::http::ResponseEnd { + id: Some(tap_event::http::StreamId { + base: 0, // TODO FIXME + stream: ctx.id as u64, + }), + since_request_init: Some(pb_duration(&self.since_request_open)), + since_response_init: Some(pb_duration(&self.since_response_open)), + response_bytes: self.bytes_sent, + eos: Some(self.error.into()), + }; + + common::TapEvent { + source: Some((&ctx.server.remote).into()), + target: Some((&ctx.client.remote).into()), + event: Some(tap_event::Event::Http(tap_event::Http { + event: Some(tap_event::http::Event::ResponseEnd(end)), + })), + } + } +} + +impl event::StreamRequestFail { + fn to_tap_event(&self, ctx: &Arc) -> common::TapEvent { + use self::common::tap_event; + + let end = tap_event::http::ResponseEnd { + id: Some(tap_event::http::StreamId { + base: 0, // TODO FIXME + stream: ctx.id as u64, + }), + since_request_init: Some(pb_duration(&self.since_request_open)), + since_response_init: None, + response_bytes: 0, + eos: Some(self.error.into()), + }; + + common::TapEvent { + source: Some((&ctx.server.remote).into()), + target: Some((&ctx.client.remote).into()), + event: Some(tap_event::Event::Http(tap_event::Http { + event: Some(tap_event::http::Event::ResponseEnd(end)), + })), + } + } +} impl<'a> TryFrom<&'a Event> for common::TapEvent { type Err = UnknownEvent; @@ -165,24 +215,16 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { } Event::StreamRequestFail(ref ctx, ref fail) => { - pb_response_end(ctx, fail.since_request_open, None, 0, 0) + fail.to_tap_event(&ctx) } - Event::StreamResponseEnd(ref ctx, ref end) => pb_response_end( - &ctx.request, - end.since_request_open, - Some(end.since_response_open), - end.bytes_sent, - end.grpc_status.unwrap_or(0), - ), + Event::StreamResponseEnd(ref ctx, ref end) => { + end.to_tap_event(&ctx.request) + } - Event::StreamResponseFail(ref ctx, ref fail) => pb_response_end( - &ctx.request, - fail.since_request_open, - Some(fail.since_response_open), - fail.bytes_sent, - 0, - ), + Event::StreamResponseFail(ref ctx, ref fail) => { + fail.to_tap_event(&ctx.request) + } _ => return Err(UnknownEvent), }; @@ -290,6 +332,22 @@ impl<'a> From<&'a str> for common::Scheme { } } +// ===== impl common::Eos ===== + +impl From for common::Eos { + fn from(reason: h2::Reason) -> Self { + let end = common::eos::End::ResetErrorCode(reason.into()); + common::Eos { end: Some(end) } + } +} + +impl common::Eos { + fn from_grpc_status(code: u32) -> Self { + let end = common::eos::End::GrpcStatusCode(code); + common::Eos { end: Some(end) } + } +} + // ===== impl common::IpAddress ===== impl From for common::IpAddress @@ -317,6 +375,7 @@ impl From<::std::net::IpAddr> for common::IpAddress { } } + // ===== impl common::IPv6 ===== impl From<[u8; 16]> for common::IPv6 { diff --git a/proxy/src/telemetry/metrics.rs b/proxy/src/telemetry/metrics.rs index 30ec8c88e..e7e064dd9 100644 --- a/proxy/src/telemetry/metrics.rs +++ b/proxy/src/telemetry/metrics.rs @@ -6,10 +6,14 @@ use std::time::Duration; use http; use ordermap::OrderMap; -use control::pb::common::{HttpMethod, TcpAddress, Protocol}; +use control::pb::common::{ + HttpMethod, + TcpAddress, + Protocol, +}; use control::pb::proxy::telemetry::{ - eos_ctx, ClientTransport, + eos_ctx, EosCtx, EosScope, Latency as PbLatency, diff --git a/proxy/src/telemetry/mod.rs b/proxy/src/telemetry/mod.rs index b9002421b..671a81944 100644 --- a/proxy/src/telemetry/mod.rs +++ b/proxy/src/telemetry/mod.rs @@ -8,7 +8,7 @@ use futures_mpsc_lossy; use ctx; mod control; -mod event; +pub mod event; mod metrics; pub mod sensor; pub mod tap;