diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index 71a73e344..8af0df5d6 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -322,17 +322,52 @@ where fn poll_destinations(&mut self) { for (auth, set) in &mut self.destinations { - set.query = match set.query.take() { - Some(DestinationServiceQuery::ConnectedOrConnecting{ rx }) => { - let new_query = set.poll_destination(auth, rx); - if let DestinationServiceQuery::NeedsReconnect = new_query { - set.reset_on_next_modification(); - self.reconnects.push_back(auth.clone()); + let needs_reconnect = 'set: loop { + let poll_result = match set.query { + None | + Some(DestinationServiceQuery::NeedsReconnect) => { + continue; + }, + Some(DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx }) => { + rx.poll() } - Some(new_query) - }, - query => query, + }; + + match poll_result { + Ok(Async::Ready(Some(update))) => match update.update { + Some(PbUpdate2::Add(a_set)) => + set.add( + auth, + a_set.addrs.iter().filter_map( + |addr| addr.addr.clone().and_then(pb_to_sock_addr))), + Some(PbUpdate2::Remove(r_set)) => + set.remove( + auth, + r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))), + Some(PbUpdate2::NoEndpoints(no_endpoints)) => + set.no_endpoints(auth, no_endpoints.exists), + None => (), + }, + Ok(Async::Ready(None)) => { + trace!( + "Destination.Get stream ended for {:?}, must reconnect", + auth + ); + break 'set true; + } + Ok(Async::NotReady) => break 'set false, + Err(err) => { + warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); + break 'set true; + } + } + }; + if needs_reconnect { + set.query = Some(DestinationServiceQuery::NeedsReconnect); + set.reset_on_next_modification(); + self.reconnects.push_back(auth.clone()); + } } } } @@ -368,48 +403,6 @@ impl> Destination // ===== impl DestinationSet ===== -impl DestinationSet - where T: HttpService, - T::Error: fmt::Debug -{ - fn poll_destination(&mut self, auth: &DnsNameAndPort, mut rx: UpdateRx) - -> DestinationServiceQuery - { - loop { - match rx.poll() { - Ok(Async::Ready(Some(update))) => match update.update { - Some(PbUpdate2::Add(a_set)) => - self.add( - auth, - a_set.addrs.iter().filter_map( - |addr| addr.addr.clone().and_then(pb_to_sock_addr))), - Some(PbUpdate2::Remove(r_set)) => - self.remove( - auth, - r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))), - Some(PbUpdate2::NoEndpoints(no_endpoints)) => - self.no_endpoints(auth, no_endpoints.exists), - None => (), - }, - Ok(Async::Ready(None)) => { - trace!( - "Destination.Get stream ended for {:?}, must reconnect", - auth - ); - return DestinationServiceQuery::NeedsReconnect; - }, - Ok(Async::NotReady) => { - return DestinationServiceQuery::ConnectedOrConnecting { rx }; - }, - Err(err) => { - warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); - return DestinationServiceQuery::NeedsReconnect; - } - }; - } - } -} - impl > DestinationSet { fn reset_on_next_modification(&mut self) { match self.addrs {