diff --git a/core/src/main/java/io/grpc/TransportManager.java b/core/src/main/java/io/grpc/TransportManager.java index 087b26266f..4c0ccd5fde 100644 --- a/core/src/main/java/io/grpc/TransportManager.java +++ b/core/src/main/java/io/grpc/TransportManager.java @@ -51,9 +51,6 @@ public abstract class TransportManager { * *

Never returns {@code null} */ - // TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers, - // which would have a different authority than the primary servers. We need to figure out how to - // do it. public abstract T getTransport(EquivalentAddressGroup addressGroup); /** @@ -74,6 +71,12 @@ public abstract class TransportManager { */ public abstract InterimTransport createInterimTransport(); + /** + * Creates an {@link OobTransportProvider} with a specific authority. + */ + public abstract OobTransportProvider createOobTransportProvider( + EquivalentAddressGroup addressGroup, String authority); + /** * Returns a channel that uses {@code transport}; useful for issuing RPCs on a transport. */ @@ -85,7 +88,7 @@ public abstract class TransportManager { * * @see #createInterimTransport */ - public static interface InterimTransport { + public interface InterimTransport { /** * Returns the transport object. * @@ -106,4 +109,21 @@ public abstract class TransportManager { */ void closeWithError(Status error); } + + /** + * A provider for out-of-band transports, usually used by a load-balancer that needs to + * communicate with an external load-balancing service which is under an authority different from + * what the channel is associated with. + */ + public interface OobTransportProvider { + /** + * Returns an OOB transport. + */ + T get(); + + /** + * Closes the provider and shuts down all associated transports. + */ + void close(); + } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index f9ecc1a3fc..d0c70192fa 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -54,6 +54,7 @@ import io.grpc.ResolvedServerInfo; import io.grpc.Status; import io.grpc.TransportManager; import io.grpc.TransportManager.InterimTransport; +import io.grpc.TransportManager.OobTransportProvider; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import java.net.URI; @@ -126,6 +127,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI private final HashSet delayedTransports = new HashSet(); + @GuardedBy("lock") + private final HashSet> oobTransports = + new HashSet>(); + @GuardedBy("lock") private boolean shutdown; @GuardedBy("lock") @@ -271,6 +276,9 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI for (DelayedClientTransport transport : delayedTransportsCopy) { transport.shutdown(); } + for (OobTransportProvider provider : oobTransports) { + provider.close(); + } if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "[{0}] Shutting down", getLogId()); } @@ -288,6 +296,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI @Override public ManagedChannelImpl shutdownNow() { shutdown(); + // TODO(zhangkun): also call shutdownNow() on oobTransports. return this; } @@ -364,7 +373,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI if (terminated) { return; } - if (shutdown && transports.isEmpty() && delayedTransports.isEmpty()) { + if (shutdown && transports.isEmpty() && delayedTransports.isEmpty() + && oobTransports.isEmpty()) { if (log.isLoggable(Level.INFO)) { log.log(Level.INFO, "[{0}] Terminated", getLogId()); } @@ -440,6 +450,12 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI public InterimTransport createInterimTransport() { return new InterimTransportImpl(); } + + @Override + public OobTransportProvider createOobTransportProvider( + EquivalentAddressGroup addressGroup, String authority) { + return new OobTransportProviderImpl(addressGroup, authority); + } }; @Override @@ -493,4 +509,45 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI delayedTransport.shutdownNow(error); } } + + private class OobTransportProviderImpl implements OobTransportProvider { + private final TransportSet transportSet; + + OobTransportProviderImpl(EquivalentAddressGroup addressGroup, String authority) { + synchronized (lock) { + if (shutdown) { + transportSet = null; + } else { + transportSet = new TransportSet(addressGroup, authority, userAgent, loadBalancer, + backoffPolicyProvider, transportFactory, scheduledExecutor, executor, + new TransportSet.Callback() { + @Override + public void onTerminated() { + synchronized (lock) { + oobTransports.remove(OobTransportProviderImpl.this); + maybeTerminateChannel(); + } + } + }); + oobTransports.add(this); + } + } + } + + @Override + public ClientTransport get() { + if (transportSet == null) { + return SHUTDOWN_TRANSPORT; + } else { + return transportSet.obtainActiveTransport(); + } + } + + @Override + public void close() { + if (transportSet != null) { + transportSet.shutdown(); + } + } + } } diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 989ad5ce13..3ce9630626 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -444,21 +444,21 @@ final class TransportSet implements WithLogId { } } - interface Callback { + abstract static class Callback { /** * Called when the TransportSet is terminated, which means it's shut down and all transports * have been terminated. */ - void onTerminated(); + public void onTerminated() { } /** * Called when all addresses have failed to connect. */ - void onAllAddressesFailed(); + public void onAllAddressesFailed() { } /** * Called when a once-live connection is shut down by server-side. */ - void onConnectionClosedByServer(Status status); + public void onConnectionClosedByServer(Status status) { } } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index 274b50da58..90091be5a3 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -33,6 +33,7 @@ package io.grpc.internal; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -57,6 +58,7 @@ import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StringMarshaller; import io.grpc.TransportManager.InterimTransport; +import io.grpc.TransportManager.OobTransportProvider; import io.grpc.TransportManager; import io.grpc.internal.TestUtils.MockClientTransportInfo; @@ -322,4 +324,50 @@ public class ManagedChannelImplTransportManagerTest { verify(sl1).closed(same(Status.UNAVAILABLE), any(Metadata.class)); assertTrue(channel.isTerminated()); } + + @Test + public void createOobTransportProvider() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr); + String oobAuthority = "oobauthority"; + + OobTransportProvider p1 = + tm.createOobTransportProvider(addressGroup, oobAuthority); + ClientTransport t1 = p1.get(); + assertNotNull(t1); + assertSame(t1, p1.get()); + verify(mockTransportFactory, timeout(1000)).newClientTransport(addr, oobAuthority, userAgent); + MockClientTransportInfo transportInfo1 = transports.poll(1, TimeUnit.SECONDS); + + // OOB transport providers are not indexed by addresses, thus each time it creates + // a new provider. + OobTransportProvider p2 = + tm.createOobTransportProvider(addressGroup, oobAuthority); + assertNotSame(p1, p2); + ClientTransport t2 = p2.get(); + verify(mockTransportFactory, timeout(1000).times(2)) + .newClientTransport(addr, oobAuthority, userAgent); + assertNotSame(t1, t2); + MockClientTransportInfo transportInfo2 = transports.poll(1, TimeUnit.SECONDS); + assertNotSame(transportInfo1.transport, transportInfo2.transport); + + // Closing the OobTransportProvider will shutdown the transport + p1.close(); + verify(transportInfo1.transport).shutdown(); + transportInfo1.listener.transportTerminated(); + + channel.shutdown(); + verify(transportInfo2.transport).shutdown(); + + OobTransportProvider p3 = + tm.createOobTransportProvider(addressGroup, oobAuthority); + assertTrue(p3.get() instanceof FailingClientTransport); + + p2.close(); + + // The channel will not be terminated until all OOB transports are terminated. + assertFalse(channel.isTerminated()); + transportInfo2.listener.transportTerminated(); + assertTrue(channel.isTerminated()); + } }