From b7cf75fac1991121803f469a41784154b5058e20 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 30 Jan 2019 11:01:30 -0800 Subject: [PATCH] cronet: avoid NPE in writeHeaders after transport shutdown (#5275) --- .../io/grpc/cronet/CronetClientStream.java | 9 ++++++ .../cronet/CronetClientTransportTest.java | 32 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java index 1217106ec9..4edee96d0f 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java @@ -131,6 +131,10 @@ class CronetClientStream extends AbstractClientStream { @Override public void writeHeaders(Metadata metadata, byte[] payload) { startCallback.run(); + // streamFactory will be set by startCallback, unless the transport is in go-away state + if (streamFactory == null) { + return; + } BidirectionalStreamCallback callback = new BidirectionalStreamCallback(); String path = url; @@ -247,6 +251,7 @@ class CronetClientStream extends AbstractClientStream { @GuardedBy("lock") @Override protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) { + Preconditions.checkNotNull(stream, "stream must not be null"); stream.cancel(); transportReportStatus(status, stopDelivery, trailers); } @@ -267,6 +272,7 @@ class CronetClientStream extends AbstractClientStream { @GuardedBy("lock") @Override public void bytesRead(int processedBytes) { + Preconditions.checkNotNull(stream, "stream must not be null"); bytesPendingProcess -= processedBytes; if (bytesPendingProcess == 0 && !readClosed) { if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { @@ -345,6 +351,9 @@ class CronetClientStream extends AbstractClientStream { } private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) { + if (stream == null) { + return; + } if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { Log.v(LOG_TAG, "BidirectionalStream.write"); } diff --git a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java index c6bc9b0439..980fd99155 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java @@ -121,4 +121,36 @@ public final class CronetClientTransportTest { // All streams are gone now. verify(clientTransportListener, times(1)).transportTerminated(); } + + @Test + public void startStreamAfterShutdown() throws Exception { + CronetClientStream stream = + transport.newStream(descriptor, new Metadata(), CallOptions.DEFAULT); + transport.shutdown(); + BaseClientStreamListener listener = new BaseClientStreamListener(); + stream.start(listener); + + assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); + } + + private static class BaseClientStreamListener implements ClientStreamListener { + private Status status; + + @Override + public void messagesAvailable(MessageProducer producer) {} + + @Override + public void onReady() {} + + @Override + public void headersRead(Metadata headers) {} + + @Override + public void closed(Status status, Metadata trailers) {} + + @Override + public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { + this.status = status; + } + } }