Refactor poll_destination() in service discovery. (#725)

No change in behavior is intended here.

Split poll_destination() into two parts, one that operates locally
on the DestinationSet, and the other that operates on data that isn't
wholly local to the DestinationSet. This makes the code easier to
understand. This is being done in preparation for adding DNS fallback
polling to poll_destination().

Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
Brian Smith 2018-04-07 18:15:19 -10:00 committed by GitHub
parent a88479fd8f
commit 91816fcc8c
1 changed files with 51 additions and 44 deletions

View File

@ -325,52 +325,17 @@ where
fn poll_destinations(&mut self) {
for (auth, set) in &mut self.destinations {
let needs_reconnect = 'set: loop {
let poll_result = match set.query {
None |
Some(DestinationServiceQuery::NeedsReconnect) => {
continue;
},
Some(DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx }) => {
rx.poll()
set.query = match set.query.take() {
Some(DestinationServiceQuery::ConnectedOrConnecting{ rx }) => {
let new_query = set.poll_destination(auth, rx);
if let DestinationServiceQuery::NeedsReconnect = new_query {
set.reset_on_next_modification();
self.reconnects.push_back(auth.clone());
}
};
match poll_result {
Ok(Async::Ready(Some(update))) => match update.update {
Some(PbUpdate2::Add(a_set)) =>
set.add(
auth,
a_set.addrs.iter().filter_map(
|addr| addr.addr.clone().and_then(pb_to_sock_addr))),
Some(PbUpdate2::Remove(r_set)) =>
set.remove(
auth,
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))),
Some(PbUpdate2::NoEndpoints(no_endpoints)) =>
set.no_endpoints(auth, no_endpoints.exists),
None => (),
},
Ok(Async::Ready(None)) => {
trace!(
"Destination.Get stream ended for {:?}, must reconnect",
auth
);
break 'set true;
}
Ok(Async::NotReady) => break 'set false,
Err(err) => {
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
break 'set true;
}
}
Some(new_query)
},
query => query,
};
if needs_reconnect {
set.query = Some(DestinationServiceQuery::NeedsReconnect);
set.reset_on_next_modification();
self.reconnects.push_back(auth.clone());
}
}
}
}
@ -406,6 +371,48 @@ impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> Destination
// ===== impl DestinationSet =====
impl<T> DestinationSet<T>
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
T::Error: fmt::Debug
{
fn poll_destination(&mut self, auth: &DnsNameAndPort, mut rx: UpdateRx<T>)
-> DestinationServiceQuery<T>
{
loop {
match rx.poll() {
Ok(Async::Ready(Some(update))) => match update.update {
Some(PbUpdate2::Add(a_set)) =>
self.add(
auth,
a_set.addrs.iter().filter_map(
|addr| addr.addr.clone().and_then(pb_to_sock_addr))),
Some(PbUpdate2::Remove(r_set)) =>
self.remove(
auth,
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))),
Some(PbUpdate2::NoEndpoints(no_endpoints)) =>
self.no_endpoints(auth, no_endpoints.exists),
None => (),
},
Ok(Async::Ready(None)) => {
trace!(
"Destination.Get stream ended for {:?}, must reconnect",
auth
);
return DestinationServiceQuery::NeedsReconnect;
},
Ok(Async::NotReady) => {
return DestinationServiceQuery::ConnectedOrConnecting { rx };
},
Err(err) => {
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
return DestinationServiceQuery::NeedsReconnect;
}
};
}
}
}
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
fn reset_on_next_modification(&mut self) {
match self.addrs {