mirror of https://github.com/grpc/grpc-java.git
netty: improve server handling of writes to reset streams (#10258)
* netty: improve server handling of writes to reset streams A server stream can be reset by the client while server writes are still queued. After the stream is reset, the netty connection will forget the stream object. The `NettyServerHandler` must deal with that situation. `sendResponseHandlers` already had some code to do that. This change standardizes that code and adds it to `sendGrpcFrame`. This fixes a potential bug where a `SendGrpcFrameCommand` with `endOfStream=true` would raise an `AssertionError` if written to a reset stream. (This bug is not currently reachable because `endOfStream=false` for all server `SendGrpcFrameCommand` objects.) * Do not call into the encoder when we know the stream is gone.
This commit is contained in:
parent
2f025b2b24
commit
a68399a9b6
|
|
@ -678,9 +678,7 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
return serverWriteQueue;
|
return serverWriteQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** Handler for commands sent from the stream. */
|
||||||
* Handler for commands sent from the stream.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
@ -720,31 +718,45 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
|
private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
|
||||||
final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
|
promise.addListener(
|
||||||
if (stream != null) {
|
new ChannelFutureListener() {
|
||||||
promise.addListener(new ChannelFutureListener() {
|
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) {
|
public void operationComplete(ChannelFuture future) {
|
||||||
stream.complete();
|
serverStream(stream).complete();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void streamGone(int streamId, ChannelPromise promise) {
|
||||||
|
promise.setFailure(
|
||||||
|
new IllegalStateException(
|
||||||
|
"attempting to write to stream " + streamId + " that no longer exists") {
|
||||||
|
@Override
|
||||||
|
public synchronized Throwable fillInStackTrace() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** Sends the given gRPC frame to the client. */
|
||||||
* Sends the given gRPC frame to the client.
|
private void sendGrpcFrame(
|
||||||
*/
|
ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
|
||||||
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
|
throws Http2Exception {
|
||||||
ChannelPromise promise) throws Http2Exception {
|
|
||||||
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
|
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
|
||||||
PerfMark.attachTag(cmd.stream().tag());
|
PerfMark.attachTag(cmd.stream().tag());
|
||||||
PerfMark.linkIn(cmd.getLink());
|
PerfMark.linkIn(cmd.getLink());
|
||||||
|
int streamId = cmd.stream().id();
|
||||||
|
Http2Stream stream = connection().stream(streamId);
|
||||||
|
if (stream == null) {
|
||||||
|
streamGone(streamId, promise);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (cmd.endStream()) {
|
if (cmd.endStream()) {
|
||||||
closeStreamWhenDone(promise, cmd.stream().id());
|
closeStreamWhenDone(promise, stream);
|
||||||
}
|
}
|
||||||
// Call the base class to write the HTTP/2 DATA frame.
|
// Call the base class to write the HTTP/2 DATA frame.
|
||||||
encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
|
encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -756,16 +768,14 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
|
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
|
||||||
PerfMark.attachTag(cmd.stream().tag());
|
PerfMark.attachTag(cmd.stream().tag());
|
||||||
PerfMark.linkIn(cmd.getLink());
|
PerfMark.linkIn(cmd.getLink());
|
||||||
// TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
|
|
||||||
// is fixed.
|
|
||||||
int streamId = cmd.stream().id();
|
int streamId = cmd.stream().id();
|
||||||
Http2Stream stream = connection().stream(streamId);
|
Http2Stream stream = connection().stream(streamId);
|
||||||
if (stream == null) {
|
if (stream == null) {
|
||||||
resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
|
streamGone(streamId, promise);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (cmd.endOfStream()) {
|
if (cmd.endOfStream()) {
|
||||||
closeStreamWhenDone(promise, streamId);
|
closeStreamWhenDone(promise, stream);
|
||||||
}
|
}
|
||||||
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
|
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue