api: add Subchannel.getInternalSubchannel(). (#5773)

Previously PickResult's Subchannel must be the actual implementation
returned from the Channel's Helper, and Channel would cast it to the
implementation class in order to use it.  This will be broken if
Subchannel is wrapped in the case of hierarchical LoadBalancers.

getInternalSubchannel() is the guaranteed path for the Channel to get
the InternalSubchannel implementation.  It is friendly for wrapping.

Background: #5676
This commit is contained in:
Kun Zhang 2019-05-22 13:52:28 -07:00 committed by GitHub
parent 36ae0ed165
commit f8fffeff12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 116 additions and 51 deletions

View File

@ -490,8 +490,11 @@ public abstract class LoadBalancer {
/**
* A decision to proceed the RPC on a Subchannel.
*
* <p>Only Subchannels returned by {@link Helper#createSubchannel Helper.createSubchannel()}
* will work. DO NOT try to use your own implementations of Subchannels, as they won't work.
* <p>The Subchannel should either be an original Subchannel returned by {@link
* Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferrably based on
* {@code ForwardingSubchannel}. At the very least its {@link Subchannel#getInternalSubchannel
* getInternalSubchannel()} must return the same object as the one returned by the original.
* Otherwise the Channel cannot use it for the RPC.
*
* <p>When the RPC tries to use the return Subchannel, which is briefly after this method
* returns, the state of the Subchannel will decide where the RPC would go:
@ -1297,6 +1300,21 @@ public abstract class LoadBalancer {
public ChannelLogger getChannelLogger() {
throw new UnsupportedOperationException();
}
/**
* (Internal use only) returns an object that represents the underlying subchannel that is used
* by the Channel for sending RPCs when this {@link Subchannel} is picked. This is an opaque
* object that is both provided and consumed by the Channel. Its type <strong>is not</strong>
* {@code Subchannel}.
*
* <p>Warning: this is INTERNAL API, is not supposed to be used by external users, and may
* change without notice. If you think you must use it, please file an issue and we can consider
* removing its "internal" status.
*/
@Internal
public Object getInternalSubchannel() {
throw new UnsupportedOperationException();
}
}
/**

View File

@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.grpc.InternalChannelz.ChannelStats;
import io.grpc.InternalInstrumented;
import io.grpc.LoadBalancer;
import javax.annotation.Nullable;
/**
* The base interface of the Subchannels returned by {@link
@ -28,16 +27,10 @@ import javax.annotation.Nullable;
*/
abstract class AbstractSubchannel extends LoadBalancer.Subchannel {
/**
* Same as {@link InternalSubchannel#obtainActiveTransport}.
*/
@Nullable
abstract ClientTransport obtainActiveTransport();
/**
* Returns the InternalSubchannel as an {@code Instrumented<T>} for the sole purpose of channelz
* unit tests.
*/
@VisibleForTesting
abstract InternalInstrumented<ChannelStats> getInternalSubchannel();
abstract InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel();
}

View File

@ -689,7 +689,7 @@ public final class GrpcUtil {
final ClientTransport transport;
Subchannel subchannel = result.getSubchannel();
if (subchannel != null) {
transport = ((AbstractSubchannel) subchannel).obtainActiveTransport();
transport = ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport();
} else {
transport = null;
}

View File

@ -65,7 +65,7 @@ import javax.annotation.concurrent.ThreadSafe;
* Transports for a single {@link SocketAddress}.
*/
@ThreadSafe
final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider {
private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName());
private final InternalLogId logId;
@ -192,13 +192,8 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
return channelLogger;
}
/**
* Returns a READY transport that will be used to create new streams.
*
* <p>Returns {@code null} if the state is not READY. Will try to connect if state is IDLE.
*/
@Nullable
ClientTransport obtainActiveTransport() {
@Override
public ClientTransport obtainActiveTransport() {
ClientTransport savedTransport = activeTransport;
if (savedTransport != null) {
return savedTransport;

View File

@ -1138,7 +1138,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
checkArgument(subchannel instanceof SubchannelImpl,
"subchannel must have been returned from createSubchannel");
logWarningIfNotInSyncContext("updateSubchannelAddresses()");
((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs);
((InternalSubchannel) subchannel.getInternalSubchannel()).updateAddresses(addrs);
}
@Override
@ -1499,13 +1499,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
@Override
ClientTransport obtainActiveTransport() {
checkState(started, "Subchannel is not started");
return subchannel.obtainActiveTransport();
}
@Override
InternalInstrumented<ChannelStats> getInternalSubchannel() {
InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
checkState(started, "not started");
return subchannel;
}
@ -1602,6 +1596,12 @@ final class ManagedChannelImpl extends ManagedChannel implements
callTracerFactory.create());
}
@Override
public Object getInternalSubchannel() {
checkState(started, "Subchannel is not started");
return subchannel;
}
@Override
public ChannelLogger getChannelLogger() {
return subchannel.getChannelLogger();

View File

@ -148,12 +148,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented<Ch
}
@Override
ClientTransport obtainActiveTransport() {
return subchannel.obtainActiveTransport();
}
@Override
InternalInstrumented<ChannelStats> getInternalSubchannel() {
InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
return subchannel;
}
@ -171,6 +166,11 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented<Ch
public Attributes getAttributes() {
return Attributes.EMPTY;
}
@Override
public Object getInternalSubchannel() {
return subchannel;
}
};
subchannelPicker = new SubchannelPicker() {

View File

@ -0,0 +1,34 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
* Provides transports for sending RPCs.
*/
@ThreadSafe
interface TransportProvider {
/**
* Returns a READY transport that will be used to create new streams.
*
* <p>Returns {@code null} if the state is not READY. Will try to connect if state is IDLE.
*/
@Nullable
ClientTransport obtainActiveTransport();
}

View File

@ -69,6 +69,11 @@ public abstract class ForwardingSubchannel extends LoadBalancer.Subchannel {
return delegate().getChannelLogger();
}
@Override
public Object getInternalSubchannel() {
return delegate().getInternalSubchannel();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();

View File

@ -75,6 +75,7 @@ public class DelayedClientTransportTest {
@Mock private ManagedClientTransport.Listener transportListener;
@Mock private SubchannelPicker mockPicker;
@Mock private AbstractSubchannel mockSubchannel;
@Mock private TransportProvider mockInternalSubchannel;
@Mock private ClientTransport mockRealTransport;
@Mock private ClientTransport mockRealTransport2;
@Mock private ClientStream mockRealStream;
@ -119,7 +120,8 @@ public class DelayedClientTransportTest {
@Before public void setUp() {
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(mockSubchannel));
when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(mockSubchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel);
when(mockInternalSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(mockRealTransport.newStream(same(method), same(headers), same(callOptions)))
.thenReturn(mockRealStream);
when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2)))
@ -308,9 +310,9 @@ public class DelayedClientTransportTest {
any(CallOptions.class))).thenReturn(mockRealStream);
when(mockRealTransport2.newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class))).thenReturn(mockRealStream2);
when(subchannel1.obtainActiveTransport()).thenReturn(mockRealTransport);
when(subchannel2.obtainActiveTransport()).thenReturn(mockRealTransport2);
when(subchannel3.obtainActiveTransport()).thenReturn(null);
when(subchannel1.getInternalSubchannel()).thenReturn(newTransportProvider(mockRealTransport));
when(subchannel2.getInternalSubchannel()).thenReturn(newTransportProvider(mockRealTransport2));
when(subchannel3.getInternalSubchannel()).thenReturn(newTransportProvider(null));
// Fail-fast streams
DelayedStream ff1 = (DelayedStream) delayedTransport.newStream(
@ -465,7 +467,7 @@ public class DelayedClientTransportTest {
public void reprocess_NoPendingStream() {
SubchannelPicker picker = mock(SubchannelPicker.class);
AbstractSubchannel subchannel = mock(AbstractSubchannel.class);
when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel);
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel));
when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class),
@ -477,7 +479,7 @@ public class DelayedClientTransportTest {
// Though picker was not originally used, it will be saved and serve future streams.
ClientStream stream = delayedTransport.newStream(method, headers, CallOptions.DEFAULT);
verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT));
verify(subchannel).obtainActiveTransport();
verify(mockInternalSubchannel).obtainActiveTransport();
assertSame(mockRealStream, stream);
}
@ -599,4 +601,13 @@ public class DelayedClientTransportTest {
assertTrue(delayedTransport.hasPendingStreams());
verify(transportListener).transportInUse(true);
}
private static TransportProvider newTransportProvider(final ClientTransport transport) {
return new TransportProvider() {
@Override
public ClientTransport obtainActiveTransport() {
return transport;
}
};
}
}

View File

@ -475,9 +475,12 @@ public class ManagedChannelImplTest {
(AbstractSubchannel) createSubchannelSafely(
helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
// subchannels are not root channels
assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId()));
assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
assertThat(getStats(channel).subchannels).containsExactly(subchannel.getInternalSubchannel());
assertNull(
channelz.getRootChannel(subchannel.getInstrumentedInternalSubchannel().getLogId().getId()));
assertTrue(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
assertThat(getStats(channel).subchannels)
.containsExactly(subchannel.getInstrumentedInternalSubchannel());
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
@ -489,11 +492,13 @@ public class ManagedChannelImplTest {
assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
// terminate subchannel
assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
assertTrue(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
shutdownSafely(helper, subchannel);
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
timer.runDueTasks();
assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
assertFalse(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
assertThat(getStats(channel).subchannels).isEmpty();
// channel still appears
@ -511,9 +516,12 @@ public class ManagedChannelImplTest {
assertTrue(channelz.containsSubchannel(oob.getLogId()));
AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel();
assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
assertThat(getStats(oob).subchannels).containsExactly(subchannel.getInternalSubchannel());
assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
assertTrue(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
assertThat(getStats(oob).subchannels)
.containsExactly(subchannel.getInstrumentedInternalSubchannel());
assertTrue(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
oob.getSubchannel().requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
@ -528,7 +536,8 @@ public class ManagedChannelImplTest {
oob.shutdown();
assertFalse(channelz.containsSubchannel(oob.getLogId()));
assertThat(getStats(channel).subchannels).isEmpty();
assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
assertFalse(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
// channel still appears
assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
@ -2571,7 +2580,7 @@ public class ManagedChannelImplTest {
.setDescription("Child Subchannel started")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.setSubchannelRef(subchannel.getInternalSubchannel())
.setSubchannelRef(subchannel.getInstrumentedInternalSubchannel())
.build());
assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("Subchannel for [[[test-addr]/{}]] created")
@ -2742,7 +2751,7 @@ public class ManagedChannelImplTest {
(AbstractSubchannel) createSubchannelSafely(
helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
timer.forwardNanos(1234);
subchannel.obtainActiveTransport();
((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport();
assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("CONNECTING as requested")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
@ -3958,7 +3967,7 @@ public class ManagedChannelImplTest {
}
private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception {
return subchannel.getInternalSubchannel().getStats().get();
return subchannel.getInstrumentedInternalSubchannel().getStats().get();
}
private static ChannelStats getStats(