mirror of https://github.com/linkerd/linkerd2.git
This reverts commit 4fb9877b89
.
Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
parent
2f5b5ea5f2
commit
1b223723bc
|
@ -322,17 +322,52 @@ where
|
|||
|
||||
fn poll_destinations(&mut self) {
|
||||
for (auth, set) in &mut self.destinations {
|
||||
set.query = match set.query.take() {
|
||||
Some(DestinationServiceQuery::ConnectedOrConnecting{ rx }) => {
|
||||
let new_query = set.poll_destination(auth, rx);
|
||||
if let DestinationServiceQuery::NeedsReconnect = new_query {
|
||||
set.reset_on_next_modification();
|
||||
self.reconnects.push_back(auth.clone());
|
||||
let needs_reconnect = 'set: loop {
|
||||
let poll_result = match set.query {
|
||||
None |
|
||||
Some(DestinationServiceQuery::NeedsReconnect) => {
|
||||
continue;
|
||||
},
|
||||
Some(DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx }) => {
|
||||
rx.poll()
|
||||
}
|
||||
Some(new_query)
|
||||
},
|
||||
query => query,
|
||||
};
|
||||
|
||||
match poll_result {
|
||||
Ok(Async::Ready(Some(update))) => match update.update {
|
||||
Some(PbUpdate2::Add(a_set)) =>
|
||||
set.add(
|
||||
auth,
|
||||
a_set.addrs.iter().filter_map(
|
||||
|addr| addr.addr.clone().and_then(pb_to_sock_addr))),
|
||||
Some(PbUpdate2::Remove(r_set)) =>
|
||||
set.remove(
|
||||
auth,
|
||||
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))),
|
||||
Some(PbUpdate2::NoEndpoints(no_endpoints)) =>
|
||||
set.no_endpoints(auth, no_endpoints.exists),
|
||||
None => (),
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
trace!(
|
||||
"Destination.Get stream ended for {:?}, must reconnect",
|
||||
auth
|
||||
);
|
||||
break 'set true;
|
||||
}
|
||||
Ok(Async::NotReady) => break 'set false,
|
||||
Err(err) => {
|
||||
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
|
||||
break 'set true;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
if needs_reconnect {
|
||||
set.query = Some(DestinationServiceQuery::NeedsReconnect);
|
||||
set.reset_on_next_modification();
|
||||
self.reconnects.push_back(auth.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -368,48 +403,6 @@ impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> Destination
|
|||
|
||||
// ===== impl DestinationSet =====
|
||||
|
||||
impl<T> DestinationSet<T>
|
||||
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug
|
||||
{
|
||||
fn poll_destination(&mut self, auth: &DnsNameAndPort, mut rx: UpdateRx<T>)
|
||||
-> DestinationServiceQuery<T>
|
||||
{
|
||||
loop {
|
||||
match rx.poll() {
|
||||
Ok(Async::Ready(Some(update))) => match update.update {
|
||||
Some(PbUpdate2::Add(a_set)) =>
|
||||
self.add(
|
||||
auth,
|
||||
a_set.addrs.iter().filter_map(
|
||||
|addr| addr.addr.clone().and_then(pb_to_sock_addr))),
|
||||
Some(PbUpdate2::Remove(r_set)) =>
|
||||
self.remove(
|
||||
auth,
|
||||
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))),
|
||||
Some(PbUpdate2::NoEndpoints(no_endpoints)) =>
|
||||
self.no_endpoints(auth, no_endpoints.exists),
|
||||
None => (),
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
trace!(
|
||||
"Destination.Get stream ended for {:?}, must reconnect",
|
||||
auth
|
||||
);
|
||||
return DestinationServiceQuery::NeedsReconnect;
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
return DestinationServiceQuery::ConnectedOrConnecting { rx };
|
||||
},
|
||||
Err(err) => {
|
||||
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
|
||||
return DestinationServiceQuery::NeedsReconnect;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
||||
fn reset_on_next_modification(&mut self) {
|
||||
match self.addrs {
|
||||
|
|
Loading…
Reference in New Issue