Use lbTransport if the LB request got a UNIMPLEMENTED response

This commit is contained in:
Kun Zhang 2015-12-18 17:35:20 -08:00
parent 707302d76e
commit 59c1a6ae26
2 changed files with 90 additions and 13 deletions

View File

@ -94,6 +94,8 @@ class GrpclbLoadBalancer extends LoadBalancer {
@GuardedBy("lock")
private ClientTransport lbTransport;
@GuardedBy("lock")
private ListenableFuture<ClientTransport> directTransport;
@GuardedBy("lock")
private StreamObserver<LoadBalanceResponse> lbResponseObserver;
@GuardedBy("lock")
private StreamObserver<LoadBalanceRequest> lbRequestWriter;
@ -134,6 +136,9 @@ class GrpclbLoadBalancer extends LoadBalancer {
RoundRobinServerList serverListCopy;
synchronized (lock) {
Preconditions.checkState(!closed, "already closed");
if (directTransport != null) {
return directTransport;
}
if (roundRobinServerList == null) {
if (lastError == null) {
return pendingPicks.newBlankFuture();
@ -165,12 +170,14 @@ class GrpclbLoadBalancer extends LoadBalancer {
@GuardedBy("lock")
private void connectToLb() {
directTransport = null;
if (closed) {
return;
}
lbResponseObserver = null;
// TODO(zhangkun83): should use a separate authority for LB servers
Preconditions.checkNotNull(lbAddresses, "lbAddresses");
// TODO(zhangkun83): LB servers may use an authority different from the service's.
// getTransport() will need to add an argument for the authority.
ListenableFuture<ClientTransport> transportFuture = tm.getTransport(lbAddresses);
Futures.addCallback(
Preconditions.checkNotNull(transportFuture),
@ -340,14 +347,33 @@ class GrpclbLoadBalancer extends LoadBalancer {
}
private void onStreamClosed(Status status) {
handleError(status);
synchronized (this) {
if (lbResponseObserver == this) {
if (status.getCode() == Status.Code.UNIMPLEMENTED) {
// TODO(zhangkun83): maybe we can instead begin sending normal RPCs to this server, but
// it's not decided yet.
lbTransport.shutdown();
FulfillmentBatch<ClientTransport> pendingPicksFulfillmentBatch;
final ListenableFuture<ClientTransport> transportFuture;
// This LB transport doesn't seem to be an actual LB server, if the LB address comes
// directly from NameResolver, just use it to serve normal RPCs.
// TODO(zhangkun83): check if lbAddresses are from NameResolver after we start getting
// lbAddresses from LoadBalanceResponse.
synchronized (lock) {
if (lbResponseObserver != this) {
return;
}
directTransport = transportFuture = Futures.immediateFuture(lbTransport);
pendingPicksFulfillmentBatch = pendingPicks.createFulfillmentBatch();
}
pendingPicksFulfillmentBatch.link(
new Supplier<ListenableFuture<ClientTransport>>() {
@Override
public ListenableFuture<ClientTransport> get() {
return transportFuture;
}
});
} else {
handleError(status);
synchronized (lock) {
if (lbResponseObserver != this) {
return;
}
// TODO(zhangkun83): apply back-off, otherwise this will spam the server continually
// with requests if the server tends to fail it for any reason.
// I am still the active LB stream. Reopen the stream.
@ -357,4 +383,3 @@ class GrpclbLoadBalancer extends LoadBalancer {
}
}
}
}

View File

@ -299,6 +299,58 @@ public class GrpclbLoadBalancerTest {
assertNull(loadBalancer.getRoundRobinServerList());
}
@Test public void lbStreamUnimplemented() throws Exception {
// Simulate the initial set of LB addresses resolved
simulateLbAddressResolved(30001);
// First pick, will be pending
ListenableFuture<ClientTransport> pick = loadBalancer.pickTransport(null);
// Make the transport for LB server ready
ClientTransport lbTransport = mock(ClientTransport.class);
lbTransportFuture.set(lbTransport);
// An LB request is sent
SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
assertNotNull(sentLbRequest);
assertSame(lbTransport, sentLbRequest.transport);
// Simulate that the LB stream fails with UNIMPLEMENTED
loadBalancer.getLbResponseObserver().onError(Status.UNIMPLEMENTED.asException());
// The pending pick will succeed with lbTransport
assertTrue(pick.isDone());
assertSame(lbTransport, pick.get());
// Subsequent picks will also get lbTransport
pick = loadBalancer.pickTransport(null);
assertTrue(pick.isDone());
assertSame(lbTransport, pick.get());
// Round-robin list NOT available at this point
assertNull(loadBalancer.getRoundRobinServerList());
verify(mockTransportManager, times(1)).getTransport(eq(lbAddressGroup));
// Didn't send additional requests other than the initial one
assertEquals(0, loadBalancer.sentLbRequests.size());
// Shut down the transport
loadBalancer.transportShutdown(lbAddressGroup, lbTransport,
Status.UNAVAILABLE.withDescription("simulated"));
// Subsequent pick will fail because an error has occurred
pick = loadBalancer.pickTransport(null);
assertTrue(pick.isDone());
assertFutureFailedWithError(pick, Status.Code.UNAVAILABLE,
"simulated", "Transport to LB server closed");
// Will get another lbTransport, and send another LB request
verify(mockTransportManager, times(2)).getTransport(eq(lbAddressGroup));
lbTransportFuture.set(lbTransport);
assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS));
}
@Test public void lbConnectionClosedAfterResponse() throws Exception {
// Simulate the initial set of LB addresses resolved
simulateLbAddressResolved(30001);