proxy: Remove configure-and-bind-to-executor pattern (#967)
A common pattern when using the old Tokio API was separating the configuration of a task from binding it to an executor to run on. This was often necessary when we wanted to construct a type corresponding to some task before the reactor on which it would execute was initialized. Typically, this was accomplished with two separate types, one of which represented the configuration and exposed only a method to take a reactor `Handle` and transform it to the other type, representing the actual task. After we migrate to the new Tokio API in #944, executors no longer need to be passed explictly, as we can use `DefaultExecutor::current` or `current_thread::TaskExecutor::current` to spawn a task on the current executor. Therefore, a lot of this complexity can be refactored away. This PR refactors the `Config` and `Process` structs in i`control::destination::background` into a single `Background` struct, and removes the `dns::Config` and `telemetry::MakeControl` structs (`dns::Resolver` and `telemetry::Control` are now constructed directly). It should not cause any functional changes. Closes #966 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
7bf4c1bc41
commit
d709ec37e3
|
@ -1,5 +1,3 @@
|
|||
use futures::sync::mpsc;
|
||||
use futures::{Async, Future, Stream};
|
||||
use std::collections::{
|
||||
hash_map::{Entry, HashMap},
|
||||
VecDeque,
|
||||
|
@ -8,8 +6,18 @@ use std::fmt;
|
|||
use std::iter::IntoIterator;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
future,
|
||||
sync::mpsc,
|
||||
Async, Future, Stream,
|
||||
};
|
||||
use h2;
|
||||
use http;
|
||||
use tower_grpc as grpc;
|
||||
use tower_h2::{BoxBody, HttpService, RecvBody};
|
||||
use tower_h2::{self, BoxBody, HttpService, RecvBody};
|
||||
use tower_reconnect::Reconnect;
|
||||
|
||||
use conduit_proxy_controller_grpc::common::{Destination, TcpAddress};
|
||||
use conduit_proxy_controller_grpc::destination::client::Destination as DestinationSvc;
|
||||
|
@ -17,31 +25,28 @@ use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
|
|||
use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr};
|
||||
|
||||
use super::{Metadata, ResolveRequest, Responder, Update};
|
||||
use control::cache::{Cache, CacheChange, Exists};
|
||||
use control::fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use control::remote_stream::{Receiver, Remote};
|
||||
use control::{
|
||||
cache::{Cache, CacheChange, Exists},
|
||||
fully_qualified_authority::FullyQualifiedAuthority,
|
||||
remote_stream::{Receiver, Remote},
|
||||
AddOrigin, Backoff, LogErrors
|
||||
};
|
||||
use dns::{self, IpAddrListFuture};
|
||||
use task::LazyExecutor;
|
||||
use telemetry::metrics::DstLabels;
|
||||
use transport::DnsNameAndPort;
|
||||
use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
|
||||
use timeout::Timeout;
|
||||
|
||||
type DestinationServiceQuery<T> = Remote<PbUpdate, T>;
|
||||
type UpdateRx<T> = Receiver<PbUpdate, T>;
|
||||
|
||||
/// Stores the configuration for a destination background worker.
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
|
||||
dns_config: dns::Config,
|
||||
default_destination_namespace: String,
|
||||
}
|
||||
|
||||
/// Satisfies resolutions as requested via `request_rx`.
|
||||
///
|
||||
/// As `Process` is polled with a client to Destination service, if the client to the
|
||||
/// As the `Background` is polled with a client to Destination service, if the client to the
|
||||
/// service is healthy, it reads requests from `request_rx`, determines how to resolve the
|
||||
/// provided authority to a set of addresses, and ensures that resolution updates are
|
||||
/// propagated to all requesters.
|
||||
pub struct Process<T: HttpService<ResponseBody = RecvBody>> {
|
||||
struct Background<T: HttpService<ResponseBody = RecvBody>> {
|
||||
dns_resolver: dns::Resolver,
|
||||
default_destination_namespace: String,
|
||||
destinations: HashMap<DnsNameAndPort, DestinationSet<T>>,
|
||||
|
@ -62,46 +67,74 @@ struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
|||
responders: Vec<Responder>,
|
||||
}
|
||||
|
||||
// ==== impl Config =====
|
||||
|
||||
impl Config {
|
||||
pub(super) fn new(
|
||||
request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
|
||||
dns_config: dns::Config,
|
||||
default_destination_namespace: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
request_rx,
|
||||
dns_config,
|
||||
default_destination_namespace,
|
||||
}
|
||||
}
|
||||
/// Returns a new discovery background task.
|
||||
pub(super) fn task(
|
||||
request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
|
||||
dns_resolver: dns::Resolver,
|
||||
default_destination_namespace: String,
|
||||
host_and_port: HostAndPort,
|
||||
) -> impl Future<Item = (), Error = ()>
|
||||
{
|
||||
// Build up the Controller Client Stack
|
||||
let mut client = {
|
||||
let ctx = ("controller-client", format!("{:?}", host_and_port));
|
||||
let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
|
||||
let authority = http::uri::Authority::from(&host_and_port);
|
||||
let connect = Timeout::new(
|
||||
LookupAddressAndConnect::new(host_and_port, dns_resolver.clone()),
|
||||
Duration::from_secs(3),
|
||||
);
|
||||
|
||||
/// Bind this handle to start talking to the controller API.
|
||||
pub fn process<T>(self) -> Process<T>
|
||||
where
|
||||
T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
Process {
|
||||
dns_resolver: dns::Resolver::new(self.dns_config),
|
||||
default_destination_namespace: self.default_destination_namespace,
|
||||
destinations: HashMap::new(),
|
||||
reconnects: VecDeque::new(),
|
||||
rpc_ready: false,
|
||||
request_rx: self.request_rx,
|
||||
}
|
||||
}
|
||||
let h2_client = tower_h2::client::Connect::new(
|
||||
connect,
|
||||
h2::client::Builder::default(),
|
||||
::logging::context_executor(ctx, LazyExecutor),
|
||||
);
|
||||
|
||||
let reconnect = Reconnect::new(h2_client);
|
||||
let log_errors = LogErrors::new(reconnect);
|
||||
let backoff = Backoff::new(log_errors, Duration::from_secs(5));
|
||||
// TODO: Use AddOrigin in tower-http
|
||||
AddOrigin::new(scheme, authority, backoff)
|
||||
};
|
||||
|
||||
let mut disco = Background::new(
|
||||
request_rx,
|
||||
dns_resolver,
|
||||
default_destination_namespace,
|
||||
);
|
||||
|
||||
future::poll_fn(move || {
|
||||
disco.poll_rpc(&mut client);
|
||||
|
||||
Ok(Async::NotReady)
|
||||
})
|
||||
}
|
||||
|
||||
// ==== impl Process =====
|
||||
// ==== impl Background =====
|
||||
|
||||
impl<T> Process<T>
|
||||
impl<T> Background<T>
|
||||
where
|
||||
T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
pub fn poll_rpc(&mut self, client: &mut T) {
|
||||
fn new(
|
||||
request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
|
||||
dns_resolver: dns::Resolver,
|
||||
default_destination_namespace: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
dns_resolver,
|
||||
default_destination_namespace,
|
||||
destinations: HashMap::new(),
|
||||
reconnects: VecDeque::new(),
|
||||
rpc_ready: false,
|
||||
request_rx,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_rpc(&mut self, client: &mut T) {
|
||||
// This loop is make sure any streams that were found disconnected
|
||||
// in `poll_destinations` while the `rpc` service is ready should
|
||||
// be reconnected now, otherwise the task would just sleep...
|
||||
|
|
|
@ -28,8 +28,13 @@ use std::collections::HashMap;
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use futures::sync::mpsc;
|
||||
use futures::{Async, Poll, Stream};
|
||||
use futures::{
|
||||
sync::mpsc,
|
||||
Future,
|
||||
Async,
|
||||
Poll,
|
||||
Stream
|
||||
};
|
||||
use futures_watch::{Store, Watch};
|
||||
use http;
|
||||
use tower_discover::{Change, Discover};
|
||||
|
@ -37,14 +42,14 @@ use tower_service::Service;
|
|||
|
||||
use dns;
|
||||
use telemetry::metrics::DstLabels;
|
||||
use transport::DnsNameAndPort;
|
||||
use transport::{DnsNameAndPort, HostAndPort};
|
||||
|
||||
pub mod background;
|
||||
mod endpoint;
|
||||
|
||||
pub use self::endpoint::{DstLabelsWatch, Endpoint};
|
||||
|
||||
/// A handle to request resolutions from a `Background`.
|
||||
/// A handle to request resolutions from the background discovery task.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Resolver {
|
||||
request_tx: mpsc::UnboundedSender<ResolveRequest>,
|
||||
|
@ -127,17 +132,24 @@ pub trait Bind {
|
|||
fn bind(&self, addr: &Self::Endpoint) -> Result<Self::Service, Self::BindError>;
|
||||
}
|
||||
|
||||
/// Creates a "channel" of `Resolver` to `Background` handles.
|
||||
/// Returns a `Resolver` and a background task future.
|
||||
///
|
||||
/// The `Resolver` is used by a listener, the `Background` is consumed
|
||||
/// on the controller thread.
|
||||
/// The `Resolver` is used by a listener to request resolutions, while
|
||||
/// the background future is executed on the controller thread's executor
|
||||
/// to drive the background task.
|
||||
pub fn new(
|
||||
dns_config: dns::Config,
|
||||
dns_resolver: dns::Resolver,
|
||||
default_destination_namespace: String,
|
||||
) -> (Resolver, background::Config) {
|
||||
host_and_port: HostAndPort,
|
||||
) -> (Resolver, impl Future<Item = (), Error = ()>) {
|
||||
let (request_tx, rx) = mpsc::unbounded();
|
||||
let disco = Resolver { request_tx };
|
||||
let bg = background::Config::new(rx, dns_config, default_destination_namespace);
|
||||
let bg = background::task(
|
||||
rx,
|
||||
dns_resolver,
|
||||
default_destination_namespace,
|
||||
host_and_port,
|
||||
);
|
||||
(disco, bg)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,19 +2,14 @@ use std::fmt;
|
|||
use std::io;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{future, Async, Future, Poll};
|
||||
use h2;
|
||||
use futures::{Async, Future, Poll};
|
||||
use http;
|
||||
use tokio::timer::Delay;
|
||||
use tower_service::Service;
|
||||
use tower_h2;
|
||||
use tower_reconnect::{Error as ReconnectError, Reconnect};
|
||||
use tower_reconnect::{Error as ReconnectError};
|
||||
|
||||
use dns;
|
||||
use task::LazyExecutor;
|
||||
use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
|
||||
use timeout::{Timeout, TimeoutError};
|
||||
use timeout::TimeoutError;
|
||||
|
||||
mod cache;
|
||||
pub mod destination;
|
||||
|
@ -23,85 +18,9 @@ mod observe;
|
|||
pub mod pb;
|
||||
mod remote_stream;
|
||||
|
||||
use self::destination::{Resolver, Resolution};
|
||||
pub use self::destination::Bind;
|
||||
pub use self::observe::Observe;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Control {
|
||||
disco: Resolver,
|
||||
}
|
||||
|
||||
pub struct Background {
|
||||
disco: destination::background::Config,
|
||||
}
|
||||
|
||||
pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Control, Background)
|
||||
{
|
||||
let (tx, rx) = self::destination::new(dns_config, default_destination_namespace);
|
||||
|
||||
let c = Control {
|
||||
disco: tx,
|
||||
};
|
||||
|
||||
let b = Background {
|
||||
disco: rx,
|
||||
};
|
||||
|
||||
(c, b)
|
||||
}
|
||||
|
||||
// ===== impl Control =====
|
||||
|
||||
impl Control {
|
||||
pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Resolution<B> {
|
||||
self.disco.resolve(auth, bind)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Background =====
|
||||
|
||||
impl Background {
|
||||
pub fn bind(
|
||||
self,
|
||||
host_and_port: HostAndPort,
|
||||
dns_config: dns::Config,
|
||||
) -> Box<Future<Item = (), Error = ()>> {
|
||||
// Build up the Controller Client Stack
|
||||
let mut client = {
|
||||
let ctx = ("controller-client", format!("{:?}", host_and_port));
|
||||
let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
|
||||
let authority = http::uri::Authority::from(&host_and_port);
|
||||
let dns_resolver = dns::Resolver::new(dns_config);
|
||||
let connect = Timeout::new(
|
||||
LookupAddressAndConnect::new(host_and_port, dns_resolver),
|
||||
Duration::from_secs(3),
|
||||
);
|
||||
|
||||
let h2_client = tower_h2::client::Connect::new(
|
||||
connect,
|
||||
h2::client::Builder::default(),
|
||||
::logging::context_executor(ctx, LazyExecutor),
|
||||
);
|
||||
|
||||
let reconnect = Reconnect::new(h2_client);
|
||||
let log_errors = LogErrors::new(reconnect);
|
||||
let backoff = Backoff::new(log_errors, Duration::from_secs(5));
|
||||
// TODO: Use AddOrigin in tower-http
|
||||
AddOrigin::new(scheme, authority, backoff)
|
||||
};
|
||||
|
||||
let mut disco = self.disco.process();
|
||||
|
||||
let fut = future::poll_fn(move || {
|
||||
disco.poll_rpc(&mut client);
|
||||
|
||||
Ok(Async::NotReady)
|
||||
});
|
||||
|
||||
Box::new(fut)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== Backoff =====
|
||||
|
||||
|
|
|
@ -11,18 +11,13 @@ use trust_dns_resolver::ResolverFuture;
|
|||
use trust_dns_resolver::lookup_ip::LookupIp;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
pub struct Resolver {
|
||||
config: ResolverConfig,
|
||||
opts: ResolverOpts,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Resolver {
|
||||
config: Config,
|
||||
}
|
||||
|
||||
pub enum IpAddrFuture {
|
||||
DNS(Box<Future<Item = LookupIp, Error = ResolveError>>),
|
||||
DNS(Box<Future<Item = LookupIp, Error = ResolveError> + Send>),
|
||||
Fixed(IpAddr),
|
||||
}
|
||||
|
||||
|
@ -37,7 +32,7 @@ pub enum Response {
|
|||
}
|
||||
|
||||
// `Box<Future>` implements `Future` so it doesn't need to be implemented manually.
|
||||
pub type IpAddrListFuture = Box<Future<Item=Response, Error=ResolveError>>;
|
||||
pub type IpAddrListFuture = Box<Future<Item=Response, Error=ResolveError> + Send>;
|
||||
|
||||
/// A DNS name.
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
|
@ -63,23 +58,19 @@ impl AsRef<str> for Name {
|
|||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
impl Resolver {
|
||||
/// TODO: Make this infallible, like it is in the `domain` crate.
|
||||
pub fn from_system_config() -> Result<Self, ResolveError> {
|
||||
let (config, opts) = trust_dns_resolver::system_conf::read_system_conf()?;
|
||||
trace!("DNS config: {:?}", &config);
|
||||
trace!("DNS opts: {:?}", &opts);
|
||||
Ok(Config {
|
||||
config,
|
||||
opts
|
||||
})
|
||||
Ok(Self::new(config, opts))
|
||||
}
|
||||
}
|
||||
|
||||
impl Resolver {
|
||||
pub fn new(config: Config) -> Self {
|
||||
pub fn new(config: ResolverConfig, opts: ResolverOpts) -> Self {
|
||||
Resolver {
|
||||
config,
|
||||
opts,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,8 +117,8 @@ impl Resolver {
|
|||
{
|
||||
let name = name.clone(); // TODO: ref-count names.
|
||||
let resolver = ResolverFuture::new(
|
||||
self.config.config,
|
||||
self.config.opts
|
||||
self.config,
|
||||
self.opts
|
||||
);
|
||||
resolver.and_then(move |r| r.lookup_ip(name.as_str()))
|
||||
}
|
||||
|
|
|
@ -203,19 +203,25 @@ where
|
|||
config.outbound_ports_disable_protocol_detection,
|
||||
);
|
||||
|
||||
let (taps, observe) = control::Observe::new(100);
|
||||
let (sensors, telemetry) = telemetry::new(
|
||||
&process_ctx,
|
||||
config.event_buffer_capacity,
|
||||
config.metrics_retain_idle,
|
||||
&taps,
|
||||
);
|
||||
|
||||
let dns_config = dns::Config::from_system_config()
|
||||
let dns_resolver = dns::Resolver::from_system_config()
|
||||
.unwrap_or_else(|e| {
|
||||
// TODO: Make DNS configuration infallible.
|
||||
panic!("invalid DNS configuration: {:?}", e);
|
||||
});
|
||||
|
||||
let (control, control_bg) = control::new(dns_config.clone(), config.pod_namespace.clone());
|
||||
let (control, control_bg) = control::destination::new(
|
||||
dns_resolver.clone(),
|
||||
config.pod_namespace.clone(),
|
||||
control_host_and_port
|
||||
);
|
||||
|
||||
let (drain_tx, drain_rx) = drain::channel();
|
||||
|
||||
|
@ -285,24 +291,14 @@ where
|
|||
let mut rt = current_thread::Runtime::new()
|
||||
.expect("initialize controller-client thread runtime");
|
||||
|
||||
let (taps, observe) = control::Observe::new(100);
|
||||
let new_service = TapServer::new(observe);
|
||||
|
||||
let server = serve_control(control_listener, new_service);
|
||||
|
||||
let telemetry = telemetry
|
||||
.make_control(&taps)
|
||||
.expect("bad news in telemetry town");
|
||||
|
||||
let metrics_server = telemetry
|
||||
.serve_metrics(metrics_listener);
|
||||
|
||||
let client = control_bg.bind(
|
||||
control_host_and_port,
|
||||
dns_config,
|
||||
);
|
||||
|
||||
let fut = client.join4(
|
||||
let fut = control_bg.join4(
|
||||
server.map_err(|_| {}),
|
||||
telemetry,
|
||||
metrics_server.map_err(|_| {}),
|
||||
|
|
|
@ -14,8 +14,7 @@ use tower_h2;
|
|||
use conduit_proxy_router::Recognize;
|
||||
|
||||
use bind::{self, Bind, Protocol};
|
||||
use control;
|
||||
use control::destination::{Bind as BindTrait, Resolution};
|
||||
use control::destination::{self, Bind as BindTrait, Resolution};
|
||||
use ctx;
|
||||
use task::LazyExecutor;
|
||||
use timeout::Timeout;
|
||||
|
@ -27,7 +26,7 @@ type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>;
|
|||
|
||||
pub struct Outbound<B> {
|
||||
bind: Bind<Arc<ctx::Proxy>, B>,
|
||||
discovery: control::Control,
|
||||
discovery: destination::Resolver,
|
||||
bind_timeout: Duration,
|
||||
}
|
||||
|
||||
|
@ -43,7 +42,7 @@ pub enum Destination {
|
|||
|
||||
impl<B> Outbound<B> {
|
||||
pub fn new(bind: Bind<Arc<ctx::Proxy>, B>,
|
||||
discovery: control::Control,
|
||||
discovery: destination::Resolver,
|
||||
bind_timeout: Duration)
|
||||
-> Outbound<B> {
|
||||
Self {
|
||||
|
|
|
@ -51,50 +51,32 @@ pub struct Control {
|
|||
|
||||
}
|
||||
|
||||
// ===== impl MakeControl =====
|
||||
// ===== impl Control =====
|
||||
|
||||
impl MakeControl {
|
||||
/// Constructs a type that can instantiate a `Control`.
|
||||
impl Control {
|
||||
|
||||
/// Returns a new `Control`.
|
||||
///
|
||||
/// # Arguments
|
||||
/// - `rx`: the `Receiver` side of the channel on which events are sent.
|
||||
/// - `process_ctx`: runtime process metadata.
|
||||
/// - `taps`: shares a `Taps` instance.
|
||||
pub(super) fn new(
|
||||
rx: Receiver<Event>,
|
||||
process_ctx: &Arc<ctx::Process>,
|
||||
metrics_retain_idle: Duration,
|
||||
taps: &Arc<Mutex<Taps>>
|
||||
) -> Self {
|
||||
let (metrics_record, metrics_service) =
|
||||
metrics::new(&process_ctx, metrics_retain_idle);
|
||||
Self {
|
||||
rx,
|
||||
process_ctx: Arc::clone(process_ctx),
|
||||
metrics_retain_idle,
|
||||
metrics_record,
|
||||
metrics_service,
|
||||
rx: Some(rx),
|
||||
taps: Some(taps.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind a `Control` with the current task executor.
|
||||
///
|
||||
/// # Arguments
|
||||
/// - `taps`: shares a `Taps` instance.
|
||||
///
|
||||
/// # Returns
|
||||
/// - `Ok(())` if the timeout was successfully created.
|
||||
/// - `Err(io::Error)` if the timeout could not be created.
|
||||
pub fn make_control(self, taps: &Arc<Mutex<Taps>>) -> io::Result<Control> {
|
||||
let (metrics_record, metrics_service) =
|
||||
metrics::new(&self.process_ctx, self.metrics_retain_idle);
|
||||
|
||||
Ok(Control {
|
||||
metrics_record,
|
||||
metrics_service,
|
||||
rx: Some(self.rx),
|
||||
taps: Some(taps.clone()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Control =====
|
||||
|
||||
impl Control {
|
||||
fn recv(&mut self) -> Poll<Option<Event>, ()> {
|
||||
match self.rx.take() {
|
||||
None => Ok(Async::Ready(None)),
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_mpsc_lossy;
|
||||
|
@ -32,9 +32,10 @@ pub fn new(
|
|||
process: &Arc<ctx::Process>,
|
||||
capacity: usize,
|
||||
metrics_retain_idle: Duration,
|
||||
) -> (Sensors, MakeControl) {
|
||||
taps: &Arc<Mutex<tap::Taps>>,
|
||||
) -> (Sensors, Control) {
|
||||
let (tx, rx) = futures_mpsc_lossy::channel(capacity);
|
||||
let s = Sensors::new(tx);
|
||||
let c = MakeControl::new(rx, process, metrics_retain_idle);
|
||||
let c = Control::new(rx, process, metrics_retain_idle, taps);
|
||||
(s, c)
|
||||
}
|
||||
|
|
|
@ -116,7 +116,7 @@ impl LookupAddressAndConnect {
|
|||
impl tokio_connect::Connect for LookupAddressAndConnect {
|
||||
type Connected = connection::Connection;
|
||||
type Error = io::Error;
|
||||
type Future = Box<Future<Item = connection::Connection, Error = io::Error>>;
|
||||
type Future = Box<Future<Item = connection::Connection, Error = io::Error> + Send>;
|
||||
|
||||
fn connect(&self) -> Self::Future {
|
||||
let port = self.host_and_port.port;
|
||||
|
|
Loading…
Reference in New Issue