core: change serverimpl,servercallimpl's internalclose to cancel stream (#4038)

The HTTP/2 error code will be INTERNAL_ERROR for all cancel statuses,
except for DEADLINE_EXCEEDED and CANCELLED, which are mapped to
CANCELLED.
This commit is contained in:
Rama Chavali 2018-02-23 00:39:21 +05:30 committed by zpencer
parent 887217e57b
commit 48ca4527c1
6 changed files with 26 additions and 17 deletions

View File

@ -37,6 +37,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.ServerCall; import io.grpc.ServerCall;
import io.grpc.Status; import io.grpc.Status;
import java.io.InputStream; import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> { final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@ -205,7 +206,8 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
* on. * on.
*/ */
private void internalClose(Status internalError) { private void internalClose(Status internalError) {
stream.close(internalError, new Metadata()); log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError});
stream.cancel(internalError);
} }
/** /**

View File

@ -629,8 +629,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/ */
private void internalClose() { private void internalClose() {
// TODO(ejona86): this is not thread-safe :) stream.cancel(Status.INTERNAL);
stream.close(Status.UNKNOWN, new Metadata());
} }
@Override @Override

View File

@ -192,10 +192,9 @@ public class ServerCallImplTest {
verify(stream, times(1)).writeMessage(any(InputStream.class)); verify(stream, times(1)).writeMessage(any(InputStream.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(stream, times(1)).close(statusCaptor.capture(), metadataCaptor.capture()); verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode()); assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription()); assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
} }
@Test @Test
@ -221,7 +220,7 @@ public class ServerCallImplTest {
serverCall.sendMessage(1L); serverCall.sendMessage(1L);
serverCall.sendMessage(1L); serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class)); verify(stream, times(1)).writeMessage(any(InputStream.class));
verify(stream, times(1)).close(any(Status.class), any(Metadata.class)); verify(stream, times(1)).cancel(any(Status.class));
// App runs to completion but everything is ignored // App runs to completion but everything is ignored
serverCall.sendMessage(1L); serverCall.sendMessage(1L);
@ -255,11 +254,9 @@ public class ServerCallImplTest {
CompressorRegistry.getDefaultInstance()); CompressorRegistry.getDefaultInstance());
serverCall.close(Status.OK, new Metadata()); serverCall.close(Status.OK, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); verify(stream, times(1)).cancel(statusCaptor.capture());
verify(stream, times(1)).close(statusCaptor.capture(), metadataCaptor.capture());
assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode()); assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription()); assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
} }
@Test @Test

View File

@ -1108,7 +1108,7 @@ public class ServerImplTest {
fail("Expected exception"); fail("Expected exception");
} catch (TestError t) { } catch (TestError t) {
assertSame(expectedT, t); assertSame(expectedT, t);
ensureServerStateNotLeaked(); ensureServerStateIsCancelled();
} }
} }
@ -1133,7 +1133,7 @@ public class ServerImplTest {
fail("Expected exception"); fail("Expected exception");
} catch (RuntimeException t) { } catch (RuntimeException t) {
assertSame(expectedT, t); assertSame(expectedT, t);
ensureServerStateNotLeaked(); ensureServerStateIsCancelled();
} }
} }
@ -1156,7 +1156,7 @@ public class ServerImplTest {
fail("Expected exception"); fail("Expected exception");
} catch (TestError t) { } catch (TestError t) {
assertSame(expectedT, t); assertSame(expectedT, t);
ensureServerStateNotLeaked(); ensureServerStateIsCancelled();
} }
} }
@ -1179,7 +1179,7 @@ public class ServerImplTest {
fail("Expected exception"); fail("Expected exception");
} catch (RuntimeException t) { } catch (RuntimeException t) {
assertSame(expectedT, t); assertSame(expectedT, t);
ensureServerStateNotLeaked(); ensureServerStateIsCancelled();
} }
} }
@ -1202,7 +1202,7 @@ public class ServerImplTest {
fail("Expected exception"); fail("Expected exception");
} catch (TestError t) { } catch (TestError t) {
assertSame(expectedT, t); assertSame(expectedT, t);
ensureServerStateNotLeaked(); ensureServerStateIsCancelled();
} }
} }
@ -1225,7 +1225,7 @@ public class ServerImplTest {
fail("Expected exception"); fail("Expected exception");
} catch (RuntimeException t) { } catch (RuntimeException t) {
assertSame(expectedT, t); assertSame(expectedT, t);
ensureServerStateNotLeaked(); ensureServerStateIsCancelled();
} }
} }
@ -1396,6 +1396,12 @@ public class ServerImplTest {
assertTrue(metadataCaptor.getValue().keys().isEmpty()); assertTrue(metadataCaptor.getValue().keys().isEmpty());
} }
private void ensureServerStateIsCancelled() {
verify(stream).cancel(statusCaptor.capture());
assertEquals(Status.INTERNAL, statusCaptor.getValue());
assertNull(statusCaptor.getValue().getCause());
}
private static class SimpleServer implements io.grpc.internal.InternalServer { private static class SimpleServer implements io.grpc.internal.InternalServer {
ServerListener listener; ServerListener listener;

View File

@ -247,7 +247,7 @@ public class MoreInProcessTest {
.onNext(StreamingInputCallRequest.getDefaultInstance()); .onNext(StreamingInputCallRequest.getDefaultInstance());
assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS));
assertEquals(Status.UNKNOWN, Status.fromThrowable(throwableRef.get())); assertEquals(Status.CANCELLED.getCode(), Status.fromThrowable(throwableRef.get()).getCode());
assertNull(responseRef.get()); assertNull(responseRef.get());
} }
} }

View File

@ -654,8 +654,13 @@ class NettyServerHandler extends AbstractNettyHandler {
ChannelPromise promise) { ChannelPromise promise) {
// Notify the listener if we haven't already. // Notify the listener if we haven't already.
cmd.stream().transportReportStatus(cmd.reason()); cmd.stream().transportReportStatus(cmd.reason());
Http2Error http2Error = Http2Error.INTERNAL_ERROR;
if (Status.DEADLINE_EXCEEDED.getCode().equals(cmd.reason().getCode()) || Status.CANCELLED
.getCode().equals(cmd.reason().getCode())) {
http2Error = Http2Error.CANCEL;
}
// Terminate the stream. // Terminate the stream.
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); encoder().writeRstStream(ctx, cmd.stream().id(), http2Error.code(), promise);
} }
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,