From a2595d9e7dcce2e9a267b20a616b41c3615b89db Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Mon, 20 May 2019 11:39:02 -0700 Subject: [PATCH] api/core/grpclb: add warning about calling Subchannel.requestConnection() outside of sync-context (#5757) The pick_first policies in core and grpclb previously would call Subchannel.requestConnection() from data-path. They now will schedule that call in the sync-context to avoid the warning. They will only call it for the first pick of each picker, to prevent storming the sync-context. --- api/src/main/java/io/grpc/LoadBalancer.java | 3 +- .../io/grpc/internal/ManagedChannelImpl.java | 1 + .../grpc/internal/PickFirstLoadBalancer.java | 15 +++++-- .../internal/PickFirstLoadBalancerTest.java | 41 +++++++++++++++++++ .../main/java/io/grpc/grpclb/GrpclbState.java | 22 +++++++--- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 14 ++++--- 6 files changed, 82 insertions(+), 14 deletions(-) diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 576c339093..c1fff6908d 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -1211,7 +1211,8 @@ public abstract class LoadBalancer { /** * Asks the Subchannel to create a connection (aka transport), if there isn't an active one. * - *

It should be called from the Synchronization Context. See It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. * * @since 1.2.0 diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 03aeb5e70d..9346a03dba 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1572,6 +1572,7 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override public void requestConnection() { + logWarningIfNotInSyncContext("Subchannel.requestConnection()"); checkState(started, "not started"); subchannel.obtainActiveTransport(); } diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index 7bec4c173c..2056543824 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -28,6 +28,7 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.Status; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link @@ -137,9 +138,10 @@ final class PickFirstLoadBalancer extends LoadBalancer { } } - /** Picker that requests connection during pick, and returns noResult. */ - private static final class RequestConnectionPicker extends SubchannelPicker { + /** Picker that requests connection during the first pick, and returns noResult. */ + private final class RequestConnectionPicker extends SubchannelPicker { private final Subchannel subchannel; + private final AtomicBoolean connectionRequested = new AtomicBoolean(false); RequestConnectionPicker(Subchannel subchannel) { this.subchannel = checkNotNull(subchannel, "subchannel"); @@ -147,7 +149,14 @@ final class PickFirstLoadBalancer extends LoadBalancer { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - subchannel.requestConnection(); + if (connectionRequested.compareAndSet(false, true)) { + helper.getSynchronizationContext().execute(new Runnable() { + @Override + public void run() { + subchannel.requestConnection(); + } + }); + } return PickResult.withNoResult(); } } diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1bd823834d..4b08b91169 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -25,8 +25,11 @@ import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -46,6 +49,7 @@ import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.Status; +import io.grpc.SynchronizationContext; import java.net.SocketAddress; import java.util.List; import org.junit.After; @@ -71,6 +75,13 @@ public class PickFirstLoadBalancerTest { private List socketAddresses = Lists.newArrayList(); private static final Attributes.Key FOO = Attributes.Key.create("foo"); + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build(); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); @@ -96,6 +107,7 @@ public class PickFirstLoadBalancerTest { } when(mockSubchannel.getAllAddresses()).thenThrow(new UnsupportedOperationException()); + when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(mockSubchannel); loadBalancer = new PickFirstLoadBalancer(mockHelper); @@ -120,12 +132,38 @@ public class PickFirstLoadBalancerTest { verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); + // Calling pickSubchannel() twice gave the same result assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } + @Test + public void requestConnectionPicker() throws Exception { + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + + InOrder inOrder = inOrder(mockHelper, mockSubchannel); + inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + inOrder.verify(mockSubchannel).requestConnection(); + + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + SubchannelPicker picker = pickerCaptor.getValue(); + + // Calling pickSubchannel() twice gave the same result + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + + // But the picker calls requestConnection() only once + inOrder.verify(mockSubchannel).requestConnection(); + + verify(mockSubchannel, times(2)).requestConnection(); + } + @Test public void pickAfterResolvedAndUnchanged() throws Exception { loadBalancer.handleResolvedAddresses( @@ -186,6 +224,8 @@ public class PickFirstLoadBalancerTest { SubchannelStateListener stateListener = stateListenerCaptor.getValue(); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel(); + reset(mockHelper); + when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); Status error = Status.UNAVAILABLE.withDescription("boom!"); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); @@ -200,6 +240,7 @@ public class PickFirstLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care verifyNoMoreInteractions(mockHelper); } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 3ea7174a88..a383d4643c 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -71,6 +71,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -741,7 +742,7 @@ final class GrpclbState { break; default: pickList = Collections.singletonList( - new IdleSubchannelEntry(onlyEntry.subchannel)); + new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); } } break; @@ -924,15 +925,25 @@ final class GrpclbState { @VisibleForTesting static final class IdleSubchannelEntry implements RoundRobinEntry { + private final SynchronizationContext syncContext; private final Subchannel subchannel; + private final AtomicBoolean connectionRequested = new AtomicBoolean(false); - IdleSubchannelEntry(Subchannel subchannel) { + IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) { this.subchannel = checkNotNull(subchannel, "subchannel"); + this.syncContext = checkNotNull(syncContext, "syncContext"); } @Override public PickResult picked(Metadata headers) { - subchannel.requestConnection(); + if (connectionRequested.compareAndSet(false, true)) { + syncContext.execute(new Runnable() { + @Override + public void run() { + subchannel.requestConnection(); + } + }); + } return PickResult.withNoResult(); } @@ -944,7 +955,7 @@ final class GrpclbState { @Override public int hashCode() { - return Objects.hashCode(subchannel); + return Objects.hashCode(subchannel, syncContext); } @Override @@ -953,7 +964,8 @@ final class GrpclbState { return false; } IdleSubchannelEntry that = (IdleSubchannelEntry) other; - return Objects.equal(subchannel, that.subchannel); + return Objects.equal(subchannel, that.subchannel) + && Objects.equal(syncContext, that.syncContext); } } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index a6ace74431..8d64e4df8b 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -420,7 +420,7 @@ public class GrpclbLoadBalancerTest { @Test public void roundRobinPickerWithIdleEntry_noDrop() { Subchannel subchannel = mock(Subchannel.class); - IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel); + IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext); RoundRobinPicker picker = new RoundRobinPicker(Collections.emptyList(), Collections.singletonList(entry)); @@ -429,6 +429,9 @@ public class GrpclbLoadBalancerTest { verify(subchannel, never()).requestConnection(); assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); verify(subchannel).requestConnection(); + assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); + // Only the first pick triggers requestConnection() + verify(subchannel).requestConnection(); } @Test @@ -440,7 +443,7 @@ public class GrpclbLoadBalancerTest { List dropList = Arrays.asList(null, d); Subchannel subchannel = mock(Subchannel.class); - IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel); + IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext); RoundRobinPicker picker = new RoundRobinPicker(dropList, Collections.singletonList(entry)); PickSubchannelArgs args = mock(PickSubchannelArgs.class); @@ -453,7 +456,8 @@ public class GrpclbLoadBalancerTest { verify(subchannel).requestConnection(); assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); - verify(subchannel, times(2)).requestConnection(); + // Only the first pick triggers requestConnection() + verify(subchannel).requestConnection(); } @Test @@ -1729,7 +1733,7 @@ public class GrpclbLoadBalancerTest { assertThat(mockSubchannels).hasSize(1); Subchannel subchannel = mockSubchannels.poll(); assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel)); + assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); // PICK_FIRST doesn't eagerly connect verify(subchannel, never()).requestConnection(); @@ -1854,7 +1858,7 @@ public class GrpclbLoadBalancerTest { new BackendEntry(subchannel, new TokenAttachingTracerFactory(null))); assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel)); + assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); // Finally, an LB response, which brings us out of fallback