add Route retries to Service Profiles

Signed-off-by: Sean McArthur <sean@buoyant.io>
This commit is contained in:
Sean McArthur 2018-12-18 13:45:31 -08:00 committed by Sean McArthur
parent 6649630db7
commit 2f2050537d
19 changed files with 933 additions and 30 deletions

View File

@ -4,6 +4,8 @@ sudo: false
language: rust
cache: cargo
env:
- RUSTFLAGS="-C debuginfo=0"
branches:
only:

View File

@ -571,6 +571,7 @@ dependencies = [
"tower-http 0.1.0 (git+https://github.com/tower-rs/tower-http)",
"tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-retry 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-service 0.2.0 (git+https://github.com/tower-rs/tower)",
"tower-util 0.1.0 (git+https://github.com/tower-rs/tower)",
"trust-dns-resolver 0.10.0 (git+https://github.com/bluejekyll/trust-dns?rev=c017c114)",
@ -1421,6 +1422,16 @@ dependencies = [
"tower-util 0.1.0 (git+https://github.com/tower-rs/tower)",
]
[[package]]
name = "tower-retry"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#075ffb372556d05a7db02ce49db34cfb22850dba"
dependencies = [
"futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.2.0 (git+https://github.com/tower-rs/tower)",
]
[[package]]
name = "tower-service"
version = "0.2.0"
@ -1790,6 +1801,7 @@ dependencies = [
"checksum tower-http 0.1.0 (git+https://github.com/tower-rs/tower-http)" = "<none>"
"checksum tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-retry 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-service 0.2.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum trust-dns-proto 0.5.0 (git+https://github.com/bluejekyll/trust-dns?rev=c017c114)" = "<none>"

View File

@ -65,6 +65,7 @@ tower-buffer = { git = "https://github.com/tower-rs/tower" }
tower-discover = { git = "https://github.com/tower-rs/tower" }
tower-in-flight-limit = { git = "https://github.com/tower-rs/tower" }
tower-reconnect = { git = "https://github.com/tower-rs/tower" }
tower-retry = { git = "https://github.com/tower-rs/tower" }
tower-service = { git = "https://github.com/tower-rs/tower" }
tower-util = { git = "https://github.com/tower-rs/tower" }
tower-http = { git = "https://github.com/tower-rs/tower-http" }

View File

@ -188,6 +188,19 @@ fn grpc_class(headers: &http::HeaderMap) -> Option<Class> {
})
}
// === impl Class ===
impl Class {
pub(super) fn is_failure(&self) -> bool {
match self {
Class::Default(SuccessOrFailure::Failure) |
Class::Grpc(SuccessOrFailure::Failure, _) |
Class::Stream(SuccessOrFailure::Failure, _) => true,
_ => false,
}
}
}
#[cfg(test)]
mod tests {
use http::{HeaderMap, Response, StatusCode};

View File

@ -2,8 +2,13 @@ use http;
use indexmap::IndexMap;
use std::fmt;
use std::sync::Arc;
use tower_retry::budget::Budget;
use proxy::http::{metrics::classify::CanClassify, profiles};
use proxy::http::{
metrics::classify::{CanClassify, Classify, ClassifyResponse, ClassifyEos},
profiles,
retry,
};
use {Addr, NameAddr};
use super::classify;
@ -20,6 +25,12 @@ pub struct Route {
pub route: profiles::Route,
}
#[derive(Clone, Debug)]
pub struct Retry {
budget: Arc<Budget>,
response_classes: profiles::ResponseClasses,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DstAddr {
addr: Addr,
@ -36,6 +47,51 @@ impl CanClassify for Route {
}
}
impl retry::CanRetry for Route {
type Retry = Retry;
fn can_retry(&self) -> Option<Self::Retry> {
self
.route
.retries()
.map(|retries| Retry {
budget: retries.budget().clone(),
response_classes: self.route.response_classes().clone(),
})
}
}
// === impl Retry ===
impl retry::Retry for Retry {
fn retry<B1, B2>(&self, req: &http::Request<B1>, res: &http::Response<B2>) -> Result<(), retry::NoRetry> {
let class = classify::Request::from(self.response_classes.clone())
.classify(req)
.start(res)
.eos(None);
if class.is_failure() {
return self
.budget
.withdraw()
.map_err(|_overdrawn| retry::NoRetry::Budget);
}
self.budget.deposit();
Err(retry::NoRetry::Success)
}
fn clone_request<B: retry::TryClone>(&self, req: &http::Request<B>) -> Option<http::Request<B>> {
retry::TryClone::try_clone(req)
.map(|mut clone| {
if let Some(ext) = req.extensions().get::<classify::Response>() {
clone.extensions_mut().insert(ext.clone());
}
clone
})
}
}
// === impl DstAddr ===
impl AsRef<Addr> for DstAddr {

View File

@ -210,12 +210,18 @@ where
(m, r.with_prefix("route"))
};
let (retry_http_metrics, retry_http_report) = {
let (m, r) = http_metrics::new::<RouteLabels, Class>(config.metrics_retain_idle);
(m, r.with_prefix("route_actual"))
};
let (transport_metrics, transport_report) = transport::metrics::new();
let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new();
let report = endpoint_http_report
.and_then(route_http_report)
.and_then(retry_http_report)
.and_then(transport_report)
.and_then(tls_config_report)
.and_then(ctl_http_report)
@ -293,7 +299,7 @@ where
use super::outbound::{discovery::Resolve, orig_proto_upgrade, Endpoint};
use proxy::{
canonicalize,
http::{balance, header_from_target, metrics},
http::{balance, header_from_target, metrics, retry},
resolve,
};
@ -344,6 +350,8 @@ where
// implementations can use the route-specific configuration.
let dst_route_layer = phantom_data::layer()
.push(insert_target::layer())
.push(metrics::layer::<_, classify::Response>(retry_http_metrics.clone()))
.push(retry::layer(retry_http_metrics))
.push(metrics::layer::<_, classify::Response>(route_http_metrics))
.push(classify::layer());

View File

@ -4,11 +4,13 @@ use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use http;
use regex::Regex;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::executor::{DefaultExecutor, Executor};
use tokio_timer::{clock, Delay};
use tower_grpc::{self as grpc, Body, BoxBody};
use tower_http::HttpService;
use tower_retry::budget::Budget;
use api::destination as api;
use never::Never;
@ -129,8 +131,14 @@ where
Ok(Async::Ready(None)) => return StreamState::RecvDone.into(),
Ok(Async::Ready(Some(profile))) => {
debug!("profile received: {:?}", profile);
let rs = profile.routes.into_iter().filter_map(convert_route);
match tx.start_send(rs.collect()) {
let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
let routes = profile
.routes
.into_iter()
.filter_map(move |orig| {
convert_route(orig, retry_budget.as_ref())
});
match tx.start_send(routes.collect()) {
Ok(AsyncSink::Ready) => {} // continue
Ok(AsyncSink::NotReady(_)) => {
info!("dropping profile update due to a full buffer");
@ -205,17 +213,32 @@ where
}
}
fn convert_route(orig: api::Route) -> Option<(profiles::RequestMatch, profiles::Route)> {
fn convert_route(orig: api::Route, retry_budget: Option<&Arc<Budget>>) -> Option<(profiles::RequestMatch, profiles::Route)> {
let req_match = orig.condition.and_then(convert_req_match)?;
let rsp_classes = orig
.response_classes
.into_iter()
.filter_map(convert_rsp_class)
.collect();
let route = profiles::Route::new(orig.metrics_labels.into_iter(), rsp_classes);
let mut route = profiles::Route::new(orig.metrics_labels.into_iter(), rsp_classes);
if orig.is_retryable {
set_route_retry(&mut route, retry_budget);
}
Some((req_match, route))
}
fn set_route_retry(route: &mut profiles::Route, retry_budget: Option<&Arc<Budget>>) {
let budget = match retry_budget {
Some(budget) => budget.clone(),
None => {
warn!("retry_budget is missing: {:?}", route);
return;
},
};
route.set_retries(budget);
}
fn convert_req_match(orig: api::RequestMatch) -> Option<profiles::RequestMatch> {
let m = match orig.match_? {
api::request_match::Match::All(ms) => {
@ -294,3 +317,57 @@ fn convert_rsp_match(orig: api::ResponseMatch) -> Option<profiles::ResponseMatch
Some(m)
}
fn convert_retry_budget(orig: api::RetryBudget) -> Option<Arc<Budget>> {
let min_retries = if orig.min_retries_per_second <= ::std::i32::MAX as u32 {
orig.min_retries_per_second
} else {
warn!("retry_budget min_retries_per_second overflow: {:?}", orig.min_retries_per_second);
return None;
};
let retry_ratio = orig.retry_ratio;
if retry_ratio > 1000.0 || retry_ratio < 0.0 {
warn!("retry_budget retry_ratio invalid: {:?}", retry_ratio);
return None;
}
let ttl = match orig.ttl?.into() {
Ok(dur) => {
if dur > Duration::from_secs(60) || dur < Duration::from_secs(1) {
warn!("retry_budget ttl invalid: {:?}", dur);
return None;
}
dur
},
Err(_) => return None,
};
Some(Arc::new(Budget::new(ttl, min_retries, retry_ratio)))
}
#[cfg(test)]
mod tests {
use quickcheck::*;
use super::*;
quickcheck! {
fn retry_budget_from_proto(
min_retries_per_second: u32,
retry_ratio: f32,
seconds: i64,
nanos: i32
) -> bool {
let proto = api::RetryBudget {
min_retries_per_second,
retry_ratio,
ttl: Some(::prost_types::Duration {
seconds,
nanos,
}),
};
convert_retry_budget(proto);
// simply not panicking is good enough
true
}
}
}

View File

@ -38,6 +38,7 @@ extern crate tokio;
extern crate tokio_timer;
extern crate tower_grpc;
extern crate tower_http;
extern crate tower_retry;
extern crate tower_util;
extern crate trust_dns_resolver;
extern crate try_lock;

View File

@ -118,6 +118,16 @@ impl Default for HttpBody {
}
}
impl super::retry::TryClone for HttpBody {
fn try_clone(&self) -> Option<Self> {
if self.is_end_stream() {
Some(HttpBody::default())
} else {
None
}
}
}
impl Drop for HttpBody {
fn drop(&mut self) {
// If an HTTP/1 upgrade was wanted, send the upgrade future.

View File

@ -29,16 +29,26 @@ where
T: Hash + Eq,
C: Hash + Eq,
{
by_target: IndexMap<T, Arc<Mutex<Metrics<C>>>>,
by_target: IndexMap<T, Arc<Mutex<RequestMetrics<C>>>>,
}
pub trait Scoped<T> {
type Scope: Stats;
fn scoped(&self, index: T) -> Self::Scope;
}
pub trait Stats {
fn incr_retry_skipped_budget(&self);
}
#[derive(Debug)]
struct Metrics<C>
pub struct RequestMetrics<C>
where
C: Hash + Eq,
{
last_update: Instant,
total: Counter,
by_retry_skipped: IndexMap<RetrySkipped, Counter>,
by_status: IndexMap<http::StatusCode, StatusMetrics<C>>,
}
@ -56,6 +66,11 @@ pub struct ClassMetrics {
total: Counter,
}
#[derive(Debug, PartialEq, Eq, Hash)]
enum RetrySkipped {
Budget,
}
impl<T, C> Default for Registry<T, C>
where
T: Hash + Eq,
@ -74,7 +89,7 @@ where
C: Hash + Eq,
{
/// Retains metrics for all targets that (1) no longer have an active
/// reference to the `Metrics` structure and (2) have not been updated since `epoch`.
/// reference to the `RequestMetrics` structure and (2) have not been updated since `epoch`.
fn retain_since(&mut self, epoch: Instant) {
self.by_target.retain(|_, m| {
Arc::strong_count(&m) > 1 || m.lock().map(|m| m.last_update >= epoch).unwrap_or(false)
@ -82,7 +97,38 @@ where
}
}
impl<C> Default for Metrics<C>
impl<T, C> Scoped<T> for Arc<Mutex<Registry<T, C>>>
where
T: Hash + Eq,
C: Hash + Eq,
{
type Scope = Arc<Mutex<RequestMetrics<C>>>;
fn scoped(&self, target: T) -> Self::Scope {
self
.lock()
.expect("metrics Registry lock")
.by_target
.entry(target)
.or_insert_with(|| Arc::new(Mutex::new(RequestMetrics::default())))
.clone()
}
}
impl<C> RequestMetrics<C>
where
C: Hash + Eq,
{
fn incr_retry_skipped(&mut self, reason: RetrySkipped) {
self
.by_retry_skipped
.entry(reason)
.or_insert_with(Counter::default)
.incr();
}
}
impl<C> Default for RequestMetrics<C>
where
C: Hash + Eq,
{
@ -90,11 +136,24 @@ where
Self {
last_update: clock::now(),
total: Counter::default(),
by_retry_skipped: IndexMap::default(),
by_status: IndexMap::default(),
}
}
}
impl<C> Stats for Arc<Mutex<RequestMetrics<C>>>
where
C: Hash + Eq,
{
fn incr_retry_skipped_budget(&self) {
if let Ok(mut metrics) = self.lock() {
metrics.last_update = clock::now();
metrics.incr_retry_skipped(RetrySkipped::Budget);
}
}
}
impl<C> Default for StatusMetrics<C>
where
C: Hash + Eq,

View File

@ -7,7 +7,7 @@ use tokio_timer::clock;
use metrics::{latency, Counter, FmtLabels, FmtMetric, FmtMetrics, Histogram, Metric};
use super::{ClassMetrics, Metrics, Registry, StatusMetrics};
use super::{ClassMetrics, RequestMetrics, Registry, RetrySkipped, StatusMetrics};
/// Reports HTTP metrics for prometheus.
#[derive(Clone, Debug)]
@ -28,6 +28,7 @@ struct Scope {
request_total_key: String,
response_total_key: String,
response_latency_ms_key: String,
retry_skipped_total_key: String,
}
// ===== impl Report =====
@ -89,6 +90,9 @@ where
self.scope.response_total().fmt_help(f)?;
registry.fmt_by_class(f, self.scope.response_total(), |s| &s.total)?;
self.scope.retry_skipped_total().fmt_help(f)?;
registry.fmt_by_retry(f, self.scope.retry_skipped_total())?;
Ok(())
}
}
@ -106,7 +110,7 @@ where
) -> fmt::Result
where
M: FmtMetric,
F: Fn(&Metrics<C>) -> &M,
F: Fn(&RequestMetrics<C>) -> &M,
{
for (tgt, tm) in &self.by_target {
if let Ok(m) = tm.lock() {
@ -117,6 +121,26 @@ where
Ok(())
}
fn fmt_by_retry<M>(
&self,
f: &mut fmt::Formatter,
metric: Metric<M>,
) -> fmt::Result
where
M: FmtMetric,
{
for (tgt, tm) in &self.by_target {
if let Ok(tm) = tm.lock() {
for (retry, m) in &tm.by_retry_skipped {
let labels = (tgt, retry);
m.fmt_metric_labeled(f, metric.name, labels)?;
}
}
}
Ok(())
}
fn fmt_by_status<M, F>(
&self,
f: &mut fmt::Formatter,
@ -172,6 +196,7 @@ impl Default for Scope {
request_total_key: "request_total".to_owned(),
response_total_key: "response_total".to_owned(),
response_latency_ms_key: "response_latency_ms".to_owned(),
retry_skipped_total_key: "retry_skipped_total".to_owned(),
}
}
}
@ -186,6 +211,7 @@ impl Scope {
request_total_key: format!("{}_request_total", prefix),
response_total_key: format!("{}_response_total", prefix),
response_latency_ms_key: format!("{}_response_latency_ms", prefix),
retry_skipped_total_key: format!("{}_retry_skipped_total", prefix),
}
}
@ -201,6 +227,10 @@ impl Scope {
Metric::new(&self.response_latency_ms_key, &Self::RESPONSE_LATENCY_MS_HELP)
}
fn retry_skipped_total(&self) -> Metric<Counter> {
Metric::new(&self.retry_skipped_total_key, &Self::RETRY_SKIPPED_TOTAL_HELP)
}
const REQUEST_TOTAL_HELP: &'static str = "Total count of HTTP requests.";
const RESPONSE_TOTAL_HELP: &'static str = "Total count of HTTP responses.";
@ -208,6 +238,8 @@ impl Scope {
const RESPONSE_LATENCY_MS_HELP: &'static str =
"Elapsed times between a request's headers being received \
and its response stream completing";
const RETRY_SKIPPED_TOTAL_HELP: &'static str = "Total count of retryable HTTP responses that were not retried.";
}
impl FmtLabels for Status {
@ -215,3 +247,11 @@ impl FmtLabels for Status {
write!(f, "status_code=\"{}\"", self.0.as_u16())
}
}
impl FmtLabels for RetrySkipped {
fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "skipped=\"{}\"", match self {
RetrySkipped::Budget => "budget",
})
}
}

View File

@ -11,7 +11,8 @@ use tokio_timer::clock;
use tower_grpc;
use super::classify::{ClassifyEos, ClassifyResponse};
use super::{ClassMetrics, Metrics, Registry, StatusMetrics};
use super::{ClassMetrics, RequestMetrics, Registry, StatusMetrics};
use super::super::retry::TryClone;
use svc;
/// A stack module that wraps services to record metrics.
@ -46,7 +47,7 @@ where
C: ClassifyResponse<Error = h2::Error> + Clone,
C::Class: Hash + Eq,
{
metrics: Option<Arc<Mutex<Metrics<C::Class>>>>,
metrics: Option<Arc<Mutex<RequestMetrics<C::Class>>>>,
inner: S,
_p: PhantomData<fn() -> C>,
}
@ -57,7 +58,7 @@ where
C::Class: Hash + Eq,
{
classify: Option<C>,
metrics: Option<Arc<Mutex<Metrics<C::Class>>>>,
metrics: Option<Arc<Mutex<RequestMetrics<C::Class>>>>,
stream_open_at: Instant,
inner: F,
}
@ -68,7 +69,7 @@ where
B: Payload,
C: Hash + Eq,
{
metrics: Option<Arc<Mutex<Metrics<C>>>>,
metrics: Option<Arc<Mutex<RequestMetrics<C>>>>,
inner: B,
}
@ -81,7 +82,7 @@ where
{
status: http::StatusCode,
classify: Option<C>,
metrics: Option<Arc<Mutex<Metrics<C::Class>>>>,
metrics: Option<Arc<Mutex<RequestMetrics<C::Class>>>>,
stream_open_at: Instant,
latency_recorded: bool,
inner: B,
@ -173,7 +174,7 @@ where
Ok(mut r) => Some(
r.by_target
.entry(target.clone().into())
.or_insert_with(|| Arc::new(Mutex::new(Metrics::default())))
.or_insert_with(|| Arc::new(Mutex::new(RequestMetrics::default())))
.clone(),
),
Err(_) => None,
@ -214,7 +215,7 @@ where
A: Payload,
B: Payload,
C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::Class: Hash + Eq,
C::Class: Hash + Eq + Send + Sync,
{
type Response = http::Response<ResponseBody<B, C::ClassifyEos>>;
type Error = S::Error;
@ -262,7 +263,7 @@ where
F: Future<Item = http::Response<B>>,
B: Payload,
C: ClassifyResponse<Error = h2::Error> + Send + Sync + 'static,
C::Class: Hash + Eq,
C::Class: Hash + Eq + Send + Sync,
{
type Item = http::Response<ResponseBody<B, C::ClassifyEos>>;
type Error = F::Error;
@ -340,6 +341,24 @@ where
}
}
impl<B, C> TryClone for RequestBody<B, C>
where
B: Payload<Error=h2::Error> + TryClone,
C: Eq + Hash
{
fn try_clone(&self) -> Option<Self> {
self
.inner
.try_clone()
.map(|inner| {
RequestBody {
inner,
metrics: self.metrics.clone(),
}
})
}
}
impl<B, C> Default for ResponseBody<B, C>
where
B: Payload + Default,

View File

@ -9,6 +9,7 @@ pub mod metrics;
pub mod normalize_uri;
pub mod orig_proto;
pub mod profiles;
pub mod retry;
pub mod router;
pub mod settings;
pub mod upgrade;

View File

@ -6,6 +6,7 @@ use indexmap::IndexMap;
use regex::Regex;
use std::iter::FromIterator;
use std::sync::Arc;
use tower_retry::budget::Budget;
use never::Never;
@ -40,6 +41,7 @@ pub trait CanGetDestination {
pub struct Route {
labels: Arc<IndexMap<String, String>>,
response_classes: ResponseClasses,
retries: Option<Retries>,
}
#[derive(Clone, Debug)]
@ -70,6 +72,11 @@ pub enum ResponseMatch {
},
}
#[derive(Clone, Debug)]
pub struct Retries {
budget: Arc<Budget>,
}
// === impl Route ===
impl Route {
@ -86,6 +93,7 @@ impl Route {
Self {
labels,
response_classes: response_classes.into(),
retries: None,
}
}
@ -96,6 +104,16 @@ impl Route {
pub fn response_classes(&self) -> &ResponseClasses {
&self.response_classes
}
pub fn retries(&self) -> Option<&Retries> {
self.retries.as_ref()
}
pub fn set_retries(&mut self, budget: Arc<Budget>) {
self.retries = Some(Retries {
budget,
});
}
}
// === impl RequestMatch ===
@ -143,6 +161,14 @@ impl ResponseMatch {
}
}
// === impl Retries ===
impl Retries {
pub fn budget(&self) -> &Arc<Budget> {
&self.budget
}
}
/// A stack module that produces a Service that routes requests through alternate
/// middleware configurations
///

184
src/proxy/http/retry.rs Normal file
View File

@ -0,0 +1,184 @@
use std::marker::PhantomData;
use futures::future;
use http::{Request, Response};
use tower_retry;
use proxy::http::metrics::{Scoped, Stats};
use svc;
pub trait CanRetry {
type Retry: Retry + Clone;
fn can_retry(&self) -> Option<Self::Retry>;
}
pub trait Retry: Sized {
fn retry<B1, B2>(&self, req: &Request<B1>, res: &Response<B2>) -> Result<(), NoRetry>;
fn clone_request<B: TryClone>(&self, req: &Request<B>) -> Option<Request<B>>;
}
pub enum NoRetry {
Success,
Budget,
}
pub trait TryClone: Sized {
fn try_clone(&self) -> Option<Self>;
}
pub struct Layer<S, K, A, B> {
registry: S,
_p: PhantomData<(K, fn(A) -> B)>,
}
pub struct Stack<M, S, K, A, B> {
inner: M,
registry: S,
_p: PhantomData<(K, fn(A) -> B)>,
}
pub type Service<R, Svc, St> = tower_retry::Retry<Policy<R, St>, Svc>;
#[derive(Clone)]
pub struct Policy<R, S>(R, S);
// === impl Layer ===
pub fn layer<S, K, A, B>(registry: S) -> Layer<S, K, A, B> {
Layer {
registry,
_p: PhantomData,
}
}
impl<S: Clone, K, A, B> Clone for Layer<S, K, A, B> {
fn clone(&self) -> Self {
Layer {
registry: self.registry.clone(),
_p: PhantomData,
}
}
}
impl<T, M, S, K, A, B> svc::Layer<T, T, M> for Layer<S, K, A, B>
where
T: CanRetry + Clone,
M: svc::Stack<T>,
M::Value: svc::Service<Request<A>, Response = Response<B>> + Clone,
S: Scoped<K> + Clone,
S::Scope: Clone,
K: From<T>,
A: TryClone,
{
type Value = <Stack<M, S, K, A, B> as svc::Stack<T>>::Value;
type Error = <Stack<M, S, K, A, B> as svc::Stack<T>>::Error;
type Stack = Stack<M, S, K, A, B>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
registry: self.registry.clone(),
_p: PhantomData,
}
}
}
// === impl Stack ===
impl<M: Clone, S: Clone, K, A, B> Clone for Stack<M, S, K, A, B> {
fn clone(&self) -> Self {
Stack {
inner: self.inner.clone(),
registry: self.registry.clone(),
_p: PhantomData,
}
}
}
impl<T, M, S, K, A, B> svc::Stack<T> for Stack<M, S, K, A, B>
where
T: CanRetry + Clone,
M: svc::Stack<T>,
M::Value: svc::Service<Request<A>, Response = Response<B>> + Clone,
S: Scoped<K>,
S::Scope: Clone,
K: From<T>,
A: TryClone,
{
type Value = svc::Either<Service<T::Retry, M::Value, S::Scope>, M::Value>;
type Error = M::Error;
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
let inner = self.inner.make(target)?;
if let Some(retries) = target.can_retry() {
trace!("stack is retryable");
let stats = self.registry.scoped(target.clone().into());
Ok(svc::Either::A(tower_retry::Retry::new(Policy(retries, stats), inner)))
} else {
Ok(svc::Either::B(inner))
}
}
}
// === impl Policy ===
impl<R, S, A, B, E> ::tower_retry::Policy<Request<A>, Response<B>, E> for Policy<R, S>
where
R: Retry + Clone,
S: Stats + Clone,
A: TryClone,
{
type Future = future::FutureResult<Self, ()>;
fn retry(&self, req: &Request<A>, result: Result<&Response<B>, &E>) -> Option<Self::Future> {
match result {
Ok(res) => {
match self.0.retry(req, res) {
Ok(()) => {
trace!("retrying request");
Some(future::ok(self.clone()))
},
Err(NoRetry::Budget) => {
self.1.incr_retry_skipped_budget();
None
},
Err(NoRetry::Success) => None,
}
},
Err(_err) => {
trace!("cannot retry transport error");
None
}
}
}
fn clone_request(&self, req: &Request<A>) -> Option<Request<A>> {
if let Some(clone) = self.0.clone_request(req) {
trace!("cloning request");
Some(clone)
} else {
trace!("request could not be cloned");
None
}
}
}
impl<B: TryClone> TryClone for Request<B> {
fn try_clone(&self) -> Option<Self> {
if let Some(body) = self.body().try_clone() {
let mut clone = Request::new(body);
*clone.method_mut() = self.method().clone();
*clone.uri_mut() = self.uri().clone();
*clone.headers_mut() = self.headers().clone();
*clone.version_mut() = self.version();
if let Some(ext) = self.extensions().get::<::proxy::server::Source>() {
clone.extensions_mut().insert(ext.clone());
}
Some(clone)
} else {
None
}
}
}

293
tests/profiles.rs Normal file
View File

@ -0,0 +1,293 @@
#![recursion_limit="128"]
#![deny(warnings)]
mod support;
use self::support::*;
use std::sync::atomic::{AtomicUsize, Ordering};
macro_rules! profile_test {
(routes: [$($route:expr),+], budget: $budget:expr, with_client: $with_client:expr) => {
profile_test! {
routes: [$($route),+],
budget: $budget,
with_client: $with_client,
with_metrics: |_m| {}
}
};
(routes: [$($route:expr),+], budget: $budget:expr, with_client: $with_client:expr, with_metrics: $with_metrics:expr) => {
profile_test! {
http: http1,
routes: [$($route),+],
budget: $budget,
with_client: $with_client,
with_metrics: $with_metrics
}
};
(http: $http:ident, routes: [$($route:expr),+], budget: $budget:expr, with_client: $with_client:expr, with_metrics: $with_metrics:expr) => {
let _ = env_logger_init();
let counter = AtomicUsize::new(0);
let counter2 = AtomicUsize::new(0);
let counter3 = AtomicUsize::new(0);
let host = "profiles.test.svc.cluster.local";
let srv = server::$http()
.route_fn("/load-profile", |_| {
Response::builder()
.status(201)
.body("".into())
.unwrap()
})
.route_fn("/0.5", move |_req| {
if counter.fetch_add(1, Ordering::Relaxed) % 2 == 0 {
Response::builder()
.status(533)
.body("nope".into())
.unwrap()
} else {
Response::builder()
.status(200)
.body("retried".into())
.unwrap()
}
})
.route_fn("/0.5/sleep", move |_req| {
::std::thread::sleep(Duration::from_secs(1));
if counter2.fetch_add(1, Ordering::Relaxed) % 2 == 0 {
Response::builder()
.status(533)
.body("nope".into())
.unwrap()
} else {
Response::builder()
.status(200)
.body("retried".into())
.unwrap()
}
})
.route_fn("/0.5/100KB", move |_req| {
if counter3.fetch_add(1, Ordering::Relaxed) % 2 == 0 {
Response::builder()
.status(533)
.body(vec![b'x'; 1024 * 100].into())
.unwrap()
} else {
Response::builder()
.status(200)
.body("retried".into())
.unwrap()
}
})
.run();
let ctrl = controller::new();
let dst_tx = ctrl.destination_tx(host);
dst_tx.send_addr(srv.addr);
let profile_tx = ctrl.profile_tx(host);
let routes = vec![$(
$route,
),+];
profile_tx.send(controller::profile(routes, $budget));
let ctrl = ctrl.run();
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.run();
let client = client::$http(proxy.outbound, host);
assert_eq!(client.get("/load-profile"), "");
$with_client(client);
let metrics = client::http1(proxy.metrics, "localhost");
$with_metrics(metrics);
}
}
#[test]
fn retry_if_profile_allows() {
profile_test! {
routes: [
controller::route()
.request_any()
// use default classifier
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
with_client: |client: client::Client| {
assert_eq!(client.get("/0.5"), "retried");
}
}
}
#[test]
fn retry_uses_budget() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(1), 0.1, 1)),
with_client: |client: client::Client| {
assert_eq!(client.get("/0.5"), "retried");
let res = client.request(&mut client.request_builder("/0.5"));
assert_eq!(res.status(), 533);
},
with_metrics: |metrics: client::Client| {
assert_eventually_contains!(
metrics.get("/metrics"),
"route_actual_retry_skipped_total{direction=\"outbound\",dst=\"profiles.test.svc.cluster.local:80\",skipped=\"budget\"} 1"
);
}
}
}
#[test]
fn does_not_retry_if_request_does_not_match() {
profile_test! {
routes: [
controller::route()
.request_path("/wont/match/anything")
.response_failure(..)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
with_client: |client: client::Client| {
let res = client.request(&mut client.request_builder("/0.5"));
assert_eq!(res.status(), 533);
}
}
}
#[test]
fn does_not_retry_if_earlier_response_class_is_success() {
profile_test! {
routes: [
controller::route()
.request_any()
// prevent 533s from being retried
.response_success(533..534)
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
with_client: |client: client::Client| {
let res = client.request(&mut client.request_builder("/0.5"));
assert_eq!(res.status(), 533);
}
}
}
#[test]
fn does_not_retry_if_request_has_body() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
with_client: |client: client::Client| {
let req = client.request_builder("/0.5")
.method("POST")
.body("req has a body".into())
.unwrap();
let res = client.request_body(req);
assert_eq!(res.status(), 533);
}
}
}
#[test]
fn does_not_retry_if_missing_retry_budget() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: None,
with_client: |client: client::Client| {
let res = client.request(&mut client.request_builder("/0.5"));
assert_eq!(res.status(), 533);
}
}
}
#[test]
fn ignores_invalid_retry_budget_ttl() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(1000), 0.1, 1)),
with_client: |client: client::Client| {
let res = client.request(&mut client.request_builder("/0.5"));
assert_eq!(res.status(), 533);
}
}
}
#[test]
fn ignores_invalid_retry_budget_ratio() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 10_000.0, 1)),
with_client: |client: client::Client| {
let res = client.request(&mut client.request_builder("/0.5"));
assert_eq!(res.status(), 533);
}
}
}
#[test]
fn ignores_invalid_retry_budget_negative_ratio() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), -1.0, 1)),
with_client: |client: client::Client| {
let res = client.request(&mut client.request_builder("/0.5"));
assert_eq!(res.status(), 533);
}
}
}
#[test]
fn http2_failures_dont_leak_connection_window() {
profile_test! {
http: http2,
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 1.0, 10)),
with_client: |client: client::Client| {
// Before https://github.com/carllerche/h2/pull/334, this would
// hang since the retried failure would have leaked the 100k window
// capacity, preventing the successful response from being read.
assert_eq!(client.get("/0.5/100KB"), "retried");
},
with_metrics: |_m| {}
}
}

View File

@ -72,10 +72,9 @@ impl Client {
pub fn get(&self, path: &str) -> String {
let mut req = self.request_builder(path);
let res = self.request(req.method("GET"));
assert_eq!(
res.status(),
StatusCode::OK,
"client.get({:?}) expects 200 OK, got \"{}\"",
assert!(
res.status().is_success(),
"client.get({:?}) expects 2xx, got \"{}\"",
path,
res.status(),
);

View File

@ -7,6 +7,7 @@ use support::hyper::body::Payload;
use std::collections::{HashMap, VecDeque};
use std::net::IpAddr;
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, Mutex};
use linkerd2_proxy_api::destination as pb;
@ -41,6 +42,11 @@ pub struct Listening {
shutdown: Shutdown,
}
#[derive(Clone, Debug, Default)]
pub struct RouteBuilder {
route: pb::Route,
}
impl Controller {
pub fn new() -> Self {
Self::default()
@ -83,9 +89,14 @@ impl Controller {
pub fn profile_tx(&self, dest: &str) -> ProfileSender {
let (tx, rx) = sync::mpsc::unbounded();
let path = if dest.contains(":") {
dest.to_owned()
} else {
format!("{}:80", dest)
};
let dst = pb::GetDestination {
scheme: "k8s".into(),
path: dest.into(),
path,
};
self.expect_profile_calls
.lock()
@ -319,6 +330,96 @@ pub fn destination_exists_with_no_endpoints() -> pb::Update {
}
}
pub fn profile<I>(routes: I, retry_budget: Option<pb::RetryBudget>) -> pb::DestinationProfile
where
I: IntoIterator,
I::Item: Into<pb::Route>,
{
let routes = routes.into_iter().map(Into::into).collect();
pb::DestinationProfile {
routes,
retry_budget,
}
}
pub fn retry_budget(ttl: Duration, retry_ratio: f32, min_retries_per_second: u32) -> pb::RetryBudget {
pb::RetryBudget {
ttl: Some(ttl.into()),
retry_ratio,
min_retries_per_second,
}
}
pub fn route() -> RouteBuilder {
RouteBuilder::default()
}
impl RouteBuilder {
pub fn request_any(self) -> Self {
self.request_path(".*")
}
pub fn request_path(mut self, path: &str) -> Self {
let path_match = pb::PathMatch {
regex: String::from(path)
};
self.route.condition = Some(pb::RequestMatch {
match_: Some(pb::request_match::Match::Path(path_match)),
});
self
}
fn response_class(mut self, condition: pb::ResponseMatch, is_failure: bool) -> Self {
self.route.response_classes.push(pb::ResponseClass {
condition: Some(condition),
is_failure,
});
self
}
fn response_class_status(self, status_range: impl RangeBounds<u16>, is_failure: bool) -> Self {
let min = match status_range.start_bound() {
Bound::Included(&min) => min,
Bound::Excluded(&min) => min + 1,
Bound::Unbounded => 100,
}.into();
let max = match status_range.end_bound() {
Bound::Included(&max) => max,
Bound::Excluded(&max) => max - 1,
Bound::Unbounded => 599,
}.into();
assert!(min >= 100 && min <= max);
assert!(max <= 599);
let range = pb::HttpStatusRange {
min,
max,
};
let condition = pb::ResponseMatch {
match_: Some(pb::response_match::Match::Status(range)),
};
self.response_class(condition, is_failure)
}
pub fn response_success(self, status_range: impl RangeBounds<u16>) -> Self {
self.response_class_status(status_range, false)
}
pub fn response_failure(self, status_range: impl RangeBounds<u16>) -> Self {
self.response_class_status(status_range, true)
}
pub fn retryable(mut self, is: bool) -> Self {
self.route.is_retryable = is;
self
}
}
impl From<RouteBuilder> for pb::Route {
fn from(rb: RouteBuilder) -> Self {
rb.route
}
}
fn ip_conv(ip: IpAddr) -> net::IpAddress {
match ip {
IpAddr::V4(v4) => net::IpAddress {

View File

@ -55,16 +55,17 @@ const DEFAULT_LOG: &'static str =
linkerd2_proxy::proxy::http::router=off,\
linkerd2_proxy::proxy::tcp=off";
pub fn env_logger_init() {
pub fn env_logger_init() -> Result<(), String> {
use std::env;
let log = env::var("LINKERD2_PROXY_LOG").unwrap_or_else(|_| DEFAULT_LOG.to_owned());
let log = env::var("LINKERD2_PROXY_LOG")
.or_else(|_| env::var("RUST_LOG"))
.unwrap_or_else(|_| DEFAULT_LOG.to_owned());
env::set_var("RUST_LOG", &log);
env::set_var("LINKERD2_PROXY_LOG", &log);
if let Err(e) = env_logger::try_init() {
eprintln!("Failed to initialize logger: {}", e);
}
env_logger::try_init()
.map_err(|e| e.to_string())
}
/// Retry an assertion up to a specified number of times, waiting