mirror of https://github.com/grpc/grpc-java.git
Handle header with errors and endStream = true (#10384)
* Eliminate NPE by skipping further processing when stream is defined, but doesn't have a property for streamKey (header processing identified an error) Fixes #10364 * Add unit test for missing content type
This commit is contained in:
parent
56d1c42c80
commit
864d30caed
|
|
@ -2106,6 +2106,9 @@ public abstract class AbstractTransportTest {
|
||||||
* be present, and the cause should be stripped away.
|
* be present, and the cause should be stripped away.
|
||||||
*/
|
*/
|
||||||
private static void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
|
private static void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
|
||||||
|
if (!clientStreamStatus.isOk() && clientStreamStatus.getCode() != expectedStatus.getCode()) {
|
||||||
|
System.out.println("Full Status: " + clientStreamStatus);
|
||||||
|
}
|
||||||
assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
|
assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
|
||||||
assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
|
assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
|
||||||
assertNull(clientStreamStatus.getCause());
|
assertNull(clientStreamStatus.getCause());
|
||||||
|
|
|
||||||
|
|
@ -512,6 +512,9 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
flowControlPing().onDataRead(data.readableBytes(), padding);
|
flowControlPing().onDataRead(data.readableBytes(), padding);
|
||||||
try {
|
try {
|
||||||
NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
|
NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
|
||||||
|
if (stream == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
|
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
|
||||||
PerfMark.attachTag(stream.tag());
|
PerfMark.attachTag(stream.tag());
|
||||||
stream.inboundDataReceived(data, endOfStream);
|
stream.inboundDataReceived(data, endOfStream);
|
||||||
|
|
@ -679,12 +682,14 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
|
|
||||||
private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
|
private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
|
||||||
final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
|
final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
|
||||||
promise.addListener(new ChannelFutureListener() {
|
if (stream != null) {
|
||||||
@Override
|
promise.addListener(new ChannelFutureListener() {
|
||||||
public void operationComplete(ChannelFuture future) {
|
@Override
|
||||||
stream.complete();
|
public void operationComplete(ChannelFuture future) {
|
||||||
}
|
stream.complete();
|
||||||
});
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -634,6 +634,34 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
|
||||||
any(ChannelPromise.class));
|
any(ChannelPromise.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void headersWithErrAndEndStreamReturnErrorButNotThrowNpe() throws Exception {
|
||||||
|
manualSetUp();
|
||||||
|
Http2Headers headers = new DefaultHttp2Headers()
|
||||||
|
.method(HTTP_METHOD)
|
||||||
|
.add(AsciiString.of("host"), AsciiString.of("example.com"))
|
||||||
|
.path(new AsciiString("/foo/bar"));
|
||||||
|
ByteBuf headersFrame = headersFrame(STREAM_ID, headers);
|
||||||
|
channelRead(headersFrame);
|
||||||
|
channelRead(emptyGrpcFrame(STREAM_ID, true));
|
||||||
|
|
||||||
|
Http2Headers responseHeaders = new DefaultHttp2Headers()
|
||||||
|
.set(InternalStatus.CODE_KEY.name(), String.valueOf(Code.INTERNAL.value()))
|
||||||
|
.set(InternalStatus.MESSAGE_KEY.name(), "Content-Type is missing from the request")
|
||||||
|
.status("" + 415)
|
||||||
|
.set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
|
||||||
|
|
||||||
|
verifyWrite()
|
||||||
|
.writeHeaders(
|
||||||
|
eq(ctx()),
|
||||||
|
eq(STREAM_ID),
|
||||||
|
eq(responseHeaders),
|
||||||
|
eq(0),
|
||||||
|
eq(false),
|
||||||
|
any(ChannelPromise.class));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void headersWithAuthorityAndHostUsesAuthority() throws Exception {
|
public void headersWithAuthorityAndHostUsesAuthority() throws Exception {
|
||||||
manualSetUp();
|
manualSetUp();
|
||||||
|
|
|
||||||
|
|
@ -943,7 +943,7 @@ public class WeightedRoundRobinLoadBalancerTest {
|
||||||
Map<Integer, Integer> pickCount = new HashMap<>();
|
Map<Integer, Integer> pickCount = new HashMap<>();
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
int result = sss.pick();
|
int result = sss.pick();
|
||||||
pickCount.put(result, pickCount.getOrDefault(result, 0) + 1);
|
pickCount.merge(result, 1, (o, v) -> o + v);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
assertThat(Math.abs(pickCount.getOrDefault(i, 0) / 1000.0 - weights[i] / totalWeight))
|
assertThat(Math.abs(pickCount.getOrDefault(i, 0) / 1000.0 - weights[i] / totalWeight))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue