mirror of https://github.com/grpc/grpc-java.git
api: move SubchannelPicker.requestConnection() to LoadBalancer. (#5751)
We will require Subchannel.requestConnection() to be called from sync-context (#5722), but SubchannelPicker.requestConnection() is currently calling it with the assumption of thread-safety. Actually SubchannelPicker.requestConnection() is called already from sync-context by ChannelImpl, it makes more sense to move this method to LoadBalancer where all other methods are sync-context'ed, rather than making SubchannelPicker.requestConnection() sync-context'ed and fragmenting the SubchannelPicker API because pickSubchannel() is thread-safe. C++ also has the requestConnection() equivalent on their LoadBalancer interface.
This commit is contained in:
parent
f3bf250a46
commit
cec9ee368d
|
|
@ -362,6 +362,19 @@ public abstract class LoadBalancer {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The channel asks the LoadBalancer to establish connections now (if applicable) so that the
|
||||||
|
* upcoming RPC may then just pick a ready connection without waiting for connections. This
|
||||||
|
* is triggered by {@link ManagedChannel#getState ManagedChannel.getState(true)}.
|
||||||
|
*
|
||||||
|
* <p>If LoadBalancer doesn't override it, this is no-op. If it infeasible to create connections
|
||||||
|
* given the current state, e.g. no Subchannel has been created yet, LoadBalancer can ignore this
|
||||||
|
* request.
|
||||||
|
*
|
||||||
|
* @since 1.22.0
|
||||||
|
*/
|
||||||
|
public void requestConnection() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main balancing logic. It <strong>must be thread-safe</strong>. Typically it should only
|
* The main balancing logic. It <strong>must be thread-safe</strong>. Typically it should only
|
||||||
* synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
|
* synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
|
||||||
|
|
@ -385,8 +398,10 @@ public abstract class LoadBalancer {
|
||||||
*
|
*
|
||||||
* <p>No-op if unsupported.
|
* <p>No-op if unsupported.
|
||||||
*
|
*
|
||||||
|
* @deprecated override {@link LoadBalancer#requestConnection} instead.
|
||||||
* @since 1.11.0
|
* @since 1.11.0
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void requestConnection() {}
|
public void requestConnection() {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -175,6 +175,11 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestConnection() {
|
||||||
|
getDelegate().requestConnection();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
delegate.shutdown();
|
delegate.shutdown();
|
||||||
|
|
|
||||||
|
|
@ -882,6 +882,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public ConnectivityState getState(boolean requestConnection) {
|
public ConnectivityState getState(boolean requestConnection) {
|
||||||
ConnectivityState savedChannelState = channelStateManager.getState();
|
ConnectivityState savedChannelState = channelStateManager.getState();
|
||||||
if (requestConnection && savedChannelState == IDLE) {
|
if (requestConnection && savedChannelState == IDLE) {
|
||||||
|
|
@ -892,6 +893,9 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
if (subchannelPicker != null) {
|
if (subchannelPicker != null) {
|
||||||
subchannelPicker.requestConnection();
|
subchannelPicker.requestConnection();
|
||||||
}
|
}
|
||||||
|
if (lbHelper != null) {
|
||||||
|
lbHelper.lb.requestConnection();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -110,6 +110,13 @@ final class PickFirstLoadBalancer extends LoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestConnection() {
|
||||||
|
if (subchannel != null) {
|
||||||
|
subchannel.requestConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op picker which doesn't add any custom picking logic. It just passes already known result
|
* No-op picker which doesn't add any custom picking logic. It just passes already known result
|
||||||
* received in constructor.
|
* received in constructor.
|
||||||
|
|
@ -140,10 +147,5 @@ final class PickFirstLoadBalancer extends LoadBalancer {
|
||||||
subchannel.requestConnection();
|
subchannel.requestConnection();
|
||||||
return PickResult.withNoResult();
|
return PickResult.withNoResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void requestConnection() {
|
|
||||||
subchannel.requestConnection();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -67,6 +67,11 @@ public abstract class ForwardingLoadBalancer extends LoadBalancer {
|
||||||
return delegate().canHandleEmptyAddressListFromNameResolution();
|
return delegate().canHandleEmptyAddressListFromNameResolution();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestConnection() {
|
||||||
|
delegate().requestConnection();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
|
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
|
||||||
|
|
|
||||||
|
|
@ -1920,6 +1920,7 @@ public class ManagedChannelImplTest {
|
||||||
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
|
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void getState_withRequestConnect_IdleWithLbRunning() {
|
public void getState_withRequestConnect_IdleWithLbRunning() {
|
||||||
channelBuilder.nameResolverFactory(
|
channelBuilder.nameResolverFactory(
|
||||||
|
|
@ -1932,6 +1933,7 @@ public class ManagedChannelImplTest {
|
||||||
assertEquals(IDLE, channel.getState(true));
|
assertEquals(IDLE, channel.getState(true));
|
||||||
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
|
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
|
||||||
verify(mockPicker).requestConnection();
|
verify(mockPicker).requestConnection();
|
||||||
|
verify(mockLoadBalancer).requestConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -262,14 +262,18 @@ public class PickFirstLoadBalancerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void requestConnection() {
|
public void requestConnection() {
|
||||||
|
loadBalancer.requestConnection();
|
||||||
|
verify(mockSubchannel, never()).requestConnection();
|
||||||
|
|
||||||
loadBalancer.handleResolvedAddresses(
|
loadBalancer.handleResolvedAddresses(
|
||||||
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
|
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
|
||||||
|
verify(mockSubchannel).requestConnection();
|
||||||
|
|
||||||
loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(IDLE));
|
loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(IDLE));
|
||||||
verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
|
verify(mockHelper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
|
||||||
SubchannelPicker picker = pickerCaptor.getValue();
|
|
||||||
|
|
||||||
verify(mockSubchannel).requestConnection();
|
verify(mockSubchannel).requestConnection();
|
||||||
picker.requestConnection();
|
loadBalancer.requestConnection();
|
||||||
verify(mockSubchannel, times(2)).requestConnection();
|
verify(mockSubchannel, times(2)).requestConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -115,6 +115,13 @@ class GrpclbLoadBalancer extends LoadBalancer {
|
||||||
grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
|
grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestConnection() {
|
||||||
|
if (grpclbState != null) {
|
||||||
|
grpclbState.requestConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static Mode retrieveModeFromLbConfig(
|
static Mode retrieveModeFromLbConfig(
|
||||||
@Nullable Map<String, ?> rawLbConfigValue, ChannelLogger channelLogger) {
|
@Nullable Map<String, ?> rawLbConfigValue, ChannelLogger channelLogger) {
|
||||||
|
|
|
||||||
|
|
@ -224,6 +224,14 @@ final class GrpclbState {
|
||||||
maybeUpdatePicker();
|
maybeUpdatePicker();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void requestConnection() {
|
||||||
|
for (RoundRobinEntry entry : currentPicker.pickList) {
|
||||||
|
if (entry instanceof IdleSubchannelEntry) {
|
||||||
|
((IdleSubchannelEntry) entry).subchannel.requestConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void maybeUseFallbackBackends() {
|
private void maybeUseFallbackBackends() {
|
||||||
if (balancerWorking) {
|
if (balancerWorking) {
|
||||||
return;
|
return;
|
||||||
|
|
@ -1025,13 +1033,5 @@ final class GrpclbState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void requestConnection() {
|
|
||||||
for (RoundRobinEntry entry : pickList) {
|
|
||||||
if (entry instanceof IdleSubchannelEntry) {
|
|
||||||
((IdleSubchannelEntry) entry).subchannel.requestConnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,6 @@ import io.grpc.grpclb.GrpclbState.DropEntry;
|
||||||
import io.grpc.grpclb.GrpclbState.ErrorEntry;
|
import io.grpc.grpclb.GrpclbState.ErrorEntry;
|
||||||
import io.grpc.grpclb.GrpclbState.IdleSubchannelEntry;
|
import io.grpc.grpclb.GrpclbState.IdleSubchannelEntry;
|
||||||
import io.grpc.grpclb.GrpclbState.Mode;
|
import io.grpc.grpclb.GrpclbState.Mode;
|
||||||
import io.grpc.grpclb.GrpclbState.RoundRobinEntry;
|
|
||||||
import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
|
import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
|
||||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||||
import io.grpc.inprocess.InProcessServerBuilder;
|
import io.grpc.inprocess.InProcessServerBuilder;
|
||||||
|
|
@ -455,26 +454,6 @@ public class GrpclbLoadBalancerTest {
|
||||||
verify(subchannel, times(2)).requestConnection();
|
verify(subchannel, times(2)).requestConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void roundRobinPicker_requestConnection() {
|
|
||||||
// requestConnection() on RoundRobinPicker is only passed to IdleSubchannelEntry
|
|
||||||
|
|
||||||
Subchannel subchannel1 = mock(Subchannel.class);
|
|
||||||
Subchannel subchannel2 = mock(Subchannel.class);
|
|
||||||
|
|
||||||
RoundRobinPicker picker = new RoundRobinPicker(
|
|
||||||
Collections.<DropEntry>emptyList(),
|
|
||||||
Arrays.<RoundRobinEntry>asList(
|
|
||||||
new BackendEntry(subchannel1), new IdleSubchannelEntry(subchannel2),
|
|
||||||
new ErrorEntry(Status.UNAVAILABLE)));
|
|
||||||
|
|
||||||
verify(subchannel2, never()).requestConnection();
|
|
||||||
|
|
||||||
picker.requestConnection();
|
|
||||||
verify(subchannel2).requestConnection();
|
|
||||||
verify(subchannel1, never()).requestConnection();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void loadReporting() {
|
public void loadReporting() {
|
||||||
Metadata headers = new Metadata();
|
Metadata headers = new Metadata();
|
||||||
|
|
@ -1815,7 +1794,7 @@ public class GrpclbLoadBalancerTest {
|
||||||
verify(subchannel).requestConnection();
|
verify(subchannel).requestConnection();
|
||||||
|
|
||||||
// ... or requested by application
|
// ... or requested by application
|
||||||
picker5.requestConnection();
|
balancer.requestConnection();
|
||||||
verify(subchannel, times(2)).requestConnection();
|
verify(subchannel, times(2)).requestConnection();
|
||||||
|
|
||||||
// PICK_FIRST doesn't use subchannelPool
|
// PICK_FIRST doesn't use subchannelPool
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue