mirror of https://github.com/grpc/grpc-java.git
core: fix a discrepency in state transition.
Channel state API doesn't allow a TRANSIENT_FAILURE->IDLE edge. Change TransportSet to always transition to CONNECTING after TRANSIENT_FAILURE. This behavior, combined with that it never uses IDLE_TIMEOUT to transition from READY to IDLE, effectivly makes TransportSet Channel-state API-compliant under an infinite IDLE_TIMEOUT. Also set the default IDLE_TIMEOUT to 30min.
This commit is contained in:
parent
107fa8e801
commit
d74091f567
|
|
@ -77,6 +77,12 @@ public abstract class AbstractManagedChannelImplBuilder
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final long IDLE_MODE_MAX_TIMEOUT_DAYS = 30;
|
static final long IDLE_MODE_MAX_TIMEOUT_DAYS = 30;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default idle timeout.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
static final long IDLE_MODE_DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An idle timeout smaller than this would be capped to it.
|
* An idle timeout smaller than this would be capped to it.
|
||||||
*/
|
*/
|
||||||
|
|
@ -110,7 +116,7 @@ public abstract class AbstractManagedChannelImplBuilder
|
||||||
@Nullable
|
@Nullable
|
||||||
private CompressorRegistry compressorRegistry;
|
private CompressorRegistry compressorRegistry;
|
||||||
|
|
||||||
private long idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
|
private long idleTimeoutMillis = IDLE_MODE_DEFAULT_TIMEOUT_MILLIS;
|
||||||
|
|
||||||
protected AbstractManagedChannelImplBuilder(String target) {
|
protected AbstractManagedChannelImplBuilder(String target) {
|
||||||
this.target = Preconditions.checkNotNull(target, "target");
|
this.target = Preconditions.checkNotNull(target, "target");
|
||||||
|
|
|
||||||
|
|
@ -252,42 +252,17 @@ final class TransportSet extends ManagedChannel implements WithLogId {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
delayedTransport.endBackoff();
|
delayedTransport.endBackoff();
|
||||||
boolean shutdownDelayedTransport = false;
|
|
||||||
Runnable runnable = null;
|
Runnable runnable = null;
|
||||||
// TransportSet as a channel layer class should not call into transport methods while
|
|
||||||
// holding the lock, thus we call hasPendingStreams() outside of the lock. It will cause
|
|
||||||
// a _benign_ race where the TransportSet may transition to CONNECTING when there is not
|
|
||||||
// pending stream.
|
|
||||||
boolean hasPendingStreams = delayedTransport.hasPendingStreams();
|
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
reconnectTask = null;
|
reconnectTask = null;
|
||||||
if (hasPendingStreams) {
|
|
||||||
if (!shutdown) {
|
if (!shutdown) {
|
||||||
stateManager.gotoState(ConnectivityState.CONNECTING);
|
stateManager.gotoState(ConnectivityState.CONNECTING);
|
||||||
}
|
}
|
||||||
runnable = startNewTransport(delayedTransport);
|
runnable = startNewTransport(delayedTransport);
|
||||||
} else {
|
|
||||||
if (!shutdown) {
|
|
||||||
stateManager.gotoState(ConnectivityState.IDLE);
|
|
||||||
}
|
|
||||||
activeTransport = null;
|
|
||||||
shutdownDelayedTransport = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (runnable != null) {
|
if (runnable != null) {
|
||||||
runnable.run();
|
runnable.run();
|
||||||
}
|
}
|
||||||
if (shutdownDelayedTransport) {
|
|
||||||
delayedTransport.setTransportSupplier(new Supplier<ClientTransport>() {
|
|
||||||
@Override
|
|
||||||
public ClientTransport get() {
|
|
||||||
// This will wrap one DelayedStream in another, but it only happens if we win a
|
|
||||||
// race and can happen to a stream at most once.
|
|
||||||
return obtainActiveTransport();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
delayedTransport.shutdown();
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
log.log(Level.WARNING, "Exception handling end of backoff", t);
|
log.log(Level.WARNING, "Exception handling end of backoff", t);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,8 @@ public class AbstractManagedChannelImplBuilderTest {
|
||||||
|
|
||||||
Builder builder = new Builder();
|
Builder builder = new Builder();
|
||||||
|
|
||||||
assertEquals(ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
|
assertEquals(AbstractManagedChannelImplBuilder.IDLE_MODE_DEFAULT_TIMEOUT_MILLIS,
|
||||||
|
builder.getIdleTimeoutMillis());
|
||||||
|
|
||||||
builder.idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS);
|
builder.idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS);
|
||||||
assertEquals(ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
|
assertEquals(ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
|
||||||
|
|
|
||||||
|
|
@ -473,68 +473,27 @@ public class TransportSetTest {
|
||||||
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
|
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
|
||||||
assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
|
assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
|
||||||
|
|
||||||
// Won't reconnect until requested, even if back-off time has expired
|
// Will always reconnect after back-off
|
||||||
fakeClock.forwardMillis(10);
|
fakeClock.forwardMillis(10);
|
||||||
assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
|
|
||||||
verify(mockTransportFactory, times(transportsCreated))
|
|
||||||
.newClientTransport(addr, authority, userAgent);
|
|
||||||
|
|
||||||
// Once requested, will reconnect
|
|
||||||
transportSet.obtainActiveTransport().newStream(method, new Metadata());
|
|
||||||
assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
|
assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
|
||||||
verify(mockTransportFactory, times(++transportsCreated))
|
verify(mockTransportFactory, times(++transportsCreated))
|
||||||
.newClientTransport(addr, authority, userAgent);
|
.newClientTransport(addr, authority, userAgent);
|
||||||
|
|
||||||
// Fail this one, too
|
// Make this one proceed
|
||||||
|
transports.peek().listener.transportReady();
|
||||||
|
assertEquals(ConnectivityState.READY, transportSet.getState(false));
|
||||||
|
// Then go-away
|
||||||
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
|
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
|
||||||
assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
|
assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
|
||||||
|
|
||||||
// Request immediately, but will wait for back-off before reconnecting
|
// Request immediately
|
||||||
transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions);
|
transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions);
|
||||||
assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
|
|
||||||
verify(mockTransportFactory, times(transportsCreated))
|
|
||||||
.newClientTransport(addr, authority, userAgent);
|
|
||||||
fakeClock.forwardMillis(100);
|
|
||||||
assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
|
assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
|
||||||
verify(mockTransportFactory, times(++transportsCreated))
|
verify(mockTransportFactory, times(++transportsCreated))
|
||||||
.newClientTransport(addr, authority, userAgent);
|
.newClientTransport(addr, authority, userAgent);
|
||||||
fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
|
fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void raceTransientFailureAndNewStream() {
|
|
||||||
SocketAddress addr = mock(SocketAddress.class);
|
|
||||||
createTransportSet(addr);
|
|
||||||
|
|
||||||
// Invocation counters
|
|
||||||
int transportsCreated = 0;
|
|
||||||
|
|
||||||
// Trigger TRANSIENT_FAILURE
|
|
||||||
transportSet.obtainActiveTransport().newStream(method, new Metadata());
|
|
||||||
verify(mockTransportFactory, times(++transportsCreated))
|
|
||||||
.newClientTransport(addr, authority, userAgent);
|
|
||||||
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
|
|
||||||
assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
|
|
||||||
|
|
||||||
// Won't reconnect without any active streams
|
|
||||||
ClientTransport transientFailureTransport = transportSet.obtainActiveTransport();
|
|
||||||
assertTrue(transientFailureTransport instanceof DelayedClientTransport);
|
|
||||||
transientFailureTransport.newStream(method, new Metadata()).cancel(Status.CANCELLED);
|
|
||||||
assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
|
|
||||||
fakeClock.forwardMillis(10);
|
|
||||||
assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
|
|
||||||
verify(mockTransportFactory, times(transportsCreated))
|
|
||||||
.newClientTransport(addr, authority, userAgent);
|
|
||||||
|
|
||||||
// Lose race (long delay between obtainActiveTransport and newStream); will now reconnect
|
|
||||||
transientFailureTransport.newStream(method, new Metadata());
|
|
||||||
assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
|
|
||||||
verify(mockTransportFactory, times(++transportsCreated))
|
|
||||||
.newClientTransport(addr, authority, userAgent);
|
|
||||||
|
|
||||||
fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception {
|
public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception {
|
||||||
SocketAddress addr = mock(SocketAddress.class);
|
SocketAddress addr = mock(SocketAddress.class);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue