diff --git a/core/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java b/core/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java index 7e3578a230..0b890651db 100644 --- a/core/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java +++ b/core/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java @@ -123,6 +123,7 @@ public final class TransmitStatusRuntimeExceptionInterceptor implements ServerIn private static final String ERROR_MSG = "Encountered error during serialized access"; private final SerializingExecutor serializingExecutor = new SerializingExecutor(MoreExecutors.directExecutor()); + private boolean closeCalled = false; SerializingServerCall(ServerCall delegate) { super(delegate); @@ -163,7 +164,11 @@ public final class TransmitStatusRuntimeExceptionInterceptor implements ServerIn serializingExecutor.execute(new Runnable() { @Override public void run() { - SerializingServerCall.super.close(status, trailers); + if (!closeCalled) { + closeCalled = true; + + SerializingServerCall.super.close(status, trailers); + } } }); } diff --git a/core/src/test/java/io/grpc/util/UtilServerInterceptorsTest.java b/core/src/test/java/io/grpc/util/UtilServerInterceptorsTest.java index 7bd38ba8da..37433b8540 100644 --- a/core/src/test/java/io/grpc/util/UtilServerInterceptorsTest.java +++ b/core/src/test/java/io/grpc/util/UtilServerInterceptorsTest.java @@ -42,12 +42,17 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class UtilServerInterceptorsTest { + private static class VoidCallListener extends ServerCall.Listener { + public void onCall(ServerCall call, Metadata headers) { } + } + private MethodDescriptor flowMethod = TestMethodDescriptors.voidMethod(); private final Metadata headers = new Metadata(); private ServerCallHandler handler = new ServerCallHandler() { @Override public ServerCall.Listener startCall( ServerCall call, Metadata headers) { + listener.onCall(call, headers); return listener; } }; @@ -55,7 +60,7 @@ public class UtilServerInterceptorsTest { ServerServiceDefinition.builder(new ServiceDescriptor("service_foo", flowMethod)) .addMethod(flowMethod, handler) .build(); - private ServerCall.Listener listener; + private VoidCallListener listener; @SuppressWarnings("unchecked") private static ServerMethodDefinition getSoleMethod( @@ -74,7 +79,7 @@ public class UtilServerInterceptorsTest { new FakeServerCall(expectedStatus, expectedMetadata); final StatusRuntimeException exception = new StatusRuntimeException(expectedStatus, expectedMetadata); - listener = new ServerCall.Listener() { + listener = new VoidCallListener() { @Override public void onMessage(Void message) { throw exception; @@ -114,6 +119,57 @@ public class UtilServerInterceptorsTest { assertEquals(5, call.numCloses); } + @Test + public void statusRuntimeExceptionTransmitterIgnoresClosedCalls() { + final Status expectedStatus = Status.UNAVAILABLE; + final Status unexpectedStatus = Status.CANCELLED; + final Metadata expectedMetadata = new Metadata(); + + FakeServerCall call = + new FakeServerCall(expectedStatus, expectedMetadata); + final StatusRuntimeException exception = + new StatusRuntimeException(expectedStatus, expectedMetadata); + + listener = new VoidCallListener() { + @Override + public void onMessage(Void message) { + throw exception; + } + + @Override + public void onHalfClose() { + throw exception; + } + }; + + ServerServiceDefinition intercepted = ServerInterceptors.intercept( + serviceDefinition, + Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance())); + ServerCall.Listener callDoubleSreListener = + getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers); + callDoubleSreListener.onMessage(null); // the only close with our exception + callDoubleSreListener.onHalfClose(); // should not trigger a close + + // this listener closes the call when it is initialized with startCall + listener = new VoidCallListener() { + @Override + public void onCall(ServerCall call, Metadata headers) { + call.close(unexpectedStatus, headers); + } + + @Override + public void onHalfClose() { + throw exception; + } + }; + + ServerCall.Listener callClosedListener = + getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers); + // call is already closed, does not match exception + callClosedListener.onHalfClose(); // should not trigger a close + assertEquals(1, call.numCloses); + } + private static class FakeServerCall extends NoopServerCall { final Status expectedStatus; final Metadata expectedMetadata;