From 59c1a6ae26aaef2efcd7071562689aee0bd1b933 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Fri, 18 Dec 2015 17:35:20 -0800 Subject: [PATCH] Use lbTransport if the LB request got a UNIMPLEMENTED response --- .../io/grpc/grpclb/GrpclbLoadBalancer.java | 51 +++++++++++++----- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 52 +++++++++++++++++++ 2 files changed, 90 insertions(+), 13 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 839bfd88f2..cbbdf658c9 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -94,6 +94,8 @@ class GrpclbLoadBalancer extends LoadBalancer { @GuardedBy("lock") private ClientTransport lbTransport; @GuardedBy("lock") + private ListenableFuture directTransport; + @GuardedBy("lock") private StreamObserver lbResponseObserver; @GuardedBy("lock") private StreamObserver lbRequestWriter; @@ -134,6 +136,9 @@ class GrpclbLoadBalancer extends LoadBalancer { RoundRobinServerList serverListCopy; synchronized (lock) { Preconditions.checkState(!closed, "already closed"); + if (directTransport != null) { + return directTransport; + } if (roundRobinServerList == null) { if (lastError == null) { return pendingPicks.newBlankFuture(); @@ -165,12 +170,14 @@ class GrpclbLoadBalancer extends LoadBalancer { @GuardedBy("lock") private void connectToLb() { + directTransport = null; if (closed) { return; } lbResponseObserver = null; - // TODO(zhangkun83): should use a separate authority for LB servers Preconditions.checkNotNull(lbAddresses, "lbAddresses"); + // TODO(zhangkun83): LB servers may use an authority different from the service's. + // getTransport() will need to add an argument for the authority. ListenableFuture transportFuture = tm.getTransport(lbAddresses); Futures.addCallback( Preconditions.checkNotNull(transportFuture), @@ -340,19 +347,37 @@ class GrpclbLoadBalancer extends LoadBalancer { } private void onStreamClosed(Status status) { - handleError(status); - synchronized (this) { - if (lbResponseObserver == this) { - if (status.getCode() == Status.Code.UNIMPLEMENTED) { - // TODO(zhangkun83): maybe we can instead begin sending normal RPCs to this server, but - // it's not decided yet. - lbTransport.shutdown(); - } else { - // TODO(zhangkun83): apply back-off, otherwise this will spam the server continually - // with requests if the server tends to fail it for any reason. - // I am still the active LB stream. Reopen the stream. - startNegotiation(); + if (status.getCode() == Status.Code.UNIMPLEMENTED) { + FulfillmentBatch pendingPicksFulfillmentBatch; + final ListenableFuture transportFuture; + // This LB transport doesn't seem to be an actual LB server, if the LB address comes + // directly from NameResolver, just use it to serve normal RPCs. + // TODO(zhangkun83): check if lbAddresses are from NameResolver after we start getting + // lbAddresses from LoadBalanceResponse. + synchronized (lock) { + if (lbResponseObserver != this) { + return; } + directTransport = transportFuture = Futures.immediateFuture(lbTransport); + pendingPicksFulfillmentBatch = pendingPicks.createFulfillmentBatch(); + } + pendingPicksFulfillmentBatch.link( + new Supplier>() { + @Override + public ListenableFuture get() { + return transportFuture; + } + }); + } else { + handleError(status); + synchronized (lock) { + if (lbResponseObserver != this) { + return; + } + // TODO(zhangkun83): apply back-off, otherwise this will spam the server continually + // with requests if the server tends to fail it for any reason. + // I am still the active LB stream. Reopen the stream. + startNegotiation(); } } } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index f4406f686f..ef5e2dc0fb 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -299,6 +299,58 @@ public class GrpclbLoadBalancerTest { assertNull(loadBalancer.getRoundRobinServerList()); } + @Test public void lbStreamUnimplemented() throws Exception { + // Simulate the initial set of LB addresses resolved + simulateLbAddressResolved(30001); + + // First pick, will be pending + ListenableFuture pick = loadBalancer.pickTransport(null); + + // Make the transport for LB server ready + ClientTransport lbTransport = mock(ClientTransport.class); + lbTransportFuture.set(lbTransport); + + // An LB request is sent + SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS); + assertNotNull(sentLbRequest); + assertSame(lbTransport, sentLbRequest.transport); + + // Simulate that the LB stream fails with UNIMPLEMENTED + loadBalancer.getLbResponseObserver().onError(Status.UNIMPLEMENTED.asException()); + + // The pending pick will succeed with lbTransport + assertTrue(pick.isDone()); + assertSame(lbTransport, pick.get()); + + // Subsequent picks will also get lbTransport + pick = loadBalancer.pickTransport(null); + assertTrue(pick.isDone()); + assertSame(lbTransport, pick.get()); + + // Round-robin list NOT available at this point + assertNull(loadBalancer.getRoundRobinServerList()); + + verify(mockTransportManager, times(1)).getTransport(eq(lbAddressGroup)); + + // Didn't send additional requests other than the initial one + assertEquals(0, loadBalancer.sentLbRequests.size()); + + // Shut down the transport + loadBalancer.transportShutdown(lbAddressGroup, lbTransport, + Status.UNAVAILABLE.withDescription("simulated")); + + // Subsequent pick will fail because an error has occurred + pick = loadBalancer.pickTransport(null); + assertTrue(pick.isDone()); + assertFutureFailedWithError(pick, Status.Code.UNAVAILABLE, + "simulated", "Transport to LB server closed"); + + // Will get another lbTransport, and send another LB request + verify(mockTransportManager, times(2)).getTransport(eq(lbAddressGroup)); + lbTransportFuture.set(lbTransport); + assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS)); + } + @Test public void lbConnectionClosedAfterResponse() throws Exception { // Simulate the initial set of LB addresses resolved simulateLbAddressResolved(30001);