api/core/grpclb: add warning about calling Subchannel.requestConnection() outside of sync-context (#5757)

The pick_first policies in core and grpclb previously would call
Subchannel.requestConnection() from data-path.  They now will schedule
that call in the sync-context to avoid the warning.  They will only
call it for the first pick of each picker, to prevent storming the
sync-context.
This commit is contained in:
Kun Zhang 2019-05-20 11:39:02 -07:00 committed by GitHub
parent 7934594dfe
commit a2595d9e7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 14 deletions

View File

@ -1211,7 +1211,8 @@ public abstract class LoadBalancer {
/** /**
* Asks the Subchannel to create a connection (aka transport), if there isn't an active one. * Asks the Subchannel to create a connection (aka transport), if there isn't an active one.
* *
* <p>It should be called from the Synchronization Context. See <a * <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background. * href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
* *
* @since 1.2.0 * @since 1.2.0

View File

@ -1572,6 +1572,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override @Override
public void requestConnection() { public void requestConnection() {
logWarningIfNotInSyncContext("Subchannel.requestConnection()");
checkState(started, "not started"); checkState(started, "not started");
subchannel.obtainActiveTransport(); subchannel.obtainActiveTransport();
} }

View File

@ -28,6 +28,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Status; import io.grpc.Status;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
@ -137,9 +138,10 @@ final class PickFirstLoadBalancer extends LoadBalancer {
} }
} }
/** Picker that requests connection during pick, and returns noResult. */ /** Picker that requests connection during the first pick, and returns noResult. */
private static final class RequestConnectionPicker extends SubchannelPicker { private final class RequestConnectionPicker extends SubchannelPicker {
private final Subchannel subchannel; private final Subchannel subchannel;
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
RequestConnectionPicker(Subchannel subchannel) { RequestConnectionPicker(Subchannel subchannel) {
this.subchannel = checkNotNull(subchannel, "subchannel"); this.subchannel = checkNotNull(subchannel, "subchannel");
@ -147,7 +149,14 @@ final class PickFirstLoadBalancer extends LoadBalancer {
@Override @Override
public PickResult pickSubchannel(PickSubchannelArgs args) { public PickResult pickSubchannel(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
subchannel.requestConnection(); subchannel.requestConnection();
}
});
}
return PickResult.withNoResult(); return PickResult.withNoResult();
} }
} }

View File

@ -25,8 +25,11 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -46,6 +49,7 @@ import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.List; import java.util.List;
import org.junit.After; import org.junit.After;
@ -71,6 +75,13 @@ public class PickFirstLoadBalancerTest {
private List<SocketAddress> socketAddresses = Lists.newArrayList(); private List<SocketAddress> socketAddresses = Lists.newArrayList();
private static final Attributes.Key<String> FOO = Attributes.Key.create("foo"); private static final Attributes.Key<String> FOO = Attributes.Key.create("foo");
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build(); private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build();
@Rule @Rule
public final MockitoRule mocks = MockitoJUnit.rule(); public final MockitoRule mocks = MockitoJUnit.rule();
@ -96,6 +107,7 @@ public class PickFirstLoadBalancerTest {
} }
when(mockSubchannel.getAllAddresses()).thenThrow(new UnsupportedOperationException()); when(mockSubchannel.getAllAddresses()).thenThrow(new UnsupportedOperationException());
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(mockSubchannel); when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(mockSubchannel);
loadBalancer = new PickFirstLoadBalancer(mockHelper); loadBalancer = new PickFirstLoadBalancer(mockHelper);
@ -120,12 +132,38 @@ public class PickFirstLoadBalancerTest {
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection(); verify(mockSubchannel).requestConnection();
// Calling pickSubchannel() twice gave the same result
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs)); pickerCaptor.getValue().pickSubchannel(mockArgs));
verifyNoMoreInteractions(mockHelper); verifyNoMoreInteractions(mockHelper);
} }
@Test
public void requestConnectionPicker() throws Exception {
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
InOrder inOrder = inOrder(mockHelper, mockSubchannel);
inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
inOrder.verify(mockSubchannel).requestConnection();
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
// Calling pickSubchannel() twice gave the same result
assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs));
// But the picker calls requestConnection() only once
inOrder.verify(mockSubchannel).requestConnection();
verify(mockSubchannel, times(2)).requestConnection();
}
@Test @Test
public void pickAfterResolvedAndUnchanged() throws Exception { public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses( loadBalancer.handleResolvedAddresses(
@ -186,6 +224,8 @@ public class PickFirstLoadBalancerTest {
SubchannelStateListener stateListener = stateListenerCaptor.getValue(); SubchannelStateListener stateListener = stateListenerCaptor.getValue();
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel(); Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel();
reset(mockHelper);
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
Status error = Status.UNAVAILABLE.withDescription("boom!"); Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
@ -200,6 +240,7 @@ public class PickFirstLoadBalancerTest {
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care
verifyNoMoreInteractions(mockHelper); verifyNoMoreInteractions(mockHelper);
} }

View File

@ -71,6 +71,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
@ -741,7 +742,7 @@ final class GrpclbState {
break; break;
default: default:
pickList = Collections.<RoundRobinEntry>singletonList( pickList = Collections.<RoundRobinEntry>singletonList(
new IdleSubchannelEntry(onlyEntry.subchannel)); new IdleSubchannelEntry(onlyEntry.subchannel, syncContext));
} }
} }
break; break;
@ -924,15 +925,25 @@ final class GrpclbState {
@VisibleForTesting @VisibleForTesting
static final class IdleSubchannelEntry implements RoundRobinEntry { static final class IdleSubchannelEntry implements RoundRobinEntry {
private final SynchronizationContext syncContext;
private final Subchannel subchannel; private final Subchannel subchannel;
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
IdleSubchannelEntry(Subchannel subchannel) { IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) {
this.subchannel = checkNotNull(subchannel, "subchannel"); this.subchannel = checkNotNull(subchannel, "subchannel");
this.syncContext = checkNotNull(syncContext, "syncContext");
} }
@Override @Override
public PickResult picked(Metadata headers) { public PickResult picked(Metadata headers) {
if (connectionRequested.compareAndSet(false, true)) {
syncContext.execute(new Runnable() {
@Override
public void run() {
subchannel.requestConnection(); subchannel.requestConnection();
}
});
}
return PickResult.withNoResult(); return PickResult.withNoResult();
} }
@ -944,7 +955,7 @@ final class GrpclbState {
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hashCode(subchannel); return Objects.hashCode(subchannel, syncContext);
} }
@Override @Override
@ -953,7 +964,8 @@ final class GrpclbState {
return false; return false;
} }
IdleSubchannelEntry that = (IdleSubchannelEntry) other; IdleSubchannelEntry that = (IdleSubchannelEntry) other;
return Objects.equal(subchannel, that.subchannel); return Objects.equal(subchannel, that.subchannel)
&& Objects.equal(syncContext, that.syncContext);
} }
} }

View File

@ -420,7 +420,7 @@ public class GrpclbLoadBalancerTest {
@Test @Test
public void roundRobinPickerWithIdleEntry_noDrop() { public void roundRobinPickerWithIdleEntry_noDrop() {
Subchannel subchannel = mock(Subchannel.class); Subchannel subchannel = mock(Subchannel.class);
IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel); IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext);
RoundRobinPicker picker = RoundRobinPicker picker =
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Collections.singletonList(entry)); new RoundRobinPicker(Collections.<DropEntry>emptyList(), Collections.singletonList(entry));
@ -429,6 +429,9 @@ public class GrpclbLoadBalancerTest {
verify(subchannel, never()).requestConnection(); verify(subchannel, never()).requestConnection();
assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
verify(subchannel).requestConnection(); verify(subchannel).requestConnection();
assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
// Only the first pick triggers requestConnection()
verify(subchannel).requestConnection();
} }
@Test @Test
@ -440,7 +443,7 @@ public class GrpclbLoadBalancerTest {
List<DropEntry> dropList = Arrays.asList(null, d); List<DropEntry> dropList = Arrays.asList(null, d);
Subchannel subchannel = mock(Subchannel.class); Subchannel subchannel = mock(Subchannel.class);
IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel); IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext);
RoundRobinPicker picker = new RoundRobinPicker(dropList, Collections.singletonList(entry)); RoundRobinPicker picker = new RoundRobinPicker(dropList, Collections.singletonList(entry));
PickSubchannelArgs args = mock(PickSubchannelArgs.class); PickSubchannelArgs args = mock(PickSubchannelArgs.class);
@ -453,7 +456,8 @@ public class GrpclbLoadBalancerTest {
verify(subchannel).requestConnection(); verify(subchannel).requestConnection();
assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
verify(subchannel, times(2)).requestConnection(); // Only the first pick triggers requestConnection()
verify(subchannel).requestConnection();
} }
@Test @Test
@ -1729,7 +1733,7 @@ public class GrpclbLoadBalancerTest {
assertThat(mockSubchannels).hasSize(1); assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll(); Subchannel subchannel = mockSubchannels.poll();
assertThat(picker0.dropList).containsExactly(null, null); assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel)); assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
// PICK_FIRST doesn't eagerly connect // PICK_FIRST doesn't eagerly connect
verify(subchannel, never()).requestConnection(); verify(subchannel, never()).requestConnection();
@ -1854,7 +1858,7 @@ public class GrpclbLoadBalancerTest {
new BackendEntry(subchannel, new TokenAttachingTracerFactory(null))); new BackendEntry(subchannel, new TokenAttachingTracerFactory(null)));
assertThat(picker0.dropList).containsExactly(null, null); assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel)); assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
// Finally, an LB response, which brings us out of fallback // Finally, an LB response, which brings us out of fallback