mirror of https://github.com/grpc/grpc-java.git
core: Don't close in TSREI in cancelled contexts (#4596)
Prevent multiple effective close calls either by successful completion of a cancel or complete notification, or through successive exceptions handled within a single call.
This commit is contained in:
parent
e1865b565d
commit
19b2a17801
|
|
@ -123,6 +123,7 @@ public final class TransmitStatusRuntimeExceptionInterceptor implements ServerIn
|
||||||
private static final String ERROR_MSG = "Encountered error during serialized access";
|
private static final String ERROR_MSG = "Encountered error during serialized access";
|
||||||
private final SerializingExecutor serializingExecutor =
|
private final SerializingExecutor serializingExecutor =
|
||||||
new SerializingExecutor(MoreExecutors.directExecutor());
|
new SerializingExecutor(MoreExecutors.directExecutor());
|
||||||
|
private boolean closeCalled = false;
|
||||||
|
|
||||||
SerializingServerCall(ServerCall<ReqT, RespT> delegate) {
|
SerializingServerCall(ServerCall<ReqT, RespT> delegate) {
|
||||||
super(delegate);
|
super(delegate);
|
||||||
|
|
@ -163,8 +164,12 @@ public final class TransmitStatusRuntimeExceptionInterceptor implements ServerIn
|
||||||
serializingExecutor.execute(new Runnable() {
|
serializingExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
if (!closeCalled) {
|
||||||
|
closeCalled = true;
|
||||||
|
|
||||||
SerializingServerCall.super.close(status, trailers);
|
SerializingServerCall.super.close(status, trailers);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -42,12 +42,17 @@ import org.junit.runners.JUnit4;
|
||||||
*/
|
*/
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
public class UtilServerInterceptorsTest {
|
public class UtilServerInterceptorsTest {
|
||||||
|
private static class VoidCallListener extends ServerCall.Listener<Void> {
|
||||||
|
public void onCall(ServerCall<Void, Void> call, Metadata headers) { }
|
||||||
|
}
|
||||||
|
|
||||||
private MethodDescriptor<Void, Void> flowMethod = TestMethodDescriptors.voidMethod();
|
private MethodDescriptor<Void, Void> flowMethod = TestMethodDescriptors.voidMethod();
|
||||||
private final Metadata headers = new Metadata();
|
private final Metadata headers = new Metadata();
|
||||||
private ServerCallHandler<Void, Void> handler = new ServerCallHandler<Void, Void>() {
|
private ServerCallHandler<Void, Void> handler = new ServerCallHandler<Void, Void>() {
|
||||||
@Override
|
@Override
|
||||||
public ServerCall.Listener<Void> startCall(
|
public ServerCall.Listener<Void> startCall(
|
||||||
ServerCall<Void, Void> call, Metadata headers) {
|
ServerCall<Void, Void> call, Metadata headers) {
|
||||||
|
listener.onCall(call, headers);
|
||||||
return listener;
|
return listener;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -55,7 +60,7 @@ public class UtilServerInterceptorsTest {
|
||||||
ServerServiceDefinition.builder(new ServiceDescriptor("service_foo", flowMethod))
|
ServerServiceDefinition.builder(new ServiceDescriptor("service_foo", flowMethod))
|
||||||
.addMethod(flowMethod, handler)
|
.addMethod(flowMethod, handler)
|
||||||
.build();
|
.build();
|
||||||
private ServerCall.Listener<Void> listener;
|
private VoidCallListener listener;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private static ServerMethodDefinition<Void, Void> getSoleMethod(
|
private static ServerMethodDefinition<Void, Void> getSoleMethod(
|
||||||
|
|
@ -74,7 +79,7 @@ public class UtilServerInterceptorsTest {
|
||||||
new FakeServerCall<Void, Void>(expectedStatus, expectedMetadata);
|
new FakeServerCall<Void, Void>(expectedStatus, expectedMetadata);
|
||||||
final StatusRuntimeException exception =
|
final StatusRuntimeException exception =
|
||||||
new StatusRuntimeException(expectedStatus, expectedMetadata);
|
new StatusRuntimeException(expectedStatus, expectedMetadata);
|
||||||
listener = new ServerCall.Listener<Void>() {
|
listener = new VoidCallListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Void message) {
|
public void onMessage(Void message) {
|
||||||
throw exception;
|
throw exception;
|
||||||
|
|
@ -114,6 +119,57 @@ public class UtilServerInterceptorsTest {
|
||||||
assertEquals(5, call.numCloses);
|
assertEquals(5, call.numCloses);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void statusRuntimeExceptionTransmitterIgnoresClosedCalls() {
|
||||||
|
final Status expectedStatus = Status.UNAVAILABLE;
|
||||||
|
final Status unexpectedStatus = Status.CANCELLED;
|
||||||
|
final Metadata expectedMetadata = new Metadata();
|
||||||
|
|
||||||
|
FakeServerCall<Void, Void> call =
|
||||||
|
new FakeServerCall<Void, Void>(expectedStatus, expectedMetadata);
|
||||||
|
final StatusRuntimeException exception =
|
||||||
|
new StatusRuntimeException(expectedStatus, expectedMetadata);
|
||||||
|
|
||||||
|
listener = new VoidCallListener() {
|
||||||
|
@Override
|
||||||
|
public void onMessage(Void message) {
|
||||||
|
throw exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onHalfClose() {
|
||||||
|
throw exception;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
|
||||||
|
serviceDefinition,
|
||||||
|
Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance()));
|
||||||
|
ServerCall.Listener<Void> callDoubleSreListener =
|
||||||
|
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers);
|
||||||
|
callDoubleSreListener.onMessage(null); // the only close with our exception
|
||||||
|
callDoubleSreListener.onHalfClose(); // should not trigger a close
|
||||||
|
|
||||||
|
// this listener closes the call when it is initialized with startCall
|
||||||
|
listener = new VoidCallListener() {
|
||||||
|
@Override
|
||||||
|
public void onCall(ServerCall<Void, Void> call, Metadata headers) {
|
||||||
|
call.close(unexpectedStatus, headers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onHalfClose() {
|
||||||
|
throw exception;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ServerCall.Listener<Void> callClosedListener =
|
||||||
|
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers);
|
||||||
|
// call is already closed, does not match exception
|
||||||
|
callClosedListener.onHalfClose(); // should not trigger a close
|
||||||
|
assertEquals(1, call.numCloses);
|
||||||
|
}
|
||||||
|
|
||||||
private static class FakeServerCall<ReqT, RespT> extends NoopServerCall<ReqT, RespT> {
|
private static class FakeServerCall<ReqT, RespT> extends NoopServerCall<ReqT, RespT> {
|
||||||
final Status expectedStatus;
|
final Status expectedStatus;
|
||||||
final Metadata expectedMetadata;
|
final Metadata expectedMetadata;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue