From dde1e809a76fac40a4feb844143fd3da658e6181 Mon Sep 17 00:00:00 2001 From: Xudong Ma Date: Mon, 27 Jul 2015 16:51:37 -0700 Subject: [PATCH] Adjust @GuardedBy to pass internal GuardedBy Checking. This is required by our internal sync. --- .../transport/okhttp/OkHttpClientStream.java | 2 + .../okhttp/OkHttpClientTransport.java | 6 -- .../okhttp/OkHttpClientTransportTest.java | 88 ++++++++++--------- 3 files changed, 47 insertions(+), 49 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java index fa042944dd..6ad25a19ec 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java @@ -143,6 +143,7 @@ class OkHttpClientStream extends Http2ClientStream { /** * Must be called with holding the transport lock. */ + @GuardedBy("lock") public void transportHeadersReceived(List
headers, boolean endOfStream) { if (endOfStream) { transportTrailersReceived(Utils.convertTrailers(headers)); @@ -154,6 +155,7 @@ class OkHttpClientStream extends Http2ClientStream { /** * Must be called with holding the transport lock. */ + @GuardedBy("lock") public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { long length = frame.size(); window -= length; diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java index 5bf40768e9..552144a01e 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java @@ -413,17 +413,11 @@ class OkHttpClientTransport implements ClientTransport { } } - @VisibleForTesting ClientFrameHandler getHandler() { return clientFrameHandler; } - @VisibleForTesting - Map getStreams() { - return streams; - } - @VisibleForTesting int getPendingStreamSize() { synchronized (lock) { diff --git a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java index 58a8b41d30..414dca4af1 100644 --- a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java @@ -101,7 +101,6 @@ import java.net.Socket; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -128,7 +127,6 @@ public class OkHttpClientTransportTest { private ClientTransport.Listener transportListener; private OkHttpClientTransport clientTransport; private MockFrameReader frameReader; - private Map streams; private ClientFrameHandler frameHandler; private ExecutorService executor; private long nanoTime; // backs a ticker, for testing ping round-trip time measurement @@ -161,14 +159,13 @@ public class OkHttpClientTransportTest { executor, frameReader, frameWriter, startId, new MockSocket(frameReader), ticker, connectedCallback); clientTransport.start(transportListener); - streams = clientTransport.getStreams(); } /** Final test checks and clean up. */ @After public void tearDown() throws Exception { clientTransport.shutdown(); - assertEquals(0, streams.size()); + assertEquals(0, activeStreamCount()); verify(frameWriter, timeout(TIME_OUT_MS)).close(); frameReader.assertClosed(); executor.shutdown(); @@ -184,13 +181,13 @@ public class OkHttpClientTransportTest { MockStreamListener listener2 = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener1).request(1); clientTransport.newStream(method, new Metadata.Headers(), listener2).request(1); - assertEquals(2, streams.size()); - assertTrue(streams.containsKey(3)); - assertTrue(streams.containsKey(5)); + assertEquals(2, activeStreamCount()); + assertContainStream(3); + assertContainStream(5); frameReader.throwIoExceptionForNextFrame(); listener1.waitUntilStreamClosed(); listener2.waitUntilStreamClosed(); - assertEquals(0, streams.size()); + assertEquals(0, activeStreamCount()); assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode()); assertEquals("Protocol error\n" + NETWORK_ISSUE_MESSAGE, listener1.status.getDescription()); assertEquals(Status.INTERNAL.getCode(), listener2.status.getCode()); @@ -206,7 +203,7 @@ public class OkHttpClientTransportTest { final String message = "Hello Client"; MockStreamListener listener = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener).request(numMessages); - assertTrue(streams.containsKey(3)); + assertContainStream(3); frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); assertNotNull(listener.headers); for (int i = 0; i < numMessages; i++) { @@ -250,7 +247,7 @@ public class OkHttpClientTransportTest { initTransport(); MockStreamListener listener = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener).request(1); - assertTrue(streams.containsKey(3)); + assertContainStream(3); // Empty headers block without correct content type or status frameHandler().headers(false, false, 3, 0, new ArrayList
(), HeadersMode.HTTP_20_HEADERS); @@ -268,7 +265,7 @@ public class OkHttpClientTransportTest { initTransport(); MockStreamListener listener = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener); - assertTrue(streams.containsKey(3)); + assertContainStream(3); frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); listener.waitUntilStreamClosed(); assertEquals(Status.Code.OK, listener.status.getCode()); @@ -279,7 +276,7 @@ public class OkHttpClientTransportTest { initTransport(); MockStreamListener listener = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener); - assertTrue(streams.containsKey(3)); + assertContainStream(3); frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR); listener.waitUntilStreamClosed(); assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.PROTOCOL_ERROR), listener.status); @@ -290,9 +287,7 @@ public class OkHttpClientTransportTest { initTransport(); MockStreamListener listener = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener); - OkHttpClientStream stream = streams.get(3); - assertNotNull(stream); - stream.cancel(Status.CANCELLED); + getStream(3).cancel(Status.CANCELLED); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); listener.waitUntilStreamClosed(); assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), @@ -312,7 +307,7 @@ public class OkHttpClientTransportTest { userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER); verify(frameWriter, timeout(TIME_OUT_MS)) .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); - streams.get(3).cancel(Status.CANCELLED); + getStream(3).cancel(Status.CANCELLED); } @Test @@ -331,7 +326,7 @@ public class OkHttpClientTransportTest { CONTENT_TYPE_HEADER, TE_HEADER); verify(frameWriter, timeout(TIME_OUT_MS)) .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); - streams.get(3).cancel(Status.CANCELLED); + getStream(3).cancel(Status.CANCELLED); } @Test @@ -339,9 +334,7 @@ public class OkHttpClientTransportTest { initTransport(); MockStreamListener listener = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener); - OkHttpClientStream stream = streams.get(3); - assertNotNull(stream); - stream.cancel(Status.DEADLINE_EXCEEDED); + getStream(3).cancel(Status.DEADLINE_EXCEEDED); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); listener.waitUntilStreamClosed(); } @@ -351,8 +344,8 @@ public class OkHttpClientTransportTest { initTransport(); final String message = "Hello Server"; MockStreamListener listener = new MockStreamListener(); - clientTransport.newStream(method, new Metadata.Headers(), listener); - OkHttpClientStream stream = streams.get(3); + OkHttpClientStream stream = + clientTransport.newStream(method, new Metadata.Headers(), listener); InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8)); assertEquals(12, input.available()); stream.writeMessage(input); @@ -372,9 +365,9 @@ public class OkHttpClientTransportTest { MockStreamListener listener2 = new MockStreamListener(); clientTransport.newStream(method,new Metadata.Headers(), listener1).request(2); clientTransport.newStream(method,new Metadata.Headers(), listener2).request(2); - assertEquals(2, streams.size()); - OkHttpClientStream stream1 = streams.get(3); - OkHttpClientStream stream2 = streams.get(5); + assertEquals(2, activeStreamCount()); + OkHttpClientStream stream1 = getStream(3); + OkHttpClientStream stream2 = getStream(5); frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); @@ -429,7 +422,6 @@ public class OkHttpClientTransportTest { initTransport(); MockStreamListener listener = new MockStreamListener(); clientTransport.newStream(method, new Metadata.Headers(), listener).request(1); - OkHttpClientStream stream = streams.get(3); int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1; byte[] fakeMessage = new byte[messageLength]; @@ -442,7 +434,7 @@ public class OkHttpClientTransportTest { // We return the bytes for the stream window as we read the message. verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(eq(3), eq(messageFrameLength)); - stream.cancel(Status.CANCELLED); + getStream(3).cancel(Status.CANCELLED); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); listener.waitUntilStreamClosed(); assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), @@ -535,17 +527,17 @@ public class OkHttpClientTransportTest { = clientTransport.newStream(method, new Metadata.Headers(), listener1); OkHttpClientStream stream2 = clientTransport.newStream(method, new Metadata.Headers(), listener2); - assertEquals(2, streams.size()); + assertEquals(2, activeStreamCount()); clientTransport.shutdown(); verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); - assertEquals(2, streams.size()); + assertEquals(2, activeStreamCount()); verify(transportListener).transportShutdown(); stream1.cancel(Status.CANCELLED); stream2.cancel(Status.CANCELLED); listener1.waitUntilStreamClosed(); listener2.waitUntilStreamClosed(); - assertEquals(0, streams.size()); + assertEquals(0, activeStreamCount()); assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode()); assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); @@ -559,7 +551,7 @@ public class OkHttpClientTransportTest { MockStreamListener listener2 = new MockStreamListener(); clientTransport.newStream(method,new Metadata.Headers(), listener1).request(1); clientTransport.newStream(method,new Metadata.Headers(), listener2).request(1); - assertEquals(2, streams.size()); + assertEquals(2, activeStreamCount()); // Receive goAway, max good id is 3. frameHandler().goAway(3, ErrorCode.CANCEL, null); @@ -570,7 +562,7 @@ public class OkHttpClientTransportTest { // Stream 2 should be closed. listener2.waitUntilStreamClosed(); - assertEquals(1, streams.size()); + assertEquals(1, activeStreamCount()); assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); // New stream should be failed. @@ -578,9 +570,8 @@ public class OkHttpClientTransportTest { // But stream 1 should be able to send. final String sentMessage = "Should I also go away?"; - OkHttpClientStream stream = streams.get(3); - InputStream input = - new ByteArrayInputStream(sentMessage.getBytes(UTF_8)); + OkHttpClientStream stream = getStream(3); + InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8)); assertEquals(22, input.available()); stream.writeMessage(input); stream.flush(); @@ -615,7 +606,7 @@ public class OkHttpClientTransportTest { assertNewStreamFail(); - streams.get(startId).cancel(Status.CANCELLED); + getStream(startId).cancel(Status.CANCELLED); listener1.waitUntilStreamClosed(); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL)); verify(transportListener).transportShutdown(); @@ -641,16 +632,16 @@ public class OkHttpClientTransportTest { } }).start(); waitForStreamPending(1); - assertEquals(1, streams.size()); + assertEquals(1, activeStreamCount()); assertEquals(3, (int) stream1.id()); // Finish the first stream stream1.cancel(Status.CANCELLED); assertTrue("newStream() call is still blocking", newStreamReturn.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)); - assertEquals(1, streams.size()); + assertEquals(1, activeStreamCount()); assertEquals(0, clientTransport.getPendingStreamSize()); - OkHttpClientStream stream2 = streams.get(5); + OkHttpClientStream stream2 = getStream(5); assertNotNull(stream2); stream2.cancel(Status.CANCELLED); } @@ -739,7 +730,7 @@ public class OkHttpClientTransportTest { }).start(); waitForStreamPending(2); - assertEquals(1, streams.size()); + assertEquals(1, activeStreamCount()); assertEquals(startId, (int) stream1.id()); // Now finish stream1, stream2 should be started and exhaust the id, @@ -753,8 +744,8 @@ public class OkHttpClientTransportTest { listener3.waitUntilStreamClosed(); assertEquals(Status.INTERNAL.getCode(), listener3.status.getCode()); assertEquals(0, clientTransport.getPendingStreamSize()); - assertEquals(1, streams.size()); - OkHttpClientStream stream2 = streams.get(startId + 2); + assertEquals(1, activeStreamCount()); + OkHttpClientStream stream2 = getStream(startId + 2); assertNotNull(stream2); stream2.cancel(Status.CANCELLED); } @@ -917,7 +908,7 @@ public class OkHttpClientTransportTest { initTransport(); MockStreamListener listener = new MockStreamListener(); OkHttpClientStream stream = clientTransport.newStream( - method,new Metadata.Headers(), listener); + method, new Metadata.Headers(), listener); assertTrue(stream.isReady()); assertTrue(listener.isOnReadyCalled()); stream.cancel(Status.CANCELLED); @@ -1129,6 +1120,17 @@ public class OkHttpClientTransportTest { stream.cancel(Status.CANCELLED); } + private int activeStreamCount() { + return clientTransport.getActiveStreams().length; + } + + private OkHttpClientStream getStream(int streamId) { + return clientTransport.getStream(streamId); + } + + void assertContainStream(int streamId) { + assertNotNull(clientTransport.getStream(streamId)); + } private ClientFrameHandler frameHandler() throws Exception { if (frameHandler == null) {