proxy: fix h1 streams to trigger response end events

Response End events were only triggered after polling the trailers of
a response, but when the Response is given to a hyper h1 server, it
doesn't know about trailers, so they were never polled!

The fix is that the `BodyStream` glue will now poll the wrapped body for
trailers after it sees the end of the data, before telling hyper the
stream is over. This ensures a ResponseEnd event is emitted.

Includes a proxy telemetry test over h1 connections.
This commit is contained in:
Sean McArthur 2018-01-25 16:36:16 -08:00 committed by GitHub
parent aa17e37ab5
commit b861318e86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 10 deletions

View File

@ -166,7 +166,7 @@ where
match self.inner {
ClientServiceInner::Http1(ref h1) => {
let is_body_empty = req.body().is_end_stream();
let mut req = hyper::Request::from(req.map(BodyStream));
let mut req = hyper::Request::from(req.map(BodyStream::new));
if is_body_empty {
req.headers_mut().set(hyper::header::ContentLength(0));
}

View File

@ -3,7 +3,7 @@ use std::fmt;
use std::io;
use std::sync::Arc;
use bytes::Bytes;
use bytes::{Bytes, IntoBuf};
use futures::{future, Async, Future, Poll, Stream};
use futures::future::Either;
use h2;
@ -25,7 +25,10 @@ pub enum HttpBody {
/// Glue for `tower_h2::Body`s to be used in hyper.
#[derive(Debug)]
pub(super) struct BodyStream<B>(pub(super) B);
pub(super) struct BodyStream<B> {
body: B,
poll_trailers: bool,
}
/// Glue for the `Data` part of a `tower_h2::Body` to be used as an `AsRef` in `BodyStream`.
#[derive(Debug)]
@ -116,6 +119,16 @@ impl Default for HttpBody {
// ===== impl BodyStream =====
impl<B> BodyStream<B> {
/// Wrap a `tower_h2::Body` into a `Stream` hyper can understand.
pub fn new(body: B) -> Self {
BodyStream {
body,
poll_trailers: false,
}
}
}
impl<B> Stream for BodyStream<B>
where
B: tower_h2::Body,
@ -124,12 +137,35 @@ where
type Error = hyper::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll_data()
.map(|async| async.map(|opt| opt.map(|buf| BufAsRef(::bytes::IntoBuf::into_buf(buf)))))
.map_err(|e| {
trace!("h2 body error: {:?}", e);
hyper::Error::Io(io::ErrorKind::Other.into())
})
loop {
if self.poll_trailers {
return match self.body.poll_trailers() {
// we don't care about actual trailers, just that the poll
// was ready. now we can tell hyper the stream is done
Ok(Async::Ready(_)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
trace!("h2 trailers error: {:?}", e);
Err(hyper::Error::Io(io::ErrorKind::Other.into()))
}
};
} else {
match self.body.poll_data() {
Ok(Async::Ready(Some(buf))) => return Ok(Async::Ready(Some(BufAsRef(buf.into_buf())))),
Ok(Async::Ready(None)) => {
// when the data is empty, even though hyper can't use the trailers,
// we need to poll for them, to allow the stream to mark itself as
// completed successfully.
self.poll_trailers = true;
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
trace!("h2 body error: {:?}", e);
return Err(hyper::Error::Io(io::ErrorKind::Other.into()))
}
}
}
}
}
}
@ -217,7 +253,7 @@ where
return Ok(Async::Ready(res));
}
h1::strip_connection_headers(res.headers_mut());
Ok(Async::Ready(res.map(BodyStream).into()))
Ok(Async::Ready(res.map(BodyStream::new).into()))
}
}

View File

@ -53,5 +53,51 @@ fn inbound_sends_telemetry() {
assert_eq!(stream.frames_sent, 1);
}
#[test]
fn http1_inbound_sends_telemetry() {
let _ = env_logger::init();
info!("running test server");
let srv = server::http1().route("/hey", "hello").run();
let mut ctrl = controller::new();
let reports = ctrl.reports();
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::http1(proxy.inbound, "test.conduit.local");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
info!("awaiting report");
let report = reports.wait().next().unwrap().unwrap();
// proxy inbound
assert_eq!(report.proxy, 0);
// requests
assert_eq!(report.requests.len(), 1);
let req = &report.requests[0];
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
assert_eq!(req.ctx.as_ref().unwrap().path, "/hey");
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
assert_eq!(req.count, 1);
assert_eq!(req.responses.len(), 1);
// responses
let res = &req.responses[0];
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
assert_eq!(res.response_latencies.len(), 1);
assert_eq!(res.ends.len(), 1);
// ends
let ends = &res.ends[0];
assert_eq!(ends.streams.len(), 1);
// streams
let stream = &ends.streams[0];
assert_eq!(stream.bytes_sent, 5);
assert_eq!(stream.frames_sent, 1);
}
#[test]
fn telemetry_report_errors_are_ignored() {}