From 8b7baf62c3fc28ac914626b3a40deb67a7c923bc Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 25 Jan 2018 16:36:16 -0800 Subject: [PATCH] proxy: fix h1 streams to trigger response end events Response End events were only triggered after polling the trailers of a response, but when the Response is given to a hyper h1 server, it doesn't know about trailers, so they were never polled! The fix is that the `BodyStream` glue will now poll the wrapped body for trailers after it sees the end of the data, before telling hyper the stream is over. This ensures a ResponseEnd event is emitted. Includes a proxy telemetry test over h1 connections. --- proxy/src/transparency/client.rs | 2 +- proxy/src/transparency/glue.rs | 54 ++++++++++++++++++++++++++------ proxy/tests/telemetry.rs | 46 +++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 10 deletions(-) diff --git a/proxy/src/transparency/client.rs b/proxy/src/transparency/client.rs index 7bc5f7507..1bb73120b 100644 --- a/proxy/src/transparency/client.rs +++ b/proxy/src/transparency/client.rs @@ -166,7 +166,7 @@ where match self.inner { ClientServiceInner::Http1(ref h1) => { let is_body_empty = req.body().is_end_stream(); - let mut req = hyper::Request::from(req.map(BodyStream)); + let mut req = hyper::Request::from(req.map(BodyStream::new)); if is_body_empty { req.headers_mut().set(hyper::header::ContentLength(0)); } diff --git a/proxy/src/transparency/glue.rs b/proxy/src/transparency/glue.rs index 714cfcaf5..8834b9d87 100644 --- a/proxy/src/transparency/glue.rs +++ b/proxy/src/transparency/glue.rs @@ -3,7 +3,7 @@ use std::fmt; use std::io; use std::sync::Arc; -use bytes::Bytes; +use bytes::{Bytes, IntoBuf}; use futures::{future, Async, Future, Poll, Stream}; use futures::future::Either; use h2; @@ -25,7 +25,10 @@ pub enum HttpBody { /// Glue for `tower_h2::Body`s to be used in hyper. #[derive(Debug)] -pub(super) struct BodyStream(pub(super) B); +pub(super) struct BodyStream { + body: B, + poll_trailers: bool, +} /// Glue for the `Data` part of a `tower_h2::Body` to be used as an `AsRef` in `BodyStream`. #[derive(Debug)] @@ -116,6 +119,16 @@ impl Default for HttpBody { // ===== impl BodyStream ===== +impl BodyStream { + /// Wrap a `tower_h2::Body` into a `Stream` hyper can understand. + pub fn new(body: B) -> Self { + BodyStream { + body, + poll_trailers: false, + } + } +} + impl Stream for BodyStream where B: tower_h2::Body, @@ -124,12 +137,35 @@ where type Error = hyper::Error; fn poll(&mut self) -> Poll, Self::Error> { - self.0.poll_data() - .map(|async| async.map(|opt| opt.map(|buf| BufAsRef(::bytes::IntoBuf::into_buf(buf))))) - .map_err(|e| { - trace!("h2 body error: {:?}", e); - hyper::Error::Io(io::ErrorKind::Other.into()) - }) + loop { + if self.poll_trailers { + return match self.body.poll_trailers() { + // we don't care about actual trailers, just that the poll + // was ready. now we can tell hyper the stream is done + Ok(Async::Ready(_)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + trace!("h2 trailers error: {:?}", e); + Err(hyper::Error::Io(io::ErrorKind::Other.into())) + } + }; + } else { + match self.body.poll_data() { + Ok(Async::Ready(Some(buf))) => return Ok(Async::Ready(Some(BufAsRef(buf.into_buf())))), + Ok(Async::Ready(None)) => { + // when the data is empty, even though hyper can't use the trailers, + // we need to poll for them, to allow the stream to mark itself as + // completed successfully. + self.poll_trailers = true; + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => { + trace!("h2 body error: {:?}", e); + return Err(hyper::Error::Io(io::ErrorKind::Other.into())) + } + } + } + } } } @@ -217,7 +253,7 @@ where return Ok(Async::Ready(res)); } h1::strip_connection_headers(res.headers_mut()); - Ok(Async::Ready(res.map(BodyStream).into())) + Ok(Async::Ready(res.map(BodyStream::new).into())) } } diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs index 8636912c2..cd865a14c 100644 --- a/proxy/tests/telemetry.rs +++ b/proxy/tests/telemetry.rs @@ -53,5 +53,51 @@ fn inbound_sends_telemetry() { assert_eq!(stream.frames_sent, 1); } +#[test] +fn http1_inbound_sends_telemetry() { + let _ = env_logger::init(); + + info!("running test server"); + let srv = server::http1().route("/hey", "hello").run(); + + let mut ctrl = controller::new(); + let reports = ctrl.reports(); + let proxy = proxy::new() + .controller(ctrl.run()) + .inbound(srv) + .metrics_flush_interval(Duration::from_millis(500)) + .run(); + let client = client::http1(proxy.inbound, "test.conduit.local"); + + info!("client.get(/hey)"); + assert_eq!(client.get("/hey"), "hello"); + + info!("awaiting report"); + let report = reports.wait().next().unwrap().unwrap(); + // proxy inbound + assert_eq!(report.proxy, 0); + // requests + assert_eq!(report.requests.len(), 1); + let req = &report.requests[0]; + assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local"); + assert_eq!(req.ctx.as_ref().unwrap().path, "/hey"); + //assert_eq!(req.ctx.as_ref().unwrap().method, GET); + assert_eq!(req.count, 1); + assert_eq!(req.responses.len(), 1); + // responses + let res = &req.responses[0]; + assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200); + assert_eq!(res.response_latencies.len(), 1); + assert_eq!(res.ends.len(), 1); + // ends + let ends = &res.ends[0]; + assert_eq!(ends.streams.len(), 1); + // streams + let stream = &ends.streams[0]; + assert_eq!(stream.bytes_sent, 5); + assert_eq!(stream.frames_sent, 1); +} + #[test] fn telemetry_report_errors_are_ignored() {} +