mirror of https://github.com/grpc/grpc-java.git
grpclb: fix a bug in handling server address updates.
It didn't check if the address was already there in the current subchannel set before creating a new subchannel for it, causing a leak of subchannels.
This commit is contained in:
parent
e345273268
commit
2fab8895dc
|
|
@ -356,13 +356,16 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
||||||
EquivalentAddressGroup eag = new EquivalentAddressGroup(address);
|
EquivalentAddressGroup eag = new EquivalentAddressGroup(address);
|
||||||
// TODO(zhangkun83): save the LB token and insert it to the application RPCs' headers.
|
// TODO(zhangkun83): save the LB token and insert it to the application RPCs' headers.
|
||||||
if (!newSubchannelMap.containsKey(eag)) {
|
if (!newSubchannelMap.containsKey(eag)) {
|
||||||
|
Subchannel subchannel = subchannels.get(eag);
|
||||||
|
if (subchannel == null) {
|
||||||
Attributes subchannelAttrs = Attributes.newBuilder()
|
Attributes subchannelAttrs = Attributes.newBuilder()
|
||||||
.set(STATE_INFO,
|
.set(STATE_INFO,
|
||||||
new AtomicReference<ConnectivityStateInfo>(
|
new AtomicReference<ConnectivityStateInfo>(
|
||||||
ConnectivityStateInfo.forNonError(IDLE)))
|
ConnectivityStateInfo.forNonError(IDLE)))
|
||||||
.build();
|
.build();
|
||||||
Subchannel subchannel = helper.createSubchannel(eag, subchannelAttrs);
|
subchannel = helper.createSubchannel(eag, subchannelAttrs);
|
||||||
subchannel.requestConnection();
|
subchannel.requestConnection();
|
||||||
|
}
|
||||||
newSubchannelMap.put(eag, subchannel);
|
newSubchannelMap.put(eag, subchannel);
|
||||||
}
|
}
|
||||||
newRoundRobinList.add(eag);
|
newRoundRobinList.add(eag);
|
||||||
|
|
|
||||||
|
|
@ -601,15 +601,20 @@ public class GrpclbLoadBalancerTest {
|
||||||
assertSame(error1, picker6.result.getStatus());
|
assertSame(error1, picker6.result.getStatus());
|
||||||
|
|
||||||
// Update backends, with a drop entry
|
// Update backends, with a drop entry
|
||||||
List<InetSocketAddress> backends2 = Arrays.asList(
|
List<InetSocketAddress> backends2 =
|
||||||
new InetSocketAddress("127.0.0.1", 2030), null);
|
Arrays.asList(
|
||||||
|
new InetSocketAddress("127.0.0.1", 2030), // New address
|
||||||
|
null, // drop
|
||||||
|
new InetSocketAddress("127.0.0.1", 2010), // Existing address
|
||||||
|
new InetSocketAddress("127.0.0.1", 2030)); // New address appearing second time
|
||||||
verify(subchannel1, never()).shutdown();
|
verify(subchannel1, never()).shutdown();
|
||||||
verify(subchannel2, never()).shutdown();
|
|
||||||
|
|
||||||
lbResponseObserver.onNext(buildLbResponse(backends2));
|
lbResponseObserver.onNext(buildLbResponse(backends2));
|
||||||
verify(subchannel1).shutdown();
|
verify(subchannel1).shutdown(); // not in backends2, closed
|
||||||
verify(subchannel2).shutdown();
|
verify(subchannel2, never()).shutdown(); // backends2[2], will be kept
|
||||||
|
|
||||||
|
inOrder.verify(helper, never()).createSubchannel(
|
||||||
|
eq(new EquivalentAddressGroup(backends2.get(2))), any(Attributes.class));
|
||||||
inOrder.verify(helper).createSubchannel(
|
inOrder.verify(helper).createSubchannel(
|
||||||
eq(new EquivalentAddressGroup(backends2.get(0))), any(Attributes.class));
|
eq(new EquivalentAddressGroup(backends2.get(0))), any(Attributes.class));
|
||||||
assertEquals(1, mockSubchannels.size());
|
assertEquals(1, mockSubchannels.size());
|
||||||
|
|
@ -620,16 +625,23 @@ public class GrpclbLoadBalancerTest {
|
||||||
RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
|
RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
assertRoundRobinList(picker7, (Subchannel) null);
|
assertRoundRobinList(picker7, (Subchannel) null);
|
||||||
|
|
||||||
// State updates on obsolete subchannels will have no effect
|
// State updates on obsolete subchannel1 will have no effect
|
||||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
|
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
|
||||||
|
deliverSubchannelState(
|
||||||
|
subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
|
||||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
|
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
|
||||||
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(SHUTDOWN));
|
|
||||||
inOrder.verifyNoMoreInteractions();
|
inOrder.verifyNoMoreInteractions();
|
||||||
|
|
||||||
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
|
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
|
||||||
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
|
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
|
||||||
RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
|
RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
assertRoundRobinList(picker8, subchannel3, null);
|
// subchannel2 is still IDLE, thus not in the active list
|
||||||
|
assertRoundRobinList(picker8, subchannel3, null, subchannel3);
|
||||||
|
// subchannel2 becomes READY and makes it into the list
|
||||||
|
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
|
||||||
|
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
|
||||||
|
RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
|
assertRoundRobinList(picker9, subchannel3, null, subchannel2, subchannel3);
|
||||||
|
|
||||||
verify(subchannel3, never()).shutdown();
|
verify(subchannel3, never()).shutdown();
|
||||||
assertFalse(oobChannel.isShutdown());
|
assertFalse(oobChannel.isShutdown());
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue