Differentiate transport to LB services and to servers.

TransportManager has a new method, createOobTransportProvider(), which
accepts an EquivalentAddressGroup and the authority string. This
addresses two requirements:

1. Per GRPCLB protocol, connections to the remote load-balancer may use a
different authority than the channel's (#1137).

2. For idle state determination, Channel needs to exclude the transport to
the LB service when looking at live RPCs and (#1276).
This commit is contained in:
Kun Zhang 2016-05-19 14:46:58 -07:00
parent 676bf4854f
commit 90506405c9
4 changed files with 134 additions and 9 deletions

View File

@ -51,9 +51,6 @@ public abstract class TransportManager<T> {
* *
* <p>Never returns {@code null} * <p>Never returns {@code null}
*/ */
// TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers,
// which would have a different authority than the primary servers. We need to figure out how to
// do it.
public abstract T getTransport(EquivalentAddressGroup addressGroup); public abstract T getTransport(EquivalentAddressGroup addressGroup);
/** /**
@ -74,6 +71,12 @@ public abstract class TransportManager<T> {
*/ */
public abstract InterimTransport<T> createInterimTransport(); public abstract InterimTransport<T> createInterimTransport();
/**
* Creates an {@link OobTransportProvider} with a specific authority.
*/
public abstract OobTransportProvider<T> createOobTransportProvider(
EquivalentAddressGroup addressGroup, String authority);
/** /**
* Returns a channel that uses {@code transport}; useful for issuing RPCs on a transport. * Returns a channel that uses {@code transport}; useful for issuing RPCs on a transport.
*/ */
@ -85,7 +88,7 @@ public abstract class TransportManager<T> {
* *
* @see #createInterimTransport * @see #createInterimTransport
*/ */
public static interface InterimTransport<T> { public interface InterimTransport<T> {
/** /**
* Returns the transport object. * Returns the transport object.
* *
@ -106,4 +109,21 @@ public abstract class TransportManager<T> {
*/ */
void closeWithError(Status error); void closeWithError(Status error);
} }
/**
* A provider for out-of-band transports, usually used by a load-balancer that needs to
* communicate with an external load-balancing service which is under an authority different from
* what the channel is associated with.
*/
public interface OobTransportProvider<T> {
/**
* Returns an OOB transport.
*/
T get();
/**
* Closes the provider and shuts down all associated transports.
*/
void close();
}
} }

View File

@ -54,6 +54,7 @@ import io.grpc.ResolvedServerInfo;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.TransportManager; import io.grpc.TransportManager;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
import io.grpc.TransportManager.OobTransportProvider;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import java.net.URI; import java.net.URI;
@ -126,6 +127,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
private final HashSet<DelayedClientTransport> delayedTransports = private final HashSet<DelayedClientTransport> delayedTransports =
new HashSet<DelayedClientTransport>(); new HashSet<DelayedClientTransport>();
@GuardedBy("lock")
private final HashSet<OobTransportProvider<ClientTransport>> oobTransports =
new HashSet<OobTransportProvider<ClientTransport>>();
@GuardedBy("lock") @GuardedBy("lock")
private boolean shutdown; private boolean shutdown;
@GuardedBy("lock") @GuardedBy("lock")
@ -271,6 +276,9 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
for (DelayedClientTransport transport : delayedTransportsCopy) { for (DelayedClientTransport transport : delayedTransportsCopy) {
transport.shutdown(); transport.shutdown();
} }
for (OobTransportProvider<ClientTransport> provider : oobTransports) {
provider.close();
}
if (log.isLoggable(Level.FINE)) { if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Shutting down", getLogId()); log.log(Level.FINE, "[{0}] Shutting down", getLogId());
} }
@ -288,6 +296,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
@Override @Override
public ManagedChannelImpl shutdownNow() { public ManagedChannelImpl shutdownNow() {
shutdown(); shutdown();
// TODO(zhangkun): also call shutdownNow() on oobTransports.
return this; return this;
} }
@ -364,7 +373,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
if (terminated) { if (terminated) {
return; return;
} }
if (shutdown && transports.isEmpty() && delayedTransports.isEmpty()) { if (shutdown && transports.isEmpty() && delayedTransports.isEmpty()
&& oobTransports.isEmpty()) {
if (log.isLoggable(Level.INFO)) { if (log.isLoggable(Level.INFO)) {
log.log(Level.INFO, "[{0}] Terminated", getLogId()); log.log(Level.INFO, "[{0}] Terminated", getLogId());
} }
@ -440,6 +450,12 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
public InterimTransport<ClientTransport> createInterimTransport() { public InterimTransport<ClientTransport> createInterimTransport() {
return new InterimTransportImpl(); return new InterimTransportImpl();
} }
@Override
public OobTransportProvider<ClientTransport> createOobTransportProvider(
EquivalentAddressGroup addressGroup, String authority) {
return new OobTransportProviderImpl(addressGroup, authority);
}
}; };
@Override @Override
@ -493,4 +509,45 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
delayedTransport.shutdownNow(error); delayedTransport.shutdownNow(error);
} }
} }
private class OobTransportProviderImpl implements OobTransportProvider<ClientTransport> {
private final TransportSet transportSet;
OobTransportProviderImpl(EquivalentAddressGroup addressGroup, String authority) {
synchronized (lock) {
if (shutdown) {
transportSet = null;
} else {
transportSet = new TransportSet(addressGroup, authority, userAgent, loadBalancer,
backoffPolicyProvider, transportFactory, scheduledExecutor, executor,
new TransportSet.Callback() {
@Override
public void onTerminated() {
synchronized (lock) {
oobTransports.remove(OobTransportProviderImpl.this);
maybeTerminateChannel();
}
}
});
oobTransports.add(this);
}
}
}
@Override
public ClientTransport get() {
if (transportSet == null) {
return SHUTDOWN_TRANSPORT;
} else {
return transportSet.obtainActiveTransport();
}
}
@Override
public void close() {
if (transportSet != null) {
transportSet.shutdown();
}
}
}
} }

View File

@ -444,21 +444,21 @@ final class TransportSet implements WithLogId {
} }
} }
interface Callback { abstract static class Callback {
/** /**
* Called when the TransportSet is terminated, which means it's shut down and all transports * Called when the TransportSet is terminated, which means it's shut down and all transports
* have been terminated. * have been terminated.
*/ */
void onTerminated(); public void onTerminated() { }
/** /**
* Called when all addresses have failed to connect. * Called when all addresses have failed to connect.
*/ */
void onAllAddressesFailed(); public void onAllAddressesFailed() { }
/** /**
* Called when a once-live connection is shut down by server-side. * Called when a once-live connection is shut down by server-side.
*/ */
void onConnectionClosedByServer(Status status); public void onConnectionClosedByServer(Status status) { }
} }
} }

View File

@ -33,6 +33,7 @@ package io.grpc.internal;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -57,6 +58,7 @@ import io.grpc.NameResolver;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StringMarshaller; import io.grpc.StringMarshaller;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
import io.grpc.TransportManager.OobTransportProvider;
import io.grpc.TransportManager; import io.grpc.TransportManager;
import io.grpc.internal.TestUtils.MockClientTransportInfo; import io.grpc.internal.TestUtils.MockClientTransportInfo;
@ -322,4 +324,50 @@ public class ManagedChannelImplTransportManagerTest {
verify(sl1).closed(same(Status.UNAVAILABLE), any(Metadata.class)); verify(sl1).closed(same(Status.UNAVAILABLE), any(Metadata.class));
assertTrue(channel.isTerminated()); assertTrue(channel.isTerminated());
} }
@Test
public void createOobTransportProvider() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr);
String oobAuthority = "oobauthority";
OobTransportProvider<ClientTransport> p1 =
tm.createOobTransportProvider(addressGroup, oobAuthority);
ClientTransport t1 = p1.get();
assertNotNull(t1);
assertSame(t1, p1.get());
verify(mockTransportFactory, timeout(1000)).newClientTransport(addr, oobAuthority, userAgent);
MockClientTransportInfo transportInfo1 = transports.poll(1, TimeUnit.SECONDS);
// OOB transport providers are not indexed by addresses, thus each time it creates
// a new provider.
OobTransportProvider<ClientTransport> p2 =
tm.createOobTransportProvider(addressGroup, oobAuthority);
assertNotSame(p1, p2);
ClientTransport t2 = p2.get();
verify(mockTransportFactory, timeout(1000).times(2))
.newClientTransport(addr, oobAuthority, userAgent);
assertNotSame(t1, t2);
MockClientTransportInfo transportInfo2 = transports.poll(1, TimeUnit.SECONDS);
assertNotSame(transportInfo1.transport, transportInfo2.transport);
// Closing the OobTransportProvider will shutdown the transport
p1.close();
verify(transportInfo1.transport).shutdown();
transportInfo1.listener.transportTerminated();
channel.shutdown();
verify(transportInfo2.transport).shutdown();
OobTransportProvider<ClientTransport> p3 =
tm.createOobTransportProvider(addressGroup, oobAuthority);
assertTrue(p3.get() instanceof FailingClientTransport);
p2.close();
// The channel will not be terminated until all OOB transports are terminated.
assertFalse(channel.isTerminated());
transportInfo2.listener.transportTerminated();
assertTrue(channel.isTerminated());
}
} }