Move `control::Bind` to `svc::NewClient` (#86)
The `control::destination` exposes an important trait, `Bind`, that abstracts the logic of instantiating a new service for an individual endpoint (i.e., in a load balancer). This interface is not specific to our service discovery implementation, and can easily be used to model other types of client factory. In the spirit of consolidating our HTTP-specific logic, and making the core APIs of the proxy more visible, this change renames the `Bind` trait to `NewClient`, simplifies the trait to have fewer type parameters, and documents this new generalized API.
This commit is contained in:
parent
8ea9a3644d
commit
d98c83404b
15
src/bind.rs
15
src/bind.rs
|
@ -9,9 +9,9 @@ use tower_service as tower;
|
|||
use tower_h2;
|
||||
use tower_reconnect::{Reconnect, Error as ReconnectError};
|
||||
|
||||
use control;
|
||||
use control::destination::Endpoint;
|
||||
use ctx;
|
||||
use svc::NewClient;
|
||||
use telemetry;
|
||||
use transparency::{self, HttpBody, h1, orig_proto};
|
||||
use transport;
|
||||
|
@ -360,19 +360,16 @@ impl<C, B> Bind<C, B> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<B> control::destination::Bind for BindProtocol<ctx::Proxy, B>
|
||||
impl<B> NewClient for BindProtocol<ctx::Proxy, B>
|
||||
where
|
||||
B: tower_h2::Body + Send + 'static,
|
||||
<B::Data as ::bytes::IntoBuf>::Buf: Send,
|
||||
{
|
||||
type Endpoint = Endpoint;
|
||||
type Request = http::Request<B>;
|
||||
type Response = HttpResponse;
|
||||
type Error = <Service<B> as tower::Service>::Error;
|
||||
type Service = Service<B>;
|
||||
type BindError = ();
|
||||
type Target = Endpoint;
|
||||
type Error = ();
|
||||
type Client = Service<B>;
|
||||
|
||||
fn bind(&self, ep: &Endpoint) -> Result<Self::Service, Self::BindError> {
|
||||
fn new_client(&mut self, ep: &Endpoint) -> Result<Self::Client, ()> {
|
||||
Ok(self.bind.bind_service(ep, &self.protocol))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -254,12 +254,12 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
) {
|
||||
let (update_str, update, addr) = match change {
|
||||
CacheChange::Insertion { key, value } => {
|
||||
("insert", Update::Bind(key, value.clone()), key)
|
||||
("insert", Update::NewClient(key, value.clone()), key)
|
||||
},
|
||||
CacheChange::Removal { key } => ("remove", Update::Remove(key), key),
|
||||
CacheChange::Modification { key, new_value } => (
|
||||
"change metadata for",
|
||||
Update::Bind(key, new_value.clone()),
|
||||
Update::NewClient(key, new_value.clone()),
|
||||
key,
|
||||
),
|
||||
};
|
||||
|
|
|
@ -226,7 +226,7 @@ where
|
|||
// them onto the new watch first
|
||||
match occ.get().addrs {
|
||||
Exists::Yes(ref cache) => for (&addr, meta) in cache {
|
||||
let update = Update::Bind(addr, meta.clone());
|
||||
let update = Update::NewClient(addr, meta.clone());
|
||||
resolve.responder.update_tx
|
||||
.unbounded_send(update)
|
||||
.expect("unbounded_send does not fail");
|
||||
|
|
|
@ -36,11 +36,11 @@ use futures::{
|
|||
Poll,
|
||||
Stream
|
||||
};
|
||||
use http;
|
||||
use tower_discover::{Change, Discover};
|
||||
use tower_service::Service;
|
||||
|
||||
use dns;
|
||||
use svc::NewClient;
|
||||
use tls;
|
||||
use transport::{DnsNameAndPort, HostAndPort};
|
||||
|
||||
|
@ -76,7 +76,7 @@ struct Responder {
|
|||
|
||||
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
|
||||
#[derive(Debug)]
|
||||
pub struct Resolution<B> {
|
||||
pub struct Resolution<N> {
|
||||
/// Receives updates from the controller.
|
||||
update_rx: mpsc::UnboundedReceiver<Update>,
|
||||
|
||||
|
@ -86,8 +86,8 @@ pub struct Resolution<B> {
|
|||
/// reference has been dropped.
|
||||
_active: Arc<()>,
|
||||
|
||||
/// Binds an update endpoint to a Service.
|
||||
bind: B,
|
||||
/// Creates clients for each new endpoint in the resolution.
|
||||
new_endpoint: N,
|
||||
}
|
||||
|
||||
/// Metadata describing an endpoint.
|
||||
|
@ -120,34 +120,11 @@ enum Update {
|
|||
///
|
||||
/// If there was already an endpoint in the load balancer for this
|
||||
/// address, it should be replaced with the new one.
|
||||
Bind(SocketAddr, Metadata),
|
||||
NewClient(SocketAddr, Metadata),
|
||||
/// Indicates that the endpoint for this `SocketAddr` should be removed.
|
||||
Remove(SocketAddr),
|
||||
}
|
||||
|
||||
/// Bind a `SocketAddr` with a protocol.
|
||||
pub trait Bind {
|
||||
/// The type of endpoint upon which a `Service` is bound.
|
||||
type Endpoint;
|
||||
|
||||
/// Requests handled by the discovered services
|
||||
type Request;
|
||||
|
||||
/// Responses given by the discovered services
|
||||
type Response;
|
||||
|
||||
/// Errors produced by the discovered services
|
||||
type Error;
|
||||
|
||||
type BindError;
|
||||
|
||||
/// The discovered `Service` instance.
|
||||
type Service: Service<Request = Self::Request, Response = Self::Response, Error = Self::Error>;
|
||||
|
||||
/// Bind a service from an endpoint.
|
||||
fn bind(&self, addr: &Self::Endpoint) -> Result<Self::Service, Self::BindError>;
|
||||
}
|
||||
|
||||
/// Returns a `Resolver` and a background task future.
|
||||
///
|
||||
/// The `Resolver` is used by a listener to request resolutions, while
|
||||
|
@ -179,7 +156,10 @@ pub fn new(
|
|||
|
||||
impl Resolver {
|
||||
/// Start watching for address changes for a certain authority.
|
||||
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Resolution<B> {
|
||||
pub fn resolve<N>(&self, authority: &DnsNameAndPort, new_endpoint: N) -> Resolution<N>
|
||||
where
|
||||
N: NewClient,
|
||||
{
|
||||
trace!("resolve; authority={:?}", authority);
|
||||
let (update_tx, update_rx) = mpsc::unbounded();
|
||||
let active = Arc::new(());
|
||||
|
@ -200,22 +180,22 @@ impl Resolver {
|
|||
Resolution {
|
||||
update_rx,
|
||||
_active: active,
|
||||
bind,
|
||||
new_endpoint,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==== impl Resolution =====
|
||||
|
||||
impl<B, A> Discover for Resolution<B>
|
||||
impl<N> Discover for Resolution<N>
|
||||
where
|
||||
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
|
||||
N: NewClient<Target = Endpoint>,
|
||||
{
|
||||
type Key = SocketAddr;
|
||||
type Request = B::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Service = B::Service;
|
||||
type Request = <N::Client as Service>::Request;
|
||||
type Response = <N::Client as Service>::Response;
|
||||
type Error = <N::Client as Service>::Error;
|
||||
type Service = N::Client;
|
||||
type DiscoverError = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
|
@ -225,14 +205,14 @@ where
|
|||
let update = try_ready!(up).expect("destination stream must be infinite");
|
||||
|
||||
match update {
|
||||
Update::Bind(addr, meta) => {
|
||||
Update::NewClient(addr, meta) => {
|
||||
// We expect the load balancer to handle duplicate inserts
|
||||
// by replacing the old endpoint with the new one, so
|
||||
// insertions of new endpoints and metadata changes for
|
||||
// existing ones can be handled in the same way.
|
||||
let endpoint = Endpoint::new(addr, meta);
|
||||
|
||||
let service = self.bind.bind(&endpoint).map_err(|_| ())?;
|
||||
let service = self.new_endpoint.new_client(&endpoint).map_err(|_| ())?;
|
||||
|
||||
return Ok(Async::Ready(Change::Insert(addr, service)));
|
||||
},
|
||||
|
|
|
@ -6,6 +6,5 @@ pub mod pb;
|
|||
mod remote_stream;
|
||||
mod serve_http;
|
||||
|
||||
pub use self::destination::Bind;
|
||||
pub use self::observe::Observe;
|
||||
pub use self::serve_http::serve_http;
|
||||
|
|
|
@ -88,6 +88,7 @@ mod logging;
|
|||
mod map_err;
|
||||
mod outbound;
|
||||
pub mod stream;
|
||||
mod svc;
|
||||
pub mod task;
|
||||
pub mod telemetry;
|
||||
mod transparency;
|
||||
|
|
|
@ -15,7 +15,8 @@ use tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody};
|
|||
use linkerd2_proxy_router::Recognize;
|
||||
|
||||
use bind::{self, Bind, Protocol};
|
||||
use control::destination::{self, Bind as BindTrait, Resolution};
|
||||
use control::destination::{self, Resolution};
|
||||
use svc::NewClient;
|
||||
use ctx;
|
||||
use telemetry::http::service::{ResponseBody as SensorBody};
|
||||
use timeout::Timeout;
|
||||
|
@ -223,8 +224,8 @@ where
|
|||
// in the Balancer forever. However, when we finally add
|
||||
// circuit-breaking, this should be able to take care of itself,
|
||||
// closing down when the connection is no longer usable.
|
||||
if let Some((addr, bind)) = opt.take() {
|
||||
let svc = bind.bind(&addr.into())
|
||||
if let Some((addr, mut bind)) = opt.take() {
|
||||
let svc = bind.new_client(&addr.into())
|
||||
.map_err(|_| BindError::External { addr })?;
|
||||
Ok(Async::Ready(Change::Insert(addr, svc)))
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
//! Infrastructure for proxying request-response message streams
|
||||
//!
|
||||
//! This module contains utilities for proxying request-response streams. This
|
||||
//! module borrows (and re-exports) from `tower`.
|
||||
//!
|
||||
//! ## Clients
|
||||
//!
|
||||
//! A client is a `Service` through which the proxy may dispatch requests.
|
||||
//!
|
||||
//! In the proxy, there are currently two types of clients:
|
||||
//!
|
||||
//! - As the proxy routes requests to an outbound `Destination`, a client
|
||||
//! service is resolves the destination to and load balances requests
|
||||
//! over its endpoints.
|
||||
//!
|
||||
//! - As an outbound load balancer dispatches a request to an endpoint, or as
|
||||
//! the inbound proxy fowards an inbound request, a client service models an
|
||||
//! individual `SocketAddr`.
|
||||
//!
|
||||
//! ## TODO
|
||||
//!
|
||||
//! * Move HTTP-specific service infrastructure into `svc::http`.
|
||||
|
||||
pub use tower_service::Service;
|
||||
|
||||
pub trait NewClient {
|
||||
|
||||
/// Describes a resource to which the client will be attached.
|
||||
///
|
||||
/// Depending on the implementation, the target may describe a logical name
|
||||
/// to be resolved (i.e. via DNS) and load balanced, or it may describe a
|
||||
/// specific network address to which one or more connections will be
|
||||
/// established, or it may describe an entirely arbitrary "virtual" service
|
||||
/// (i.e. that exists locally in memory).
|
||||
type Target;
|
||||
|
||||
/// Indicates why the provided `Target` cannot be used to instantiate a client.
|
||||
type Error;
|
||||
|
||||
/// Serves requests on behalf of a target.
|
||||
///
|
||||
/// `Client`s are expected to acquire resources lazily as
|
||||
/// `Service::poll_ready` is called. `Service::poll_ready` must not return
|
||||
/// `Async::Ready` until the service is ready to service requests.
|
||||
/// `Service::call` must not be called until `Service::poll_ready` returns
|
||||
/// `Async::Ready`. When `Service::poll_ready` returns an error, the
|
||||
/// client must be discarded.
|
||||
type Client: Service;
|
||||
|
||||
/// Creates a client
|
||||
///
|
||||
/// If the provided `Target` is valid, immediately return a `Client` that may
|
||||
/// become ready lazily, i.e. as the target is resolved and connections are
|
||||
/// established.
|
||||
fn new_client(&mut self, t: &Self::Target) -> Result<Self::Client, Self::Error>;
|
||||
}
|
Loading…
Reference in New Issue