From f8fffeff120eca4d445d31000289ad0db83c99f3 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 22 May 2019 13:52:28 -0700 Subject: [PATCH] api: add Subchannel.getInternalSubchannel(). (#5773) Previously PickResult's Subchannel must be the actual implementation returned from the Channel's Helper, and Channel would cast it to the implementation class in order to use it. This will be broken if Subchannel is wrapped in the case of hierarchical LoadBalancers. getInternalSubchannel() is the guaranteed path for the Channel to get the InternalSubchannel implementation. It is friendly for wrapping. Background: #5676 --- api/src/main/java/io/grpc/LoadBalancer.java | 22 ++++++++++-- .../io/grpc/internal/AbstractSubchannel.java | 9 +---- .../main/java/io/grpc/internal/GrpcUtil.java | 2 +- .../io/grpc/internal/InternalSubchannel.java | 11 ++---- .../io/grpc/internal/ManagedChannelImpl.java | 16 ++++----- .../java/io/grpc/internal/OobChannel.java | 12 +++---- .../io/grpc/internal/TransportProvider.java | 34 +++++++++++++++++++ .../io/grpc/util/ForwardingSubchannel.java | 5 +++ .../internal/DelayedClientTransportTest.java | 23 +++++++++---- .../grpc/internal/ManagedChannelImplTest.java | 33 +++++++++++------- 10 files changed, 116 insertions(+), 51 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/TransportProvider.java diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index c1fff6908d..4e13d24099 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -490,8 +490,11 @@ public abstract class LoadBalancer { /** * A decision to proceed the RPC on a Subchannel. * - *

Only Subchannels returned by {@link Helper#createSubchannel Helper.createSubchannel()} - * will work. DO NOT try to use your own implementations of Subchannels, as they won't work. + *

The Subchannel should either be an original Subchannel returned by {@link + * Helper#createSubchannel Helper.createSubchannel()}, or a wrapper of it preferrably based on + * {@code ForwardingSubchannel}. At the very least its {@link Subchannel#getInternalSubchannel + * getInternalSubchannel()} must return the same object as the one returned by the original. + * Otherwise the Channel cannot use it for the RPC. * *

When the RPC tries to use the return Subchannel, which is briefly after this method * returns, the state of the Subchannel will decide where the RPC would go: @@ -1297,6 +1300,21 @@ public abstract class LoadBalancer { public ChannelLogger getChannelLogger() { throw new UnsupportedOperationException(); } + + /** + * (Internal use only) returns an object that represents the underlying subchannel that is used + * by the Channel for sending RPCs when this {@link Subchannel} is picked. This is an opaque + * object that is both provided and consumed by the Channel. Its type is not + * {@code Subchannel}. + * + *

Warning: this is INTERNAL API, is not supposed to be used by external users, and may + * change without notice. If you think you must use it, please file an issue and we can consider + * removing its "internal" status. + */ + @Internal + public Object getInternalSubchannel() { + throw new UnsupportedOperationException(); + } } /** diff --git a/core/src/main/java/io/grpc/internal/AbstractSubchannel.java b/core/src/main/java/io/grpc/internal/AbstractSubchannel.java index 2d36e7a2d8..cc4b3d9fb2 100644 --- a/core/src/main/java/io/grpc/internal/AbstractSubchannel.java +++ b/core/src/main/java/io/grpc/internal/AbstractSubchannel.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import io.grpc.InternalChannelz.ChannelStats; import io.grpc.InternalInstrumented; import io.grpc.LoadBalancer; -import javax.annotation.Nullable; /** * The base interface of the Subchannels returned by {@link @@ -28,16 +27,10 @@ import javax.annotation.Nullable; */ abstract class AbstractSubchannel extends LoadBalancer.Subchannel { - /** - * Same as {@link InternalSubchannel#obtainActiveTransport}. - */ - @Nullable - abstract ClientTransport obtainActiveTransport(); - /** * Returns the InternalSubchannel as an {@code Instrumented} for the sole purpose of channelz * unit tests. */ @VisibleForTesting - abstract InternalInstrumented getInternalSubchannel(); + abstract InternalInstrumented getInstrumentedInternalSubchannel(); } diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 36a6245dda..47cba3af5c 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -689,7 +689,7 @@ public final class GrpcUtil { final ClientTransport transport; Subchannel subchannel = result.getSubchannel(); if (subchannel != null) { - transport = ((AbstractSubchannel) subchannel).obtainActiveTransport(); + transport = ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport(); } else { transport = null; } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 2d60d15791..9c85aee70d 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -65,7 +65,7 @@ import javax.annotation.concurrent.ThreadSafe; * Transports for a single {@link SocketAddress}. */ @ThreadSafe -final class InternalSubchannel implements InternalInstrumented { +final class InternalSubchannel implements InternalInstrumented, TransportProvider { private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName()); private final InternalLogId logId; @@ -192,13 +192,8 @@ final class InternalSubchannel implements InternalInstrumented { return channelLogger; } - /** - * Returns a READY transport that will be used to create new streams. - * - *

Returns {@code null} if the state is not READY. Will try to connect if state is IDLE. - */ - @Nullable - ClientTransport obtainActiveTransport() { + @Override + public ClientTransport obtainActiveTransport() { ClientTransport savedTransport = activeTransport; if (savedTransport != null) { return savedTransport; diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9346a03dba..b75b9e305d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1138,7 +1138,7 @@ final class ManagedChannelImpl extends ManagedChannel implements checkArgument(subchannel instanceof SubchannelImpl, "subchannel must have been returned from createSubchannel"); logWarningIfNotInSyncContext("updateSubchannelAddresses()"); - ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs); + ((InternalSubchannel) subchannel.getInternalSubchannel()).updateAddresses(addrs); } @Override @@ -1499,13 +1499,7 @@ final class ManagedChannelImpl extends ManagedChannel implements } @Override - ClientTransport obtainActiveTransport() { - checkState(started, "Subchannel is not started"); - return subchannel.obtainActiveTransport(); - } - - @Override - InternalInstrumented getInternalSubchannel() { + InternalInstrumented getInstrumentedInternalSubchannel() { checkState(started, "not started"); return subchannel; } @@ -1602,6 +1596,12 @@ final class ManagedChannelImpl extends ManagedChannel implements callTracerFactory.create()); } + @Override + public Object getInternalSubchannel() { + checkState(started, "Subchannel is not started"); + return subchannel; + } + @Override public ChannelLogger getChannelLogger() { return subchannel.getChannelLogger(); diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index d1e3236126..512abaeed1 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -148,12 +148,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented getInternalSubchannel() { + InternalInstrumented getInstrumentedInternalSubchannel() { return subchannel; } @@ -171,6 +166,11 @@ final class OobChannel extends ManagedChannel implements InternalInstrumentedReturns {@code null} if the state is not READY. Will try to connect if state is IDLE. + */ + @Nullable + ClientTransport obtainActiveTransport(); +} diff --git a/core/src/main/java/io/grpc/util/ForwardingSubchannel.java b/core/src/main/java/io/grpc/util/ForwardingSubchannel.java index 5204032519..de9a511232 100644 --- a/core/src/main/java/io/grpc/util/ForwardingSubchannel.java +++ b/core/src/main/java/io/grpc/util/ForwardingSubchannel.java @@ -69,6 +69,11 @@ public abstract class ForwardingSubchannel extends LoadBalancer.Subchannel { return delegate().getChannelLogger(); } + @Override + public Object getInternalSubchannel() { + return delegate().getInternalSubchannel(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 57a38ebb82..0f51d3f62d 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -75,6 +75,7 @@ public class DelayedClientTransportTest { @Mock private ManagedClientTransport.Listener transportListener; @Mock private SubchannelPicker mockPicker; @Mock private AbstractSubchannel mockSubchannel; + @Mock private TransportProvider mockInternalSubchannel; @Mock private ClientTransport mockRealTransport; @Mock private ClientTransport mockRealTransport2; @Mock private ClientStream mockRealStream; @@ -119,7 +120,8 @@ public class DelayedClientTransportTest { @Before public void setUp() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(mockSubchannel)); - when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport); + when(mockSubchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel); + when(mockInternalSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport); when(mockRealTransport.newStream(same(method), same(headers), same(callOptions))) .thenReturn(mockRealStream); when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2))) @@ -308,9 +310,9 @@ public class DelayedClientTransportTest { any(CallOptions.class))).thenReturn(mockRealStream); when(mockRealTransport2.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))).thenReturn(mockRealStream2); - when(subchannel1.obtainActiveTransport()).thenReturn(mockRealTransport); - when(subchannel2.obtainActiveTransport()).thenReturn(mockRealTransport2); - when(subchannel3.obtainActiveTransport()).thenReturn(null); + when(subchannel1.getInternalSubchannel()).thenReturn(newTransportProvider(mockRealTransport)); + when(subchannel2.getInternalSubchannel()).thenReturn(newTransportProvider(mockRealTransport2)); + when(subchannel3.getInternalSubchannel()).thenReturn(newTransportProvider(null)); // Fail-fast streams DelayedStream ff1 = (DelayedStream) delayedTransport.newStream( @@ -465,7 +467,7 @@ public class DelayedClientTransportTest { public void reprocess_NoPendingStream() { SubchannelPicker picker = mock(SubchannelPicker.class); AbstractSubchannel subchannel = mock(AbstractSubchannel.class); - when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport); + when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel); when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel)); when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), @@ -477,7 +479,7 @@ public class DelayedClientTransportTest { // Though picker was not originally used, it will be saved and serve future streams. ClientStream stream = delayedTransport.newStream(method, headers, CallOptions.DEFAULT); verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)); - verify(subchannel).obtainActiveTransport(); + verify(mockInternalSubchannel).obtainActiveTransport(); assertSame(mockRealStream, stream); } @@ -599,4 +601,13 @@ public class DelayedClientTransportTest { assertTrue(delayedTransport.hasPendingStreams()); verify(transportListener).transportInUse(true); } + + private static TransportProvider newTransportProvider(final ClientTransport transport) { + return new TransportProvider() { + @Override + public ClientTransport obtainActiveTransport() { + return transport; + } + }; + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index df61272afa..2af7397207 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -475,9 +475,12 @@ public class ManagedChannelImplTest { (AbstractSubchannel) createSubchannelSafely( helper, addressGroup, Attributes.EMPTY, subchannelStateListener); // subchannels are not root channels - assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId())); - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); - assertThat(getStats(channel).subchannels).containsExactly(subchannel.getInternalSubchannel()); + assertNull( + channelz.getRootChannel(subchannel.getInstrumentedInternalSubchannel().getLogId().getId())); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); + assertThat(getStats(channel).subchannels) + .containsExactly(subchannel.getInstrumentedInternalSubchannel()); requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); @@ -489,11 +492,13 @@ public class ManagedChannelImplTest { assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); // terminate subchannel - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); shutdownSafely(helper, subchannel); timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); timer.runDueTasks(); - assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertFalse( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); assertThat(getStats(channel).subchannels).isEmpty(); // channel still appears @@ -511,9 +516,12 @@ public class ManagedChannelImplTest { assertTrue(channelz.containsSubchannel(oob.getLogId())); AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel(); - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); - assertThat(getStats(oob).subchannels).containsExactly(subchannel.getInternalSubchannel()); - assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); + assertThat(getStats(oob).subchannels) + .containsExactly(subchannel.getInstrumentedInternalSubchannel()); + assertTrue( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); oob.getSubchannel().requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); @@ -528,7 +536,8 @@ public class ManagedChannelImplTest { oob.shutdown(); assertFalse(channelz.containsSubchannel(oob.getLogId())); assertThat(getStats(channel).subchannels).isEmpty(); - assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); + assertFalse( + channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); // channel still appears assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); @@ -2571,7 +2580,7 @@ public class ManagedChannelImplTest { .setDescription("Child Subchannel started") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) .setTimestampNanos(timer.getTicker().read()) - .setSubchannelRef(subchannel.getInternalSubchannel()) + .setSubchannelRef(subchannel.getInstrumentedInternalSubchannel()) .build()); assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() .setDescription("Subchannel for [[[test-addr]/{}]] created") @@ -2742,7 +2751,7 @@ public class ManagedChannelImplTest { (AbstractSubchannel) createSubchannelSafely( helper, addressGroup, Attributes.EMPTY, subchannelStateListener); timer.forwardNanos(1234); - subchannel.obtainActiveTransport(); + ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport(); assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() .setDescription("CONNECTING as requested") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) @@ -3958,7 +3967,7 @@ public class ManagedChannelImplTest { } private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception { - return subchannel.getInternalSubchannel().getStats().get(); + return subchannel.getInstrumentedInternalSubchannel().getStats().get(); } private static ChannelStats getStats(