diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index 4ea5f5a2d..49d050377 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -56,8 +56,7 @@ pub struct DiscoveryWork> { struct DestinationSet> { addrs: Exists>, - needs_reconnect: bool, - rx: UpdateRx, + query: DestinationServiceQuery, txs: Vec>, } @@ -73,6 +72,13 @@ impl Exists { } } +enum DestinationServiceQuery> { + NeedsReconnect, + ConnectedOrConnecting { + rx: UpdateRx + }, +} + /// Receiver for destination set updates. /// /// The destination RPC returns a `ResponseFuture` whose item is a @@ -280,20 +286,11 @@ where set.txs.push(tx); } Entry::Vacant(vac) => { - let req = Destination { - scheme: "k8s".into(), - path: vac.key().without_trailing_dot() - .as_str().into(), - }; - // TODO: Can grpc::Request::new be removed? - let mut svc = DestinationSvc::new(client.lift_ref()); - let response = svc.get(grpc::Request::new(req)); - let stream = UpdateRx::Waiting(response); - + let query = + DestinationServiceQuery::connect(client, vac.key(), "connect"); vac.insert(DestinationSet { addrs: Exists::Unknown, - needs_reconnect: false, - rx: stream, + query, txs: vec![tx], }); } @@ -315,15 +312,7 @@ where while let Some(auth) = self.reconnects.pop_front() { if let Some(set) = self.destinations.get_mut(&auth) { - trace!("Destination.Get reconnect {:?}", auth); - let req = Destination { - scheme: "k8s".into(), - path: auth.without_trailing_dot().as_str().into(), - }; - let mut svc = DestinationSvc::new(client.lift_ref()); - let response = svc.get(grpc::Request::new(req)); - set.rx = UpdateRx::Waiting(response); - set.needs_reconnect = false; + set.query = DestinationServiceQuery::connect(client, &auth, "reconnect"); return true; } else { trace!("reconnect no longer needed: {:?}", auth); @@ -334,12 +323,17 @@ where fn poll_destinations(&mut self) { for (auth, set) in &mut self.destinations { - if set.needs_reconnect { - continue; - } let needs_reconnect = 'set: loop { + let poll_result = match set.query { + DestinationServiceQuery::NeedsReconnect => { + continue; + }, + DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx } => { + rx.poll() + } + }; - match set.rx.poll() { + match poll_result { Ok(Async::Ready(Some(update))) => match update.update { Some(PbUpdate2::Add(a_set)) => set.add( @@ -370,13 +364,30 @@ where }; if needs_reconnect { - set.needs_reconnect = true; + set.query = DestinationServiceQuery::NeedsReconnect; self.reconnects.push_back(FullyQualifiedAuthority::clone(auth)); } } } } + +// ===== impl DestinationServiceQuery ===== + +impl> DestinationServiceQuery { + fn connect(client: &mut T, auth: &FullyQualifiedAuthority, connect_or_reconnect: &str) -> Self { + trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth); + let req = Destination { + scheme: "k8s".into(), + path: auth.without_trailing_dot().as_str().into(), + }; + // TODO: Can grpc::Request::new be removed? + let mut svc = DestinationSvc::new(client.lift_ref()); + let response = svc.get(grpc::Request::new(req)); + DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) } + } +} + // ===== impl DestinationSet ===== impl > DestinationSet {