diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index c1fff6908d..4e13d24099 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -490,8 +490,11 @@ public abstract class LoadBalancer { /** * A decision to proceed the RPC on a Subchannel. * - *

Only Subchannels returned by {@link Helper#createSubchannel Helper.createSubchannel()} - * will work. DO NOT try to use your own implementations of Subchannels, as they won't work. + *

The Subchannel should either be an original Subchannel returned by {@link + * Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferrably based on + * {@code ForwardingSubchannel}. At the very least its {@link Subchannel#getInternalSubchannel + * getInternalSubchannel()} must return the same object as the one returned by the original. + * Otherwise the Channel cannot use it for the RPC. * *

When the RPC tries to use the return Subchannel, which is briefly after this method * returns, the state of the Subchannel will decide where the RPC would go: @@ -1297,6 +1300,21 @@ public abstract class LoadBalancer { public ChannelLogger getChannelLogger() { throw new UnsupportedOperationException(); } + + /** + * (Internal use only) returns an object that represents the underlying subchannel that is used + * by the Channel for sending RPCs when this {@link Subchannel} is picked. This is an opaque + * object that is both provided and consumed by the Channel. Its type is not + * {@code Subchannel}. + * + *

Warning: this is INTERNAL API, is not supposed to be used by external users, and may + * change without notice. If you think you must use it, please file an issue and we can consider + * removing its "internal" status. + */ + @Internal + public Object getInternalSubchannel() { + throw new UnsupportedOperationException(); + } } /** diff --git a/core/src/main/java/io/grpc/internal/AbstractSubchannel.java b/core/src/main/java/io/grpc/internal/AbstractSubchannel.java index 2d36e7a2d8..cc4b3d9fb2 100644 --- a/core/src/main/java/io/grpc/internal/AbstractSubchannel.java +++ b/core/src/main/java/io/grpc/internal/AbstractSubchannel.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import io.grpc.InternalChannelz.ChannelStats; import io.grpc.InternalInstrumented; import io.grpc.LoadBalancer; -import javax.annotation.Nullable; /** * The base interface of the Subchannels returned by {@link @@ -28,16 +27,10 @@ import javax.annotation.Nullable; */ abstract class AbstractSubchannel extends LoadBalancer.Subchannel { - /** - * Same as {@link InternalSubchannel#obtainActiveTransport}. - */ - @Nullable - abstract ClientTransport obtainActiveTransport(); - /** * Returns the InternalSubchannel as an {@code Instrumented} for the sole purpose of channelz * unit tests. */ @VisibleForTesting - abstract InternalInstrumented getInternalSubchannel(); + abstract InternalInstrumented getInstrumentedInternalSubchannel(); } diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 36a6245dda..47cba3af5c 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -689,7 +689,7 @@ public final class GrpcUtil { final ClientTransport transport; Subchannel subchannel = result.getSubchannel(); if (subchannel != null) { - transport = ((AbstractSubchannel) subchannel).obtainActiveTransport(); + transport = ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport(); } else { transport = null; } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 2d60d15791..9c85aee70d 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -65,7 +65,7 @@ import javax.annotation.concurrent.ThreadSafe; * Transports for a single {@link SocketAddress}. */ @ThreadSafe -final class InternalSubchannel implements InternalInstrumented { +final class InternalSubchannel implements InternalInstrumented, TransportProvider { private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName()); private final InternalLogId logId; @@ -192,13 +192,8 @@ final class InternalSubchannel implements InternalInstrumented { return channelLogger; } - /** - * Returns a READY transport that will be used to create new streams. - * - *

Returns {@code null} if the state is not READY. Will try to connect if state is IDLE. - */ - @Nullable - ClientTransport obtainActiveTransport() { + @Override + public ClientTransport obtainActiveTransport() { ClientTransport savedTransport = activeTransport; if (savedTransport != null) { return savedTransport; diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9346a03dba..b75b9e305d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1138,7 +1138,7 @@ final class ManagedChannelImpl extends ManagedChannel implements checkArgument(subchannel instanceof SubchannelImpl, "subchannel must have been returned from createSubchannel"); logWarningIfNotInSyncContext("updateSubchannelAddresses()"); - ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs); + ((InternalSubchannel) subchannel.getInternalSubchannel()).updateAddresses(addrs); } @Override @@ -1499,13 +1499,7 @@ final class ManagedChannelImpl extends ManagedChannel implements } @Override - ClientTransport obtainActiveTransport() { - checkState(started, "Subchannel is not started"); - return subchannel.obtainActiveTransport(); - } - - @Override - InternalInstrumented getInternalSubchannel() { + InternalInstrumented getInstrumentedInternalSubchannel() { checkState(started, "not started"); return subchannel; } @@ -1602,6 +1596,12 @@ final class ManagedChannelImpl extends ManagedChannel implements callTracerFactory.create()); } + @Override + public Object getInternalSubchannel() { + checkState(started, "Subchannel is not started"); + return subchannel; + } + @Override public ChannelLogger getChannelLogger() { return subchannel.getChannelLogger(); diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index d1e3236126..512abaeed1 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -148,12 +148,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented getInternalSubchannel() { + InternalInstrumented getInstrumentedInternalSubchannel() { return subchannel; } @@ -171,6 +166,11 @@ final class OobChannel extends ManagedChannel implements InternalInstrumentedReturns {@code null} if the state is not READY. Will try to connect if state is IDLE. + */ + @Nullable + ClientTransport obtainActiveTransport(); +} diff --git a/core/src/main/java/io/grpc/util/ForwardingSubchannel.java b/core/src/main/java/io/grpc/util/ForwardingSubchannel.java index 5204032519..de9a511232 100644 --- a/core/src/main/java/io/grpc/util/ForwardingSubchannel.java +++ b/core/src/main/java/io/grpc/util/ForwardingSubchannel.java @@ -69,6 +69,11 @@ public abstract class ForwardingSubchannel extends LoadBalancer.Subchannel { return delegate().getChannelLogger(); } + @Override + public Object getInternalSubchannel() { + return delegate().getInternalSubchannel(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 57a38ebb82..0f51d3f62d 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -75,6 +75,7 @@ public class DelayedClientTransportTest { @Mock private ManagedClientTransport.Listener transportListener; @Mock private SubchannelPicker mockPicker; @Mock private AbstractSubchannel mockSubchannel; + @Mock private TransportProvider mockInternalSubchannel; @Mock private ClientTransport mockRealTransport; @Mock private ClientTransport mockRealTransport2; @Mock private ClientStream mockRealStream; @@ -119,7 +120,8 @@ public class DelayedClientTransportTest { @Before public void setUp() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(mockSubchannel)); - when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport); + when(mockSubchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel); + when(mockInternalSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport); when(mockRealTransport.newStream(same(method), same(headers), same(callOptions))) .thenReturn(mockRealStream); when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2))) @@ -308,9 +310,9 @@ public class DelayedClientTransportTest { any(CallOptions.class))).thenReturn(mockRealStream); when(mockRealTransport2.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))).thenReturn(mockRealStream2); - when(subchannel1.obtainActiveTransport()).thenReturn(mockRealTransport); - when(subchannel2.obtainActiveTransport()).thenReturn(mockRealTransport2); - when(subchannel3.obtainActiveTransport()).thenReturn(null); + when(subchannel1.getInternalSubchannel()).thenReturn(newTransportProvider(mockRealTransport)); + when(subchannel2.getInternalSubchannel()).thenReturn(newTransportProvider(mockRealTransport2)); + when(subchannel3.getInternalSubchannel()).thenReturn(newTransportProvider(null)); // Fail-fast streams DelayedStream ff1 = (DelayedStream) delayedTransport.newStream( @@ -465,7 +467,7 @@ public class DelayedClientTransportTest { public void reprocess_NoPendingStream() { SubchannelPicker picker = mock(SubchannelPicker.class); AbstractSubchannel subchannel = mock(AbstractSubchannel.class); - when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport); + when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel); when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel)); when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), @@ -477,7 +479,7 @@ public class DelayedClientTransportTest { // Though picker was not originally used, it will be saved and serve future streams. ClientStream stream = delayedTransport.newStream(method, headers, CallOptions.DEFAULT); verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)); - verify(subchannel).obtainActiveTransport(); + verify(mockInternalSubchannel).obtainActiveTransport(); assertSame(mockRealStream, stream); } @@ -599,4 +601,13 @@ public class DelayedClientTransportTest { assertTrue(delayedTransport.hasPendingStreams()); verify(transportListener).transportInUse(true); } + + private static TransportProvider newTransportProvider(final ClientTransport transport) { + return new TransportProvider() { + @Override + public ClientTransport obtainActiveTransport() { + return transport; + } + }; + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index df61272afa..2af7397207 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -475,9 +475,12 @@ public class ManagedChannelImplTest { (AbstractSubchannel) createSubchannelSafely( helper, addressGroup, Attributes.EMPTY, subchannelStateListener); // subchannels are not root channels - assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId())); - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); - assertThat(getStats(channel).subchannels).containsExactly(subchannel.getInternalSubchannel()); + assertNull( + channelz.getRootChannel(subchannel.getInstrumentedInternalSubchannel().getLogId().getId())); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); + assertThat(getStats(channel).subchannels) + .containsExactly(subchannel.getInstrumentedInternalSubchannel()); requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); @@ -489,11 +492,13 @@ public class ManagedChannelImplTest { assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); // terminate subchannel - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); shutdownSafely(helper, subchannel); timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); timer.runDueTasks(); - assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertFalse( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); assertThat(getStats(channel).subchannels).isEmpty(); // channel still appears @@ -511,9 +516,12 @@ public class ManagedChannelImplTest { assertTrue(channelz.containsSubchannel(oob.getLogId())); AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel(); - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); - assertThat(getStats(oob).subchannels).containsExactly(subchannel.getInternalSubchannel()); - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); + assertThat(getStats(oob).subchannels) + .containsExactly(subchannel.getInstrumentedInternalSubchannel()); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); oob.getSubchannel().requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); @@ -528,7 +536,8 @@ public class ManagedChannelImplTest { oob.shutdown(); assertFalse(channelz.containsSubchannel(oob.getLogId())); assertThat(getStats(channel).subchannels).isEmpty(); - assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertFalse( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); // channel still appears assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); @@ -2571,7 +2580,7 @@ public class ManagedChannelImplTest { .setDescription("Child Subchannel started") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) .setTimestampNanos(timer.getTicker().read()) - .setSubchannelRef(subchannel.getInternalSubchannel()) + .setSubchannelRef(subchannel.getInstrumentedInternalSubchannel()) .build()); assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() .setDescription("Subchannel for [[[test-addr]/{}]] created") @@ -2742,7 +2751,7 @@ public class ManagedChannelImplTest { (AbstractSubchannel) createSubchannelSafely( helper, addressGroup, Attributes.EMPTY, subchannelStateListener); timer.forwardNanos(1234); - subchannel.obtainActiveTransport(); + ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport(); assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() .setDescription("CONNECTING as requested") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) @@ -3958,7 +3967,7 @@ public class ManagedChannelImplTest { } private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception { - return subchannel.getInternalSubchannel().getStats().get(); + return subchannel.getInstrumentedInternalSubchannel().getStats().get(); } private static ChannelStats getStats(