mirror of https://github.com/grpc/grpc-java.git
core: fix terminated status check in TransportListener (#5955)
This commit is contained in:
parent
6615f2fc9d
commit
d974bea4b1
|
|
@ -494,6 +494,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
private class TransportListener implements ManagedClientTransport.Listener {
|
private class TransportListener implements ManagedClientTransport.Listener {
|
||||||
final ConnectionClientTransport transport;
|
final ConnectionClientTransport transport;
|
||||||
final SocketAddress address;
|
final SocketAddress address;
|
||||||
|
boolean shutdownInitiated = false;
|
||||||
|
|
||||||
TransportListener(ConnectionClientTransport transport, SocketAddress address) {
|
TransportListener(ConnectionClientTransport transport, SocketAddress address) {
|
||||||
this.transport = transport;
|
this.transport = transport;
|
||||||
|
|
@ -530,6 +531,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
public void transportShutdown(final Status s) {
|
public void transportShutdown(final Status s) {
|
||||||
channelLogger.log(
|
channelLogger.log(
|
||||||
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
|
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
|
||||||
|
shutdownInitiated = true;
|
||||||
syncContext.execute(new Runnable() {
|
syncContext.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
@ -561,6 +563,9 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transportTerminated() {
|
public void transportTerminated() {
|
||||||
|
Preconditions.checkState(
|
||||||
|
shutdownInitiated, "transportShutdown() must be called before transportTerminated().");
|
||||||
|
|
||||||
channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
|
channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
|
||||||
channelz.removeClientSocket(transport);
|
channelz.removeClientSocket(transport);
|
||||||
handleTransportInUseState(transport, false);
|
handleTransportInUseState(transport, false);
|
||||||
|
|
@ -573,9 +578,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Preconditions.checkState(activeTransport != transport,
|
|
||||||
"activeTransport still points to this transport. "
|
|
||||||
+ "Seems transportShutdown() was not called.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -839,6 +839,7 @@ public class InternalSubchannelTest {
|
||||||
internalSubchannel.shutdown(SHUTDOWN_REASON);
|
internalSubchannel.shutdown(SHUTDOWN_REASON);
|
||||||
verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
|
verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
|
||||||
assertExactCallbackInvokes("onStateChange:SHUTDOWN");
|
assertExactCallbackInvokes("onStateChange:SHUTDOWN");
|
||||||
|
transportInfo.listener.transportShutdown(SHUTDOWN_REASON);
|
||||||
|
|
||||||
transportInfo.listener.transportTerminated();
|
transportInfo.listener.transportTerminated();
|
||||||
assertExactCallbackInvokes("onTerminated");
|
assertExactCallbackInvokes("onTerminated");
|
||||||
|
|
@ -1144,7 +1145,9 @@ public class InternalSubchannelTest {
|
||||||
internalSubchannel.obtainActiveTransport();
|
internalSubchannel.obtainActiveTransport();
|
||||||
|
|
||||||
MockClientTransportInfo t0 = transports.poll();
|
MockClientTransportInfo t0 = transports.poll();
|
||||||
|
t0.listener.transportReady();
|
||||||
assertTrue(channelz.containsClientSocket(t0.transport.getLogId()));
|
assertTrue(channelz.containsClientSocket(t0.transport.getLogId()));
|
||||||
|
t0.listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
|
||||||
t0.listener.transportTerminated();
|
t0.listener.transportTerminated();
|
||||||
assertFalse(channelz.containsClientSocket(t0.transport.getLogId()));
|
assertFalse(channelz.containsClientSocket(t0.transport.getLogId()));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -521,8 +521,10 @@ public class ManagedChannelImplTest {
|
||||||
MockClientTransportInfo transportInfo = transports.poll();
|
MockClientTransportInfo transportInfo = transports.poll();
|
||||||
assertNotNull(transportInfo);
|
assertNotNull(transportInfo);
|
||||||
assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
||||||
|
transportInfo.listener.transportReady();
|
||||||
|
|
||||||
// terminate transport
|
// terminate transport
|
||||||
|
transportInfo.listener.transportShutdown(Status.CANCELLED);
|
||||||
transportInfo.listener.transportTerminated();
|
transportInfo.listener.transportTerminated();
|
||||||
assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
||||||
|
|
||||||
|
|
@ -564,6 +566,7 @@ public class ManagedChannelImplTest {
|
||||||
assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
||||||
|
|
||||||
// terminate transport
|
// terminate transport
|
||||||
|
transportInfo.listener.transportShutdown(Status.INTERNAL);
|
||||||
transportInfo.listener.transportTerminated();
|
transportInfo.listener.transportTerminated();
|
||||||
assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
|
||||||
|
|
||||||
|
|
@ -3270,6 +3273,7 @@ public class ManagedChannelImplTest {
|
||||||
verify(mockLoadBalancer).shutdown();
|
verify(mockLoadBalancer).shutdown();
|
||||||
// simulating the shutdown of load balancer triggers the shutdown of subchannel
|
// simulating the shutdown of load balancer triggers the shutdown of subchannel
|
||||||
shutdownSafely(helper, subchannel);
|
shutdownSafely(helper, subchannel);
|
||||||
|
transportInfo.listener.transportShutdown(Status.INTERNAL);
|
||||||
transportInfo.listener.transportTerminated(); // simulating transport terminated
|
transportInfo.listener.transportTerminated(); // simulating transport terminated
|
||||||
assertTrue(
|
assertTrue(
|
||||||
"channel.isTerminated() is expected to be true but was false",
|
"channel.isTerminated() is expected to be true but was false",
|
||||||
|
|
@ -3369,7 +3373,9 @@ public class ManagedChannelImplTest {
|
||||||
verify(mockLoadBalancer).shutdown();
|
verify(mockLoadBalancer).shutdown();
|
||||||
// simulating the shutdown of load balancer triggers the shutdown of subchannel
|
// simulating the shutdown of load balancer triggers the shutdown of subchannel
|
||||||
shutdownSafely(helper, subchannel);
|
shutdownSafely(helper, subchannel);
|
||||||
transportInfo.listener.transportTerminated(); // simulating transport terminated
|
// simulating transport shutdown & terminated
|
||||||
|
transportInfo.listener.transportShutdown(Status.INTERNAL);
|
||||||
|
transportInfo.listener.transportTerminated();
|
||||||
assertTrue(
|
assertTrue(
|
||||||
"channel.isTerminated() is expected to be true but was false",
|
"channel.isTerminated() is expected to be true but was false",
|
||||||
channel.isTerminated());
|
channel.isTerminated());
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue