diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java
index 576c339093..c1fff6908d 100644
--- a/api/src/main/java/io/grpc/LoadBalancer.java
+++ b/api/src/main/java/io/grpc/LoadBalancer.java
@@ -1211,7 +1211,8 @@ public abstract class LoadBalancer {
/**
* Asks the Subchannel to create a connection (aka transport), if there isn't an active one.
*
- *
It should be called from the Synchronization Context. See It should be called from the Synchronization Context. Currently will log a warning if
+ * violated. It will become an exception eventually. See #5015 for the background.
*
* @since 1.2.0
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 03aeb5e70d..9346a03dba 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -1572,6 +1572,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override
public void requestConnection() {
+ logWarningIfNotInSyncContext("Subchannel.requestConnection()");
checkState(started, "not started");
subchannel.obtainActiveTransport();
}
diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java
index 7bec4c173c..2056543824 100644
--- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java
+++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java
@@ -28,6 +28,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Status;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
@@ -137,9 +138,10 @@ final class PickFirstLoadBalancer extends LoadBalancer {
}
}
- /** Picker that requests connection during pick, and returns noResult. */
- private static final class RequestConnectionPicker extends SubchannelPicker {
+ /** Picker that requests connection during the first pick, and returns noResult. */
+ private final class RequestConnectionPicker extends SubchannelPicker {
private final Subchannel subchannel;
+ private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
RequestConnectionPicker(Subchannel subchannel) {
this.subchannel = checkNotNull(subchannel, "subchannel");
@@ -147,7 +149,14 @@ final class PickFirstLoadBalancer extends LoadBalancer {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
- subchannel.requestConnection();
+ if (connectionRequested.compareAndSet(false, true)) {
+ helper.getSynchronizationContext().execute(new Runnable() {
+ @Override
+ public void run() {
+ subchannel.requestConnection();
+ }
+ });
+ }
return PickResult.withNoResult();
}
}
diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
index 1bd823834d..4b08b91169 100644
--- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
+++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
@@ -25,8 +25,11 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -46,6 +49,7 @@ import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Status;
+import io.grpc.SynchronizationContext;
import java.net.SocketAddress;
import java.util.List;
import org.junit.After;
@@ -71,6 +75,13 @@ public class PickFirstLoadBalancerTest {
private List socketAddresses = Lists.newArrayList();
private static final Attributes.Key FOO = Attributes.Key.create("foo");
+ private final SynchronizationContext syncContext = new SynchronizationContext(
+ new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ throw new AssertionError(e);
+ }
+ });
private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build();
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
@@ -96,6 +107,7 @@ public class PickFirstLoadBalancerTest {
}
when(mockSubchannel.getAllAddresses()).thenThrow(new UnsupportedOperationException());
+ when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(mockSubchannel);
loadBalancer = new PickFirstLoadBalancer(mockHelper);
@@ -120,12 +132,38 @@ public class PickFirstLoadBalancerTest {
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();
+ // Calling pickSubchannel() twice gave the same result
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs));
verifyNoMoreInteractions(mockHelper);
}
+ @Test
+ public void requestConnectionPicker() throws Exception {
+ loadBalancer.handleResolvedAddresses(
+ ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
+
+ InOrder inOrder = inOrder(mockHelper, mockSubchannel);
+ inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
+ SubchannelStateListener stateListener = stateListenerCaptor.getValue();
+ inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
+ inOrder.verify(mockSubchannel).requestConnection();
+
+ stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
+ inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
+
+ SubchannelPicker picker = pickerCaptor.getValue();
+
+ // Calling pickSubchannel() twice gave the same result
+ assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs));
+
+ // But the picker calls requestConnection() only once
+ inOrder.verify(mockSubchannel).requestConnection();
+
+ verify(mockSubchannel, times(2)).requestConnection();
+ }
+
@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses(
@@ -186,6 +224,8 @@ public class PickFirstLoadBalancerTest {
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel();
+ reset(mockHelper);
+ when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
@@ -200,6 +240,7 @@ public class PickFirstLoadBalancerTest {
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
+ verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care
verifyNoMoreInteractions(mockHelper);
}
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index 3ea7174a88..a383d4643c 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -71,6 +71,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@@ -741,7 +742,7 @@ final class GrpclbState {
break;
default:
pickList = Collections.singletonList(
- new IdleSubchannelEntry(onlyEntry.subchannel));
+ new IdleSubchannelEntry(onlyEntry.subchannel, syncContext));
}
}
break;
@@ -924,15 +925,25 @@ final class GrpclbState {
@VisibleForTesting
static final class IdleSubchannelEntry implements RoundRobinEntry {
+ private final SynchronizationContext syncContext;
private final Subchannel subchannel;
+ private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
- IdleSubchannelEntry(Subchannel subchannel) {
+ IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) {
this.subchannel = checkNotNull(subchannel, "subchannel");
+ this.syncContext = checkNotNull(syncContext, "syncContext");
}
@Override
public PickResult picked(Metadata headers) {
- subchannel.requestConnection();
+ if (connectionRequested.compareAndSet(false, true)) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ subchannel.requestConnection();
+ }
+ });
+ }
return PickResult.withNoResult();
}
@@ -944,7 +955,7 @@ final class GrpclbState {
@Override
public int hashCode() {
- return Objects.hashCode(subchannel);
+ return Objects.hashCode(subchannel, syncContext);
}
@Override
@@ -953,7 +964,8 @@ final class GrpclbState {
return false;
}
IdleSubchannelEntry that = (IdleSubchannelEntry) other;
- return Objects.equal(subchannel, that.subchannel);
+ return Objects.equal(subchannel, that.subchannel)
+ && Objects.equal(syncContext, that.syncContext);
}
}
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index a6ace74431..8d64e4df8b 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -420,7 +420,7 @@ public class GrpclbLoadBalancerTest {
@Test
public void roundRobinPickerWithIdleEntry_noDrop() {
Subchannel subchannel = mock(Subchannel.class);
- IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel);
+ IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext);
RoundRobinPicker picker =
new RoundRobinPicker(Collections.emptyList(), Collections.singletonList(entry));
@@ -429,6 +429,9 @@ public class GrpclbLoadBalancerTest {
verify(subchannel, never()).requestConnection();
assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
verify(subchannel).requestConnection();
+ assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
+ // Only the first pick triggers requestConnection()
+ verify(subchannel).requestConnection();
}
@Test
@@ -440,7 +443,7 @@ public class GrpclbLoadBalancerTest {
List dropList = Arrays.asList(null, d);
Subchannel subchannel = mock(Subchannel.class);
- IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel);
+ IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext);
RoundRobinPicker picker = new RoundRobinPicker(dropList, Collections.singletonList(entry));
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
@@ -453,7 +456,8 @@ public class GrpclbLoadBalancerTest {
verify(subchannel).requestConnection();
assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
- verify(subchannel, times(2)).requestConnection();
+ // Only the first pick triggers requestConnection()
+ verify(subchannel).requestConnection();
}
@Test
@@ -1729,7 +1733,7 @@ public class GrpclbLoadBalancerTest {
assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll();
assertThat(picker0.dropList).containsExactly(null, null);
- assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel));
+ assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
// PICK_FIRST doesn't eagerly connect
verify(subchannel, never()).requestConnection();
@@ -1854,7 +1858,7 @@ public class GrpclbLoadBalancerTest {
new BackendEntry(subchannel, new TokenAttachingTracerFactory(null)));
assertThat(picker0.dropList).containsExactly(null, null);
- assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel));
+ assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
// Finally, an LB response, which brings us out of fallback