Delete ctx::Process (#68)

In order to start dismantling the monolithic ctx structures, this change
removes the root `ctx::Process` type. This simplifies `ctx::Proxy` such
that is copyable and need not be `Arc`ed.

`telemetry::metrics::labels::Direction` has been updated to decorate
`ctx::Proxy` instead of modeling the same variants directly as an enum.
This commit is contained in:
Oliver Gould 2018-08-15 16:27:57 -07:00 committed by GitHub
parent 48b7383ff0
commit 6dde18cf34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 111 additions and 193 deletions

View File

@ -2,7 +2,6 @@ use std::error::Error;
use std::fmt; use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use futures::{Async, Future, Poll, future, task}; use futures::{Async, Future, Poll, future, task};
use http::{self, uri}; use http::{self, uri};
@ -53,7 +52,7 @@ where
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send, <B::Data as ::bytes::IntoBuf>::Buf: Send,
{ {
bind: Bind<Arc<ctx::Proxy>, B>, bind: Bind<ctx::Proxy, B>,
binding: Binding<B>, binding: Binding<B>,
/// Prevents logging repeated connect errors. /// Prevents logging repeated connect errors.
/// ///
@ -132,7 +131,7 @@ pub struct NormalizeUri<S> {
} }
pub struct RebindTls<B> { pub struct RebindTls<B> {
bind: Bind<Arc<ctx::Proxy>, B>, bind: Bind<ctx::Proxy, B>,
protocol: Protocol, protocol: Protocol,
endpoint: Endpoint, endpoint: Endpoint,
} }
@ -219,7 +218,7 @@ impl<C: Clone, B> Clone for Bind<C, B> {
} }
} }
impl<B> Bind<Arc<ctx::Proxy>, B> impl<B> Bind<ctx::Proxy, B>
where where
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send, <B::Data as ::bytes::IntoBuf>::Buf: Send,
@ -255,7 +254,7 @@ where
}); });
let client_ctx = ctx::transport::Client::new( let client_ctx = ctx::transport::Client::new(
&self.ctx, self.ctx,
&addr, &addr,
ep.metadata().clone(), ep.metadata().clone(),
TlsStatus::from(&tls), TlsStatus::from(&tls),
@ -267,7 +266,7 @@ where
&client_ctx, &client_ctx,
); );
let log = ::logging::Client::proxy(&self.ctx, addr) let log = ::logging::Client::proxy(self.ctx, addr)
.with_protocol(protocol.clone()); .with_protocol(protocol.clone());
let client = transparency::Client::new( let client = transparency::Client::new(
protocol, protocol,
@ -361,7 +360,7 @@ impl<C, B> Bind<C, B> {
} }
} }
impl<B> control::destination::Bind for BindProtocol<Arc<ctx::Proxy>, B> impl<B> control::destination::Bind for BindProtocol<ctx::Proxy, B>
where where
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send, <B::Data as ::bytes::IntoBuf>::Buf: Send,

View File

@ -193,10 +193,9 @@ impl ctx::transport::Server {
} }
fn direction(&self) -> tap::tap_event::ProxyDirection { fn direction(&self) -> tap::tap_event::ProxyDirection {
if self.proxy.is_outbound() { match self.proxy {
tap::tap_event::ProxyDirection::Outbound ctx::Proxy::Outbound => tap::tap_event::ProxyDirection::Outbound,
} else { ctx::Proxy::Inbound => tap::tap_event::ProxyDirection::Inbound,
tap::tap_event::ProxyDirection::Inbound
} }
} }
} }

View File

@ -83,14 +83,11 @@ impl Request {
/// Returns a `TlsStatus` indicating if the request was sent was over TLS. /// Returns a `TlsStatus` indicating if the request was sent was over TLS.
pub fn tls_status(&self) -> ctx::transport::TlsStatus { pub fn tls_status(&self) -> ctx::transport::TlsStatus {
if self.server.proxy.is_outbound() { use ctx::Proxy::*;
// If the request is in the outbound direction, then we opened the // The proxy only handles TLS on one side of each proxy.
// client connection, so check if it was secured. match self.server.proxy {
self.client.tls_status Outbound => self.client.tls_status,
} else { Inbound => self.server.tls_status,
// Otherwise, the request is inbound, so check if we accepted it
// over TLS.
self.server.tls_status
} }
} }

View File

@ -1,80 +1,26 @@
//! Describes proxy traffic.
//!
//! Contexts are primarily intended to describe traffic contexts for the purposes of
//! telemetry. They may also be useful for, for instance,
//! routing/rate-limiting/policy/etc.
//!
//! As a rule, context types should implement `Clone + Send + Sync`. This allows them to
//! be stored in `http::Extensions`, for instance. Furthermore, because these contexts
//! will be sent to a telemetry processing thread, we want to avoid excessive cloning.
use config;
use std::time::SystemTime;
use std::sync::Arc;
pub mod http; pub mod http;
pub mod transport; pub mod transport;
/// Describes a single running proxy instance.
#[derive(Clone, Debug)]
pub struct Process {
/// Identifies the Kubernetes namespace in which this proxy is process.
pub scheduled_namespace: String,
pub start_time: SystemTime,
}
/// Indicates the orientation of traffic, relative to a sidecar proxy. /// Indicates the orientation of traffic, relative to a sidecar proxy.
/// ///
/// Each process exposes two proxies: /// Each process exposes two proxies:
/// - The _inbound_ proxy receives traffic from another services forwards it to within the /// - The _inbound_ proxy receives traffic from another services forwards it to within the
/// local instance. /// local instance.
/// - The _outbound_ proxy receives traffic from the local instance and forwards it to a /// - The _outbound_ proxy receives traffic from the local instance and forwards it to a
/// remove service. /// remote service.
#[derive(Clone, Debug)] #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum Proxy { pub enum Proxy {
Inbound(Arc<Process>), Inbound,
Outbound(Arc<Process>), Outbound,
}
impl Process {
// Test-only, but we can't use `#[cfg(test)]` because it is used by the
// benchmarks
pub fn test(ns: &str) -> Arc<Self> {
Arc::new(Self {
scheduled_namespace: ns.into(),
start_time: SystemTime::now(),
})
}
/// Construct a new `Process` from environment variables.
pub fn new(config: &config::Config) -> Arc<Self> {
let start_time = SystemTime::now();
Arc::new(Self {
scheduled_namespace: config.namespaces.pod.clone(),
start_time,
})
}
} }
impl Proxy { impl Proxy {
pub fn inbound(p: &Arc<Process>) -> Arc<Self> { pub fn as_str(&self) -> &'static str {
Arc::new(Proxy::Inbound(Arc::clone(p)))
}
pub fn outbound(p: &Arc<Process>) -> Arc<Self> {
Arc::new(Proxy::Outbound(Arc::clone(p)))
}
pub fn is_inbound(&self) -> bool {
match *self { match *self {
Proxy::Inbound(_) => true, Proxy::Inbound => "in",
Proxy::Outbound(_) => false, Proxy::Outbound => "out",
} }
} }
pub fn is_outbound(&self) -> bool {
!self.is_inbound()
}
} }
#[cfg(test)] #[cfg(test)]
@ -96,19 +42,15 @@ pub mod test_util {
([1, 2, 3, 4], 5678).into() ([1, 2, 3, 4], 5678).into()
} }
pub fn process() -> Arc<ctx::Process> {
ctx::Process::test("test")
}
pub fn server( pub fn server(
proxy: &Arc<ctx::Proxy>, proxy: ctx::Proxy,
tls: ctx::transport::TlsStatus tls: ctx::transport::TlsStatus
) -> Arc<ctx::transport::Server> { ) -> Arc<ctx::transport::Server> {
ctx::transport::Server::new(&proxy, &addr(), &addr(), &Some(addr()), tls) ctx::transport::Server::new(proxy, &addr(), &addr(), &Some(addr()), tls)
} }
pub fn client<L, S>( pub fn client<L, S>(
proxy: &Arc<ctx::Proxy>, proxy: ctx::Proxy,
labels: L, labels: L,
tls: ctx::transport::TlsStatus, tls: ctx::transport::TlsStatus,
) -> Arc<ctx::transport::Client> ) -> Arc<ctx::transport::Client>
@ -119,7 +61,7 @@ pub mod test_util {
let meta = destination::Metadata::new(DstLabels::new(labels), let meta = destination::Metadata::new(DstLabels::new(labels),
destination::ProtocolHint::Unknown, destination::ProtocolHint::Unknown,
Conditional::None(tls::ReasonForNoIdentity::NotProvidedByServiceDiscovery)); Conditional::None(tls::ReasonForNoIdentity::NotProvidedByServiceDiscovery));
ctx::transport::Client::new(&proxy, &addr(), meta, tls) ctx::transport::Client::new(proxy, &addr(), meta, tls)
} }
pub fn request( pub fn request(

View File

@ -19,7 +19,7 @@ pub enum Ctx {
/// Identifies a connection from another process to a proxy listener. /// Identifies a connection from another process to a proxy listener.
#[derive(Debug)] #[derive(Debug)]
pub struct Server { pub struct Server {
pub proxy: Arc<ctx::Proxy>, pub proxy: ctx::Proxy,
pub remote: SocketAddr, pub remote: SocketAddr,
pub local: SocketAddr, pub local: SocketAddr,
pub orig_dst: Option<SocketAddr>, pub orig_dst: Option<SocketAddr>,
@ -29,7 +29,7 @@ pub struct Server {
/// Identifies a connection from the proxy to another process. /// Identifies a connection from the proxy to another process.
#[derive(Debug)] #[derive(Debug)]
pub struct Client { pub struct Client {
pub proxy: Arc<ctx::Proxy>, pub proxy: ctx::Proxy,
pub remote: SocketAddr, pub remote: SocketAddr,
pub metadata: destination::Metadata, pub metadata: destination::Metadata,
pub tls_status: TlsStatus, pub tls_status: TlsStatus,
@ -63,16 +63,16 @@ impl fmt::Display for TlsStatus {
impl Ctx { impl Ctx {
pub fn proxy(&self) -> &Arc<ctx::Proxy> { pub fn proxy(&self) -> ctx::Proxy {
match *self { match *self {
Ctx::Client(ref ctx) => &ctx.proxy, Ctx::Client(ref ctx) => ctx.proxy,
Ctx::Server(ref ctx) => &ctx.proxy, Ctx::Server(ref ctx) => ctx.proxy,
} }
} }
pub fn tls_status(&self) -> TlsStatus { pub fn tls_status(&self) -> TlsStatus {
match self { match self {
Ctx::Client(ctx) => ctx.tls_status, Ctx::Client(ctx) => ctx.tls_status,
Ctx::Server(ctx) => ctx.tls_status, Ctx::Server(ctx) => ctx.tls_status,
} }
} }
@ -80,14 +80,14 @@ impl Ctx {
impl Server { impl Server {
pub fn new( pub fn new(
proxy: &Arc<ctx::Proxy>, proxy: ctx::Proxy,
local: &SocketAddr, local: &SocketAddr,
remote: &SocketAddr, remote: &SocketAddr,
orig_dst: &Option<SocketAddr>, orig_dst: &Option<SocketAddr>,
tls_status: TlsStatus, tls_status: TlsStatus,
) -> Arc<Server> { ) -> Arc<Server> {
let s = Server { let s = Server {
proxy: Arc::clone(proxy), proxy,
local: *local, local: *local,
remote: *remote, remote: *remote,
orig_dst: *orig_dst, orig_dst: *orig_dst,
@ -123,13 +123,13 @@ fn same_addr(a0: &SocketAddr, a1: &SocketAddr) -> bool {
impl Client { impl Client {
pub fn new( pub fn new(
proxy: &Arc<ctx::Proxy>, proxy: ctx::Proxy,
remote: &SocketAddr, remote: &SocketAddr,
metadata: destination::Metadata, metadata: destination::Metadata,
tls_status: TlsStatus, tls_status: TlsStatus,
) -> Arc<Client> { ) -> Arc<Client> {
let c = Client { let c = Client {
proxy: Arc::clone(proxy), proxy,
remote: *remote, remote: *remote,
metadata, metadata,
tls_status, tls_status,

View File

@ -12,7 +12,7 @@ use bind;
use ctx; use ctx;
use transparency::orig_proto; use transparency::orig_proto;
type Bind<B> = bind::Bind<Arc<ctx::Proxy>, B>; type Bind<B> = bind::Bind<ctx::Proxy, B>;
pub struct Inbound<B> { pub struct Inbound<B> {
default_addr: Option<SocketAddr>, default_addr: Option<SocketAddr>,
@ -103,7 +103,6 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net; use std::net;
use std::sync::Arc;
use http; use http;
use linkerd2_proxy_router::Recognize; use linkerd2_proxy_router::Recognize;
@ -114,8 +113,8 @@ mod tests {
use conditional::Conditional; use conditional::Conditional;
use tls; use tls;
fn new_inbound(default: Option<net::SocketAddr>, ctx: &Arc<ctx::Proxy>) -> Inbound<()> { fn new_inbound(default: Option<net::SocketAddr>, ctx: ctx::Proxy) -> Inbound<()> {
let bind = Bind::new(tls::ClientConfig::no_tls()).with_ctx(ctx.clone()); let bind = Bind::new(tls::ClientConfig::no_tls()).with_ctx(ctx);
Inbound::new(default, bind) Inbound::new(default, bind)
} }
@ -137,12 +136,12 @@ mod tests {
local: net::SocketAddr, local: net::SocketAddr,
remote: net::SocketAddr remote: net::SocketAddr
) -> bool { ) -> bool {
let ctx = ctx::Proxy::inbound(&ctx::Process::test("test")); let ctx = ctx::Proxy::Inbound;
let inbound = new_inbound(None, &ctx); let inbound = new_inbound(None, ctx);
let srv_ctx = ctx::transport::Server::new( let srv_ctx = ctx::transport::Server::new(
&ctx, &local, &remote, &Some(orig_dst), TLS_DISABLED); ctx, &local, &remote, &Some(orig_dst), TLS_DISABLED);
let rec = srv_ctx.orig_dst_if_not_local().map(make_key_http1); let rec = srv_ctx.orig_dst_if_not_local().map(make_key_http1);
@ -158,14 +157,14 @@ mod tests {
local: net::SocketAddr, local: net::SocketAddr,
remote: net::SocketAddr remote: net::SocketAddr
) -> bool { ) -> bool {
let ctx = ctx::Proxy::inbound(&ctx::Process::test("test")); let ctx = ctx::Proxy::Inbound;
let inbound = new_inbound(default, &ctx); let inbound = new_inbound(default, ctx);
let mut req = http::Request::new(()); let mut req = http::Request::new(());
req.extensions_mut() req.extensions_mut()
.insert(ctx::transport::Server::new( .insert(ctx::transport::Server::new(
&ctx, ctx,
&local, &local,
&remote, &remote,
&None, &None,
@ -176,9 +175,9 @@ mod tests {
} }
fn recognize_default_no_ctx(default: Option<net::SocketAddr>) -> bool { fn recognize_default_no_ctx(default: Option<net::SocketAddr>) -> bool {
let ctx = ctx::Proxy::inbound(&ctx::Process::test("test")); let ctx = ctx::Proxy::Inbound;
let inbound = new_inbound(default, &ctx); let inbound = new_inbound(default, ctx);
let req = http::Request::new(()); let req = http::Request::new(());
@ -190,14 +189,14 @@ mod tests {
local: net::SocketAddr, local: net::SocketAddr,
remote: net::SocketAddr remote: net::SocketAddr
) -> bool { ) -> bool {
let ctx = ctx::Proxy::inbound(&ctx::Process::test("test")); let ctx = ctx::Proxy::Inbound;
let inbound = new_inbound(default, &ctx); let inbound = new_inbound(default, ctx);
let mut req = http::Request::new(()); let mut req = http::Request::new(());
req.extensions_mut() req.extensions_mut()
.insert(ctx::transport::Server::new( .insert(ctx::transport::Server::new(
&ctx, ctx,
&local, &local,
&remote, &remote,
&Some(local), &Some(local),

View File

@ -60,7 +60,7 @@ use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::{Duration, SystemTime};
use indexmap::IndexSet; use indexmap::IndexSet;
use tokio::{ use tokio::{
@ -122,6 +122,8 @@ pub struct Main<G> {
config: config::Config, config: config::Config,
tls_config_watch: tls::ConfigWatch, tls_config_watch: tls::ConfigWatch,
start_time: SystemTime,
control_listener: BoundPort, control_listener: BoundPort,
inbound_listener: BoundPort, inbound_listener: BoundPort,
outbound_listener: BoundPort, outbound_listener: BoundPort,
@ -144,6 +146,8 @@ where
where where
R: Into<MainRuntime>, R: Into<MainRuntime>,
{ {
let start_time = SystemTime::now();
let tls_config_watch = tls::ConfigWatch::new(config.tls_settings.clone()); let tls_config_watch = tls::ConfigWatch::new(config.tls_settings.clone());
// TODO: Serve over TLS. // TODO: Serve over TLS.
@ -180,6 +184,7 @@ where
Main { Main {
config, config,
start_time,
tls_config_watch, tls_config_watch,
control_listener, control_listener,
inbound_listener, inbound_listener,
@ -211,10 +216,9 @@ where
where where
F: Future<Item = (), Error = ()> + Send + 'static, F: Future<Item = (), Error = ()> + Send + 'static,
{ {
let process_ctx = ctx::Process::new(&self.config);
let Main { let Main {
config, config,
start_time,
tls_config_watch, tls_config_watch,
control_listener, control_listener,
inbound_listener, inbound_listener,
@ -248,7 +252,7 @@ where
let (taps, observe) = control::Observe::new(100); let (taps, observe) = control::Observe::new(100);
let (sensors, tls_config_sensor, metrics_server) = telemetry::new( let (sensors, tls_config_sensor, metrics_server) = telemetry::new(
&process_ctx, start_time,
config.metrics_retain_idle, config.metrics_retain_idle,
&taps, &taps,
); );
@ -288,8 +292,8 @@ where
// address and listen for inbound connections that should be forwarded // address and listen for inbound connections that should be forwarded
// to the managed application (private destination). // to the managed application (private destination).
let inbound = { let inbound = {
let ctx = ctx::Proxy::inbound(&process_ctx); let ctx = ctx::Proxy::Inbound;
let bind = bind.clone().with_ctx(ctx.clone()); let bind = bind.clone().with_ctx(ctx);
let default_addr = config.private_forward.map(|a| a.into()); let default_addr = config.private_forward.map(|a| a.into());
let router = Router::new( let router = Router::new(
@ -313,8 +317,8 @@ where
// address and listen for outbound requests that should be routed // address and listen for outbound requests that should be routed
// to a remote service (public destination). // to a remote service (public destination).
let outbound = { let outbound = {
let ctx = ctx::Proxy::outbound(&process_ctx); let ctx = ctx::Proxy::Outbound;
let bind = bind.clone().with_ctx(ctx.clone()); let bind = bind.clone().with_ctx(ctx);
let router = Router::new( let router = Router::new(
Outbound::new(bind, resolver, config.bind_timeout), Outbound::new(bind, resolver, config.bind_timeout),
config.outbound_router_capacity, config.outbound_router_capacity,
@ -387,7 +391,7 @@ fn serve<R, B, E, F, G>(
router: Router<R>, router: Router<R>,
tcp_connect_timeout: Duration, tcp_connect_timeout: Duration,
disable_protocol_detection_ports: IndexSet<u16>, disable_protocol_detection_ports: IndexSet<u16>,
proxy_ctx: Arc<ctx::Proxy>, proxy_ctx: ctx::Proxy,
sensors: telemetry::Sensors, sensors: telemetry::Sensors,
get_orig_dst: G, get_orig_dst: G,
drain_rx: drain::Watch, drain_rx: drain::Watch,
@ -448,7 +452,7 @@ where
let listen_addr = bound_port.local_addr(); let listen_addr = bound_port.local_addr();
let server = Server::new( let server = Server::new(
listen_addr, listen_addr,
proxy_ctx.clone(), proxy_ctx,
sensors, sensors,
get_orig_dst, get_orig_dst,
stack, stack,

View File

@ -276,13 +276,8 @@ pub type ServerExecutor = ContextualExecutor<Server>;
pub type ServerFuture<F> = ContextualFuture<Server, F>; pub type ServerFuture<F> = ContextualFuture<Server, F>;
impl Server { impl Server {
pub fn proxy(ctx: &::ctx::Proxy, listen: SocketAddr) -> Self { pub fn proxy(ctx: ::ctx::Proxy, listen: SocketAddr) -> Self {
let name = if ctx.is_inbound() { Section::Proxy.server(ctx.as_str(), listen)
"in"
} else {
"out"
};
Section::Proxy.server(name, listen)
} }
pub fn with_remote(self, remote: SocketAddr) -> Self { pub fn with_remote(self, remote: SocketAddr) -> Self {
@ -312,13 +307,8 @@ impl fmt::Display for Server {
} }
impl<D: fmt::Display> Client<&'static str, D> { impl<D: fmt::Display> Client<&'static str, D> {
pub fn proxy(ctx: &::ctx::Proxy, dst: D) -> Self { pub fn proxy(ctx: ::ctx::Proxy, dst: D) -> Self {
let name = if ctx.is_inbound() { Section::Proxy.client(ctx.as_str(), dst)
"in"
} else {
"out"
};
Section::Proxy.client(name, dst)
} }
} }

View File

@ -22,10 +22,10 @@ use timeout::Timeout;
use transparency::{h1, HttpBody}; use transparency::{h1, HttpBody};
use transport::{DnsNameAndPort, Host, HostAndPort}; use transport::{DnsNameAndPort, Host, HostAndPort};
type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>; type BindProtocol<B> = bind::BindProtocol<ctx::Proxy, B>;
pub struct Outbound<B> { pub struct Outbound<B> {
bind: Bind<Arc<ctx::Proxy>, B>, bind: Bind<ctx::Proxy, B>,
discovery: destination::Resolver, discovery: destination::Resolver,
bind_timeout: Duration, bind_timeout: Duration,
} }
@ -48,7 +48,7 @@ pub enum Destination {
// ===== impl Outbound ===== // ===== impl Outbound =====
impl<B> Outbound<B> { impl<B> Outbound<B> {
pub fn new(bind: Bind<Arc<ctx::Proxy>, B>, pub fn new(bind: Bind<ctx::Proxy, B>,
discovery: destination::Resolver, discovery: destination::Resolver,
bind_timeout: Duration) bind_timeout: Duration)
-> Outbound<B> { -> Outbound<B> {

View File

@ -52,10 +52,7 @@ pub enum Classification {
} }
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum Direction { pub struct Direction(ctx::Proxy);
Inbound,
Outbound,
}
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub struct DstLabels { pub struct DstLabels {
@ -70,7 +67,7 @@ pub struct TlsStatus(ctx::transport::TlsStatus);
impl RequestLabels { impl RequestLabels {
pub fn new(req: &ctx::http::Request) -> Self { pub fn new(req: &ctx::http::Request) -> Self {
let direction = Direction::from_context(req.server.proxy.as_ref()); let direction = Direction::new(req.server.proxy);
let outbound_labels = req.dst_labels().cloned(); let outbound_labels = req.dst_labels().cloned();
@ -205,19 +202,16 @@ impl fmt::Display for Classification {
// ===== impl Direction ===== // ===== impl Direction =====
impl Direction { impl Direction {
pub fn from_context(context: &ctx::Proxy) -> Self { pub fn new(ctx: ctx::Proxy) -> Self {
match context { Direction(ctx)
&ctx::Proxy::Inbound(_) => Direction::Inbound,
&ctx::Proxy::Outbound(_) => Direction::Outbound,
}
} }
} }
impl fmt::Display for Direction { impl fmt::Display for Direction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self.0 {
&Direction::Inbound => f.pad("direction=\"inbound\""), ctx::Proxy::Inbound => f.pad("direction=\"inbound\""),
&Direction::Outbound => f.pad("direction=\"outbound\""), ctx::Proxy::Outbound => f.pad("direction=\"outbound\""),
} }
} }
} }

View File

@ -30,9 +30,8 @@ use std::default::Default;
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant, SystemTime};
use ctx;
mod counter; mod counter;
mod gauge; mod gauge;
mod histogram; mod histogram;
@ -109,10 +108,10 @@ struct Stamped<T> {
/// is a Hyper service which can be used to create the server for the /// is a Hyper service which can be used to create the server for the
/// scrape endpoint, while the `Record` side can receive updates to the /// scrape endpoint, while the `Record` side can receive updates to the
/// metrics by calling `record_event`. /// metrics by calling `record_event`.
pub fn new(process: &Arc<ctx::Process>, idle_retain: Duration, tls: tls_config_reload::Report) pub fn new(start_time: SystemTime, idle_retain: Duration, tls: tls_config_reload::Report)
-> (Record, Serve) -> (Record, Serve)
{ {
let metrics = Arc::new(Mutex::new(Root::new(process, tls))); let metrics = Arc::new(Mutex::new(Root::new(start_time, tls)));
(Record::new(&metrics), Serve::new(&metrics, idle_retain)) (Record::new(&metrics), Serve::new(&metrics, idle_retain))
} }
@ -154,9 +153,9 @@ impl<'a, M: FmtMetric> Metric<'a, M> {
// ===== impl Root ===== // ===== impl Root =====
impl Root { impl Root {
pub fn new(process: &Arc<ctx::Process>, tls_config_reload: tls_config_reload::Report) -> Self { pub fn new(start_time: SystemTime, tls_config_reload: tls_config_reload::Report) -> Self {
Self { Self {
process: process::Report::new(&process), process: process::Report::new(start_time),
tls_config_reload, tls_config_reload,
.. Root::default() .. Root::default()
} }
@ -225,6 +224,7 @@ impl<T> ::std::ops::Deref for Stamped<T> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use ctx;
use ctx::test_util::*; use ctx::test_util::*;
use super::*; use super::*;
use conditional::Conditional; use conditional::Conditional;
@ -235,11 +235,11 @@ mod tests {
fn mock_route( fn mock_route(
root: &mut Root, root: &mut Root,
proxy: &Arc<ctx::Proxy>, proxy: ctx::Proxy,
server: &Arc<ctx::transport::Server>, server: &Arc<ctx::transport::Server>,
team: &str team: &str
) { ) {
let client = client(&proxy, vec![("team", team)], TLS_DISABLED); let client = client(proxy, vec![("team", team)], TLS_DISABLED);
let (req, rsp) = request("http://nba.com", &server, &client); let (req, rsp) = request("http://nba.com", &server, &client);
let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
@ -254,10 +254,9 @@ mod tests {
#[test] #[test]
fn expiry() { fn expiry() {
let process = process(); let proxy = ctx::Proxy::Outbound;
let proxy = ctx::Proxy::outbound(&process);
let server = server(&proxy, TLS_DISABLED); let server = server(proxy, TLS_DISABLED);
let server_transport = Arc::new(ctx::transport::Ctx::Server(server.clone())); let server_transport = Arc::new(ctx::transport::Ctx::Server(server.clone()));
let mut root = Root::default(); let mut root = Root::default();
@ -265,10 +264,10 @@ mod tests {
let t0 = Instant::now(); let t0 = Instant::now();
root.transports().open(&server_transport); root.transports().open(&server_transport);
mock_route(&mut root, &proxy, &server, "warriors"); mock_route(&mut root, proxy, &server, "warriors");
let t1 = Instant::now(); let t1 = Instant::now();
mock_route(&mut root, &proxy, &server, "sixers"); mock_route(&mut root, proxy, &server, "sixers");
let t2 = Instant::now(); let t2 = Instant::now();
assert_eq!(root.requests.len(), 2); assert_eq!(root.requests.len(), 2);

View File

@ -97,7 +97,7 @@ mod test {
Event, Event,
}; };
use ctx::{self, test_util::*, transport::TlsStatus}; use ctx::{self, test_util::*, transport::TlsStatus};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant, SystemTime};
use conditional::Conditional; use conditional::Conditional;
use tls; use tls;
@ -106,11 +106,10 @@ mod test {
Conditional::None(tls::ReasonForNoTls::Disabled); Conditional::None(tls::ReasonForNoTls::Disabled);
fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) {
let process = process(); let proxy = ctx::Proxy::Outbound;
let proxy = ctx::Proxy::outbound(&process); let server = server(proxy, server_tls);
let server = server(&proxy, server_tls);
let client = client(&proxy, vec![ let client = client(proxy, vec![
("service", "draymond"), ("service", "draymond"),
("deployment", "durant"), ("deployment", "durant"),
("pod", "klay"), ("pod", "klay"),
@ -132,7 +131,7 @@ mod test {
frames_sent: 0, frames_sent: 0,
}; };
let (mut r, _) = metrics::new(&process, Duration::from_secs(100), Default::default()); let (mut r, _) = metrics::new(SystemTime::now(), Duration::from_secs(100), Default::default());
let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); let ev = Event::StreamResponseEnd(rsp.clone(), end.clone());
let labels = labels::ResponseLabels::new(&rsp, None); let labels = labels::ResponseLabels::new(&rsp, None);
@ -168,11 +167,10 @@ mod test {
use self::labels::*; use self::labels::*;
use std::sync::Arc; use std::sync::Arc;
let process = process(); let proxy = ctx::Proxy::Outbound;
let proxy = ctx::Proxy::outbound(&process); let server = server(proxy, server_tls);
let server = server(&proxy, server_tls);
let client = client(&proxy, vec![ let client = client(proxy, vec![
("service", "draymond"), ("service", "draymond"),
("deployment", "durant"), ("deployment", "durant"),
("pod", "klay"), ("pod", "klay"),
@ -228,7 +226,7 @@ mod test {
), ),
]; ];
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default()); let (mut r, _) = metrics::new(SystemTime::now(), Duration::from_secs(1000), Default::default());
let req_labels = RequestLabels::new(&req); let req_labels = RequestLabels::new(&req);
let rsp_labels = ResponseLabels::new(&rsp, None); let rsp_labels = ResponseLabels::new(&rsp, None);

View File

@ -189,7 +189,7 @@ impl fmt::Display for Key {
impl Key { impl Key {
fn new(ctx: &ctx::transport::Ctx) -> Self { fn new(ctx: &ctx::transport::Ctx) -> Self {
Self { Self {
direction: Direction::from_context(ctx.proxy().as_ref()), direction: Direction::new(ctx.proxy()),
peer: match *ctx { peer: match *ctx {
ctx::transport::Ctx::Server(_) => Peer::Src, ctx::transport::Ctx::Server(_) => Peer::Src,
ctx::transport::Ctx::Client(_) => Peer::Dst, ctx::transport::Ctx::Client(_) => Peer::Dst,

View File

@ -1,7 +1,5 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::{Duration, SystemTime};
use ctx;
macro_rules! metrics { macro_rules! metrics {
{ $( $name:ident : $kind:ty { $help:expr } ),+ } => { { $( $name:ident : $kind:ty { $help:expr } ),+ } => {
@ -31,12 +29,12 @@ pub use self::metrics::{DstLabels, Serve as ServeMetrics};
pub use self::sensor::Sensors; pub use self::sensor::Sensors;
pub fn new( pub fn new(
process: &Arc<ctx::Process>, start_time: SystemTime,
metrics_retain_idle: Duration, metrics_retain_idle: Duration,
taps: &Arc<Mutex<tap::Taps>>, taps: &Arc<Mutex<tap::Taps>>,
) -> (Sensors, tls_config_reload::Sensor, ServeMetrics) { ) -> (Sensors, tls_config_reload::Sensor, ServeMetrics) {
let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new(); let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new();
let (metrics_record, metrics_serve) = metrics::new(process, metrics_retain_idle, tls_config_fmt); let (metrics_record, metrics_serve) = metrics::new(start_time, metrics_retain_idle, tls_config_fmt);
let s = Sensors::new(metrics_record, taps); let s = Sensors::new(metrics_record, taps);
(s, tls_config_sensor, metrics_serve) (s, tls_config_sensor, metrics_serve)
} }

View File

@ -1,7 +1,6 @@
use std::fmt; use std::fmt;
use std::time::UNIX_EPOCH; use std::time::{SystemTime, UNIX_EPOCH};
use ctx;
use super::metrics::Gauge; use super::metrics::Gauge;
use self::system::System; use self::system::System;
@ -19,8 +18,8 @@ pub struct Report {
} }
impl Report { impl Report {
pub fn new(process: &ctx::Process) -> Self { pub fn new(start_time: SystemTime) -> Self {
let t0 = process.start_time let t0 = start_time
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.expect("process start time") .expect("process start time")
.as_secs(); .as_secs();

View File

@ -240,7 +240,7 @@ where
let io = try_ready!(self.underlying.poll()); let io = try_ready!(self.underlying.poll());
debug!("client connection open"); debug!("client connection open");
let ctx = ctx::transport::Client::new( let ctx = ctx::transport::Client::new(
&self.ctx.proxy, self.ctx.proxy,
&self.ctx.remote, &self.ctx.remote,
self.ctx.metadata.clone(), self.ctx.metadata.clone(),
io.tls_status(), io.tls_status(),

View File

@ -46,7 +46,7 @@ where
>, >,
listen_addr: SocketAddr, listen_addr: SocketAddr,
new_service: S, new_service: S,
proxy_ctx: Arc<ProxyCtx>, proxy_ctx: ProxyCtx,
sensors: Sensors, sensors: Sensors,
tcp: tcp::Proxy, tcp: tcp::Proxy,
log: ::logging::Server, log: ::logging::Server,
@ -75,7 +75,7 @@ where
/// Creates a new `Server`. /// Creates a new `Server`.
pub fn new( pub fn new(
listen_addr: SocketAddr, listen_addr: SocketAddr,
proxy_ctx: Arc<ProxyCtx>, proxy_ctx: ProxyCtx,
sensors: Sensors, sensors: Sensors,
get_orig_dst: G, get_orig_dst: G,
stack: S, stack: S,
@ -85,7 +85,7 @@ where
) -> Self { ) -> Self {
let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); let recv_body_svc = HttpBodyNewSvc::new(stack.clone());
let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone()); let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone());
let log = ::logging::Server::proxy(&proxy_ctx, listen_addr); let log = ::logging::Server::proxy(proxy_ctx, listen_addr);
Server { Server {
disable_protocol_detection_ports, disable_protocol_detection_ports,
drain_signal, drain_signal,
@ -124,7 +124,7 @@ where
let orig_dst = connection.original_dst_addr(&self.get_orig_dst); let orig_dst = connection.original_dst_addr(&self.get_orig_dst);
let local_addr = connection.local_addr().unwrap_or(self.listen_addr); let local_addr = connection.local_addr().unwrap_or(self.listen_addr);
let srv_ctx = ServerCtx::new( let srv_ctx = ServerCtx::new(
&self.proxy_ctx, self.proxy_ctx,
&local_addr, &local_addr,
&remote_addr, &remote_addr,
&orig_dst, &orig_dst,

View File

@ -63,7 +63,7 @@ impl Proxy {
let tls = Conditional::None(tls::ReasonForNoIdentity::NotHttp.into()); // TODO let tls = Conditional::None(tls::ReasonForNoIdentity::NotHttp.into()); // TODO
let client_ctx = ClientCtx::new( let client_ctx = ClientCtx::new(
&srv_ctx.proxy, srv_ctx.proxy,
&orig_dst, &orig_dst,
destination::Metadata::no_metadata(), destination::Metadata::no_metadata(),
TlsStatus::from(&tls), TlsStatus::from(&tls),