Proxy: Refactor poll_destination() in service discovery. (#674)

No change in behavior is intended here.

Split poll_destination() into two parts, one that operates locally
on the DestinationSet, and the other that operates on data that isn't
wholly local to the DestinationSet. This makes the code easier to
understand. This is being done in preparation for adding DNS fallback
polling to poll_destination().

Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
Brian Smith 2018-04-05 13:05:11 -10:00 committed by GitHub
parent 311ef410a8
commit 4fb9877b89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 51 additions and 44 deletions

View File

@ -322,52 +322,17 @@ where
fn poll_destinations(&mut self) {
for (auth, set) in &mut self.destinations {
let needs_reconnect = 'set: loop {
let poll_result = match set.query {
None |
Some(DestinationServiceQuery::NeedsReconnect) => {
continue;
},
Some(DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx }) => {
rx.poll()
}
};
match poll_result {
Ok(Async::Ready(Some(update))) => match update.update {
Some(PbUpdate2::Add(a_set)) =>
set.add(
auth,
a_set.addrs.iter().filter_map(
|addr| addr.addr.clone().and_then(pb_to_sock_addr))),
Some(PbUpdate2::Remove(r_set)) =>
set.remove(
auth,
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))),
Some(PbUpdate2::NoEndpoints(no_endpoints)) =>
set.no_endpoints(auth, no_endpoints.exists),
None => (),
},
Ok(Async::Ready(None)) => {
trace!(
"Destination.Get stream ended for {:?}, must reconnect",
auth
);
break 'set true;
}
Ok(Async::NotReady) => break 'set false,
Err(err) => {
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
break 'set true;
}
}
};
if needs_reconnect {
set.query = Some(DestinationServiceQuery::NeedsReconnect);
set.query = match set.query.take() {
Some(DestinationServiceQuery::ConnectedOrConnecting{ rx }) => {
let new_query = set.poll_destination(auth, rx);
if let DestinationServiceQuery::NeedsReconnect = new_query {
set.reset_on_next_modification();
self.reconnects.push_back(auth.clone());
}
Some(new_query)
},
query => query,
};
}
}
}
@ -403,6 +368,48 @@ impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> Destination
// ===== impl DestinationSet =====
impl<T> DestinationSet<T>
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
T::Error: fmt::Debug
{
fn poll_destination(&mut self, auth: &DnsNameAndPort, mut rx: UpdateRx<T>)
-> DestinationServiceQuery<T>
{
loop {
match rx.poll() {
Ok(Async::Ready(Some(update))) => match update.update {
Some(PbUpdate2::Add(a_set)) =>
self.add(
auth,
a_set.addrs.iter().filter_map(
|addr| addr.addr.clone().and_then(pb_to_sock_addr))),
Some(PbUpdate2::Remove(r_set)) =>
self.remove(
auth,
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))),
Some(PbUpdate2::NoEndpoints(no_endpoints)) =>
self.no_endpoints(auth, no_endpoints.exists),
None => (),
},
Ok(Async::Ready(None)) => {
trace!(
"Destination.Get stream ended for {:?}, must reconnect",
auth
);
return DestinationServiceQuery::NeedsReconnect;
},
Ok(Async::NotReady) => {
return DestinationServiceQuery::ConnectedOrConnecting { rx };
},
Err(err) => {
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
return DestinationServiceQuery::NeedsReconnect;
}
};
}
}
}
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
fn reset_on_next_modification(&mut self) {
match self.addrs {