Move EosCtx to common for Tap and Telemetery (#204)
* Make Eos optional in TapEvent grpc_status not being set in protobuf is the same as being set to zero, which is also status OK Modify TapEvent to include an optional EOS struct Signed-off-by: Andrew Seigner <siggy@buoyant.io> Part of #198 * Add Eos to proto & proxy tap end-of-stream events The proxy now outputs `Eos` instead of `grpc_status` in all end-of-stream tap events. The EOS value is set to `grpc_status_code` when the response ended with a `grpc_status` trailer, `http_reset_code` when the response ended with a reset, and no `Eos` when the response ended gracefully without a `grpc_status` trailer. This PR updates the proxy. The proto and controller changes are in PR #204. Part of #198. Closes #202 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
1e9ff8be03
commit
0008002236
|
@ -6,10 +6,11 @@ use std::{fmt, hash};
|
|||
use std::sync::Arc;
|
||||
|
||||
use http;
|
||||
use h2;
|
||||
|
||||
use convert::*;
|
||||
use ctx;
|
||||
use telemetry::Event;
|
||||
use telemetry::{event, Event};
|
||||
|
||||
// re-export proxy here since we dont care about the other dirs
|
||||
pub use self::proxy::*;
|
||||
|
@ -33,35 +34,6 @@ pub mod proxy {
|
|||
}
|
||||
}
|
||||
|
||||
fn pb_response_end(
|
||||
ctx: &Arc<ctx::http::Request>,
|
||||
since_request_init: ::std::time::Duration,
|
||||
since_response_init: Option<::std::time::Duration>,
|
||||
response_bytes: u64,
|
||||
grpc_status: u32,
|
||||
) -> common::TapEvent {
|
||||
use self::common::tap_event;
|
||||
|
||||
let end = tap_event::http::ResponseEnd {
|
||||
id: Some(tap_event::http::StreamId {
|
||||
base: 0, // TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
}),
|
||||
since_request_init: Some(pb_duration(&since_request_init)),
|
||||
since_response_init: since_response_init.as_ref().map(pb_duration),
|
||||
response_bytes,
|
||||
grpc_status,
|
||||
};
|
||||
|
||||
common::TapEvent {
|
||||
source: Some((&ctx.server.remote).into()),
|
||||
target: Some((&ctx.client.remote).into()),
|
||||
event: Some(tap_event::Event::Http(tap_event::Http {
|
||||
event: Some(tap_event::http::Event::ResponseEnd(end)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
// TODO: do we want to carry the string if there is one?
|
||||
pub struct InvalidMethod;
|
||||
|
@ -111,6 +83,84 @@ impl Error for UnknownEvent {
|
|||
}
|
||||
}
|
||||
|
||||
impl event::StreamResponseEnd {
|
||||
fn to_tap_event(&self, ctx: &Arc<ctx::http::Request>) -> common::TapEvent {
|
||||
use self::common::{tap_event, Eos};
|
||||
|
||||
let eos = self.grpc_status
|
||||
.map(Eos::from_grpc_status)
|
||||
;
|
||||
|
||||
let end = tap_event::http::ResponseEnd {
|
||||
id: Some(tap_event::http::StreamId {
|
||||
base: 0, // TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
}),
|
||||
since_request_init: Some(pb_duration(&self.since_request_open)),
|
||||
since_response_init: Some(pb_duration(&self.since_response_open)),
|
||||
response_bytes: self.bytes_sent,
|
||||
eos,
|
||||
};
|
||||
|
||||
common::TapEvent {
|
||||
source: Some((&ctx.server.remote).into()),
|
||||
target: Some((&ctx.client.remote).into()),
|
||||
event: Some(tap_event::Event::Http(tap_event::Http {
|
||||
event: Some(tap_event::http::Event::ResponseEnd(end)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl event::StreamResponseFail {
|
||||
fn to_tap_event(&self, ctx: &Arc<ctx::http::Request>) -> common::TapEvent {
|
||||
use self::common::tap_event;
|
||||
|
||||
let end = tap_event::http::ResponseEnd {
|
||||
id: Some(tap_event::http::StreamId {
|
||||
base: 0, // TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
}),
|
||||
since_request_init: Some(pb_duration(&self.since_request_open)),
|
||||
since_response_init: Some(pb_duration(&self.since_response_open)),
|
||||
response_bytes: self.bytes_sent,
|
||||
eos: Some(self.error.into()),
|
||||
};
|
||||
|
||||
common::TapEvent {
|
||||
source: Some((&ctx.server.remote).into()),
|
||||
target: Some((&ctx.client.remote).into()),
|
||||
event: Some(tap_event::Event::Http(tap_event::Http {
|
||||
event: Some(tap_event::http::Event::ResponseEnd(end)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl event::StreamRequestFail {
|
||||
fn to_tap_event(&self, ctx: &Arc<ctx::http::Request>) -> common::TapEvent {
|
||||
use self::common::tap_event;
|
||||
|
||||
let end = tap_event::http::ResponseEnd {
|
||||
id: Some(tap_event::http::StreamId {
|
||||
base: 0, // TODO FIXME
|
||||
stream: ctx.id as u64,
|
||||
}),
|
||||
since_request_init: Some(pb_duration(&self.since_request_open)),
|
||||
since_response_init: None,
|
||||
response_bytes: 0,
|
||||
eos: Some(self.error.into()),
|
||||
};
|
||||
|
||||
common::TapEvent {
|
||||
source: Some((&ctx.server.remote).into()),
|
||||
target: Some((&ctx.client.remote).into()),
|
||||
event: Some(tap_event::Event::Http(tap_event::Http {
|
||||
event: Some(tap_event::http::Event::ResponseEnd(end)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TryFrom<&'a Event> for common::TapEvent {
|
||||
type Err = UnknownEvent;
|
||||
|
@ -165,24 +215,16 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
|
|||
}
|
||||
|
||||
Event::StreamRequestFail(ref ctx, ref fail) => {
|
||||
pb_response_end(ctx, fail.since_request_open, None, 0, 0)
|
||||
fail.to_tap_event(&ctx)
|
||||
}
|
||||
|
||||
Event::StreamResponseEnd(ref ctx, ref end) => pb_response_end(
|
||||
&ctx.request,
|
||||
end.since_request_open,
|
||||
Some(end.since_response_open),
|
||||
end.bytes_sent,
|
||||
end.grpc_status.unwrap_or(0),
|
||||
),
|
||||
Event::StreamResponseEnd(ref ctx, ref end) => {
|
||||
end.to_tap_event(&ctx.request)
|
||||
}
|
||||
|
||||
Event::StreamResponseFail(ref ctx, ref fail) => pb_response_end(
|
||||
&ctx.request,
|
||||
fail.since_request_open,
|
||||
Some(fail.since_response_open),
|
||||
fail.bytes_sent,
|
||||
0,
|
||||
),
|
||||
Event::StreamResponseFail(ref ctx, ref fail) => {
|
||||
fail.to_tap_event(&ctx.request)
|
||||
}
|
||||
|
||||
_ => return Err(UnknownEvent),
|
||||
};
|
||||
|
@ -290,6 +332,22 @@ impl<'a> From<&'a str> for common::Scheme {
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl common::Eos =====
|
||||
|
||||
impl From<h2::Reason> for common::Eos {
|
||||
fn from(reason: h2::Reason) -> Self {
|
||||
let end = common::eos::End::ResetErrorCode(reason.into());
|
||||
common::Eos { end: Some(end) }
|
||||
}
|
||||
}
|
||||
|
||||
impl common::Eos {
|
||||
fn from_grpc_status(code: u32) -> Self {
|
||||
let end = common::eos::End::GrpcStatusCode(code);
|
||||
common::Eos { end: Some(end) }
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl common::IpAddress =====
|
||||
|
||||
impl<T> From<T> for common::IpAddress
|
||||
|
@ -317,6 +375,7 @@ impl From<::std::net::IpAddr> for common::IpAddress {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl common::IPv6 =====
|
||||
|
||||
impl From<[u8; 16]> for common::IPv6 {
|
||||
|
|
|
@ -6,10 +6,14 @@ use std::time::Duration;
|
|||
use http;
|
||||
use ordermap::OrderMap;
|
||||
|
||||
use control::pb::common::{HttpMethod, TcpAddress, Protocol};
|
||||
use control::pb::common::{
|
||||
HttpMethod,
|
||||
TcpAddress,
|
||||
Protocol,
|
||||
};
|
||||
use control::pb::proxy::telemetry::{
|
||||
eos_ctx,
|
||||
ClientTransport,
|
||||
eos_ctx,
|
||||
EosCtx,
|
||||
EosScope,
|
||||
Latency as PbLatency,
|
||||
|
|
|
@ -8,7 +8,7 @@ use futures_mpsc_lossy;
|
|||
use ctx;
|
||||
|
||||
mod control;
|
||||
mod event;
|
||||
pub mod event;
|
||||
mod metrics;
|
||||
pub mod sensor;
|
||||
pub mod tap;
|
||||
|
|
Loading…
Reference in New Issue