Proxy: Factor out Destination service connection logic (#631)

* Proxy: Factor out Destination service connection logic

Centralize the connection initiation logic for the Destination service
to make it easier to maintain. Clarify that the `rx` field isn't needed
prior to a (re)connect.

Signed-off-by: Brian Smith <brian@briansmith.org>

* Rename `rx` to `query`.

Signed-off-by: Brian Smith <brian@briansmith.org>

* "recoonect" -> "reconnect"

Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
Brian Smith 2018-03-29 08:20:57 -10:00 committed by GitHub
parent 666c83e963
commit bae72c32ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 39 additions and 28 deletions

View File

@ -56,8 +56,7 @@ pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
addrs: Exists<HashSet<SocketAddr>>,
needs_reconnect: bool,
rx: UpdateRx<T>,
query: DestinationServiceQuery<T>,
txs: Vec<mpsc::UnboundedSender<Update>>,
}
@ -73,6 +72,13 @@ impl<T> Exists<T> {
}
}
enum DestinationServiceQuery<T: HttpService<ResponseBody = RecvBody>> {
NeedsReconnect,
ConnectedOrConnecting {
rx: UpdateRx<T>
},
}
/// Receiver for destination set updates.
///
/// The destination RPC returns a `ResponseFuture` whose item is a
@ -280,20 +286,11 @@ where
set.txs.push(tx);
}
Entry::Vacant(vac) => {
let req = Destination {
scheme: "k8s".into(),
path: vac.key().without_trailing_dot()
.as_str().into(),
};
// TODO: Can grpc::Request::new be removed?
let mut svc = DestinationSvc::new(client.lift_ref());
let response = svc.get(grpc::Request::new(req));
let stream = UpdateRx::Waiting(response);
let query =
DestinationServiceQuery::connect(client, vac.key(), "connect");
vac.insert(DestinationSet {
addrs: Exists::Unknown,
needs_reconnect: false,
rx: stream,
query,
txs: vec![tx],
});
}
@ -315,15 +312,7 @@ where
while let Some(auth) = self.reconnects.pop_front() {
if let Some(set) = self.destinations.get_mut(&auth) {
trace!("Destination.Get reconnect {:?}", auth);
let req = Destination {
scheme: "k8s".into(),
path: auth.without_trailing_dot().as_str().into(),
};
let mut svc = DestinationSvc::new(client.lift_ref());
let response = svc.get(grpc::Request::new(req));
set.rx = UpdateRx::Waiting(response);
set.needs_reconnect = false;
set.query = DestinationServiceQuery::connect(client, &auth, "reconnect");
return true;
} else {
trace!("reconnect no longer needed: {:?}", auth);
@ -334,12 +323,17 @@ where
fn poll_destinations(&mut self) {
for (auth, set) in &mut self.destinations {
if set.needs_reconnect {
continue;
}
let needs_reconnect = 'set: loop {
let poll_result = match set.query {
DestinationServiceQuery::NeedsReconnect => {
continue;
},
DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx } => {
rx.poll()
}
};
match set.rx.poll() {
match poll_result {
Ok(Async::Ready(Some(update))) => match update.update {
Some(PbUpdate2::Add(a_set)) =>
set.add(
@ -370,13 +364,30 @@ where
};
if needs_reconnect {
set.needs_reconnect = true;
set.query = DestinationServiceQuery::NeedsReconnect;
self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
}
}
}
}
// ===== impl DestinationServiceQuery =====
impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> DestinationServiceQuery<T> {
fn connect(client: &mut T, auth: &FullyQualifiedAuthority, connect_or_reconnect: &str) -> Self {
trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth);
let req = Destination {
scheme: "k8s".into(),
path: auth.without_trailing_dot().as_str().into(),
};
// TODO: Can grpc::Request::new be removed?
let mut svc = DestinationSvc::new(client.lift_ref());
let response = svc.get(grpc::Request::new(req));
DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) }
}
}
// ===== impl DestinationSet =====
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {