From 2fab8895dc47bd032d1b0d445c859edd0b0bcd21 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Sat, 25 Feb 2017 14:12:17 -0800 Subject: [PATCH] grpclb: fix a bug in handling server address updates. It didn't check if the address was already there in the current subchannel set before creating a new subchannel for it, causing a leak of subchannels. --- .../io/grpc/grpclb/GrpclbLoadBalancer.java | 17 ++++++----- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 28 +++++++++++++------ 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 63fa54baae..cf20e9f8c7 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -356,13 +356,16 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { EquivalentAddressGroup eag = new EquivalentAddressGroup(address); // TODO(zhangkun83): save the LB token and insert it to the application RPCs' headers. if (!newSubchannelMap.containsKey(eag)) { - Attributes subchannelAttrs = Attributes.newBuilder() - .set(STATE_INFO, - new AtomicReference( - ConnectivityStateInfo.forNonError(IDLE))) - .build(); - Subchannel subchannel = helper.createSubchannel(eag, subchannelAttrs); - subchannel.requestConnection(); + Subchannel subchannel = subchannels.get(eag); + if (subchannel == null) { + Attributes subchannelAttrs = Attributes.newBuilder() + .set(STATE_INFO, + new AtomicReference( + ConnectivityStateInfo.forNonError(IDLE))) + .build(); + subchannel = helper.createSubchannel(eag, subchannelAttrs); + subchannel.requestConnection(); + } newSubchannelMap.put(eag, subchannel); } newRoundRobinList.add(eag); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index a99e0db26d..be5e6e4579 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -601,15 +601,20 @@ public class GrpclbLoadBalancerTest { assertSame(error1, picker6.result.getStatus()); // Update backends, with a drop entry - List backends2 = Arrays.asList( - new InetSocketAddress("127.0.0.1", 2030), null); + List backends2 = + Arrays.asList( + new InetSocketAddress("127.0.0.1", 2030), // New address + null, // drop + new InetSocketAddress("127.0.0.1", 2010), // Existing address + new InetSocketAddress("127.0.0.1", 2030)); // New address appearing second time verify(subchannel1, never()).shutdown(); - verify(subchannel2, never()).shutdown(); lbResponseObserver.onNext(buildLbResponse(backends2)); - verify(subchannel1).shutdown(); - verify(subchannel2).shutdown(); + verify(subchannel1).shutdown(); // not in backends2, closed + verify(subchannel2, never()).shutdown(); // backends2[2], will be kept + inOrder.verify(helper, never()).createSubchannel( + eq(new EquivalentAddressGroup(backends2.get(2))), any(Attributes.class)); inOrder.verify(helper).createSubchannel( eq(new EquivalentAddressGroup(backends2.get(0))), any(Attributes.class)); assertEquals(1, mockSubchannels.size()); @@ -620,16 +625,23 @@ public class GrpclbLoadBalancerTest { RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue(); assertRoundRobinList(picker7, (Subchannel) null); - // State updates on obsolete subchannels will have no effect + // State updates on obsolete subchannel1 will have no effect deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState( + subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); - deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(SHUTDOWN)); inOrder.verifyNoMoreInteractions(); deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updatePicker(pickerCaptor.capture()); RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue(); - assertRoundRobinList(picker8, subchannel3, null); + // subchannel2 is still IDLE, thus not in the active list + assertRoundRobinList(picker8, subchannel3, null, subchannel3); + // subchannel2 becomes READY and makes it into the list + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker9, subchannel3, null, subchannel2, subchannel3); verify(subchannel3, never()).shutdown(); assertFalse(oobChannel.isShutdown());