Adjust @GuardedBy to pass internal GuardedBy Checking.

This is required by our internal sync.
This commit is contained in:
Xudong Ma 2015-07-27 16:51:37 -07:00
parent 4d2b3e3d06
commit dde1e809a7
3 changed files with 47 additions and 49 deletions

View File

@ -143,6 +143,7 @@ class OkHttpClientStream extends Http2ClientStream {
/** /**
* Must be called with holding the transport lock. * Must be called with holding the transport lock.
*/ */
@GuardedBy("lock")
public void transportHeadersReceived(List<Header> headers, boolean endOfStream) { public void transportHeadersReceived(List<Header> headers, boolean endOfStream) {
if (endOfStream) { if (endOfStream) {
transportTrailersReceived(Utils.convertTrailers(headers)); transportTrailersReceived(Utils.convertTrailers(headers));
@ -154,6 +155,7 @@ class OkHttpClientStream extends Http2ClientStream {
/** /**
* Must be called with holding the transport lock. * Must be called with holding the transport lock.
*/ */
@GuardedBy("lock")
public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
long length = frame.size(); long length = frame.size();
window -= length; window -= length;

View File

@ -413,17 +413,11 @@ class OkHttpClientTransport implements ClientTransport {
} }
} }
@VisibleForTesting @VisibleForTesting
ClientFrameHandler getHandler() { ClientFrameHandler getHandler() {
return clientFrameHandler; return clientFrameHandler;
} }
@VisibleForTesting
Map<Integer, OkHttpClientStream> getStreams() {
return streams;
}
@VisibleForTesting @VisibleForTesting
int getPendingStreamSize() { int getPendingStreamSize() {
synchronized (lock) { synchronized (lock) {

View File

@ -101,7 +101,6 @@ import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -128,7 +127,6 @@ public class OkHttpClientTransportTest {
private ClientTransport.Listener transportListener; private ClientTransport.Listener transportListener;
private OkHttpClientTransport clientTransport; private OkHttpClientTransport clientTransport;
private MockFrameReader frameReader; private MockFrameReader frameReader;
private Map<Integer, OkHttpClientStream> streams;
private ClientFrameHandler frameHandler; private ClientFrameHandler frameHandler;
private ExecutorService executor; private ExecutorService executor;
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
@ -161,14 +159,13 @@ public class OkHttpClientTransportTest {
executor, frameReader, frameWriter, startId, executor, frameReader, frameWriter, startId,
new MockSocket(frameReader), ticker, connectedCallback); new MockSocket(frameReader), ticker, connectedCallback);
clientTransport.start(transportListener); clientTransport.start(transportListener);
streams = clientTransport.getStreams();
} }
/** Final test checks and clean up. */ /** Final test checks and clean up. */
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
clientTransport.shutdown(); clientTransport.shutdown();
assertEquals(0, streams.size()); assertEquals(0, activeStreamCount());
verify(frameWriter, timeout(TIME_OUT_MS)).close(); verify(frameWriter, timeout(TIME_OUT_MS)).close();
frameReader.assertClosed(); frameReader.assertClosed();
executor.shutdown(); executor.shutdown();
@ -184,13 +181,13 @@ public class OkHttpClientTransportTest {
MockStreamListener listener2 = new MockStreamListener(); MockStreamListener listener2 = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener1).request(1); clientTransport.newStream(method, new Metadata.Headers(), listener1).request(1);
clientTransport.newStream(method, new Metadata.Headers(), listener2).request(1); clientTransport.newStream(method, new Metadata.Headers(), listener2).request(1);
assertEquals(2, streams.size()); assertEquals(2, activeStreamCount());
assertTrue(streams.containsKey(3)); assertContainStream(3);
assertTrue(streams.containsKey(5)); assertContainStream(5);
frameReader.throwIoExceptionForNextFrame(); frameReader.throwIoExceptionForNextFrame();
listener1.waitUntilStreamClosed(); listener1.waitUntilStreamClosed();
listener2.waitUntilStreamClosed(); listener2.waitUntilStreamClosed();
assertEquals(0, streams.size()); assertEquals(0, activeStreamCount());
assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode());
assertEquals("Protocol error\n" + NETWORK_ISSUE_MESSAGE, listener1.status.getDescription()); assertEquals("Protocol error\n" + NETWORK_ISSUE_MESSAGE, listener1.status.getDescription());
assertEquals(Status.INTERNAL.getCode(), listener2.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener2.status.getCode());
@ -206,7 +203,7 @@ public class OkHttpClientTransportTest {
final String message = "Hello Client"; final String message = "Hello Client";
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener).request(numMessages); clientTransport.newStream(method, new Metadata.Headers(), listener).request(numMessages);
assertTrue(streams.containsKey(3)); assertContainStream(3);
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
assertNotNull(listener.headers); assertNotNull(listener.headers);
for (int i = 0; i < numMessages; i++) { for (int i = 0; i < numMessages; i++) {
@ -250,7 +247,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1); clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
assertTrue(streams.containsKey(3)); assertContainStream(3);
// Empty headers block without correct content type or status // Empty headers block without correct content type or status
frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(), frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(),
HeadersMode.HTTP_20_HEADERS); HeadersMode.HTTP_20_HEADERS);
@ -268,7 +265,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener); clientTransport.newStream(method, new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3)); assertContainStream(3);
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(Status.Code.OK, listener.status.getCode()); assertEquals(Status.Code.OK, listener.status.getCode());
@ -279,7 +276,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener); clientTransport.newStream(method, new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3)); assertContainStream(3);
frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR); frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR);
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.PROTOCOL_ERROR), listener.status); assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.PROTOCOL_ERROR), listener.status);
@ -290,9 +287,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener); clientTransport.newStream(method, new Metadata.Headers(), listener);
OkHttpClientStream stream = streams.get(3); getStream(3).cancel(Status.CANCELLED);
assertNotNull(stream);
stream.cancel(Status.CANCELLED);
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
@ -312,7 +307,7 @@ public class OkHttpClientTransportTest {
userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER); userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER);
verify(frameWriter, timeout(TIME_OUT_MS)) verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
streams.get(3).cancel(Status.CANCELLED); getStream(3).cancel(Status.CANCELLED);
} }
@Test @Test
@ -331,7 +326,7 @@ public class OkHttpClientTransportTest {
CONTENT_TYPE_HEADER, TE_HEADER); CONTENT_TYPE_HEADER, TE_HEADER);
verify(frameWriter, timeout(TIME_OUT_MS)) verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
streams.get(3).cancel(Status.CANCELLED); getStream(3).cancel(Status.CANCELLED);
} }
@Test @Test
@ -339,9 +334,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener); clientTransport.newStream(method, new Metadata.Headers(), listener);
OkHttpClientStream stream = streams.get(3); getStream(3).cancel(Status.DEADLINE_EXCEEDED);
assertNotNull(stream);
stream.cancel(Status.DEADLINE_EXCEEDED);
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
} }
@ -351,8 +344,8 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
final String message = "Hello Server"; final String message = "Hello Server";
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata.Headers(), listener); clientTransport.newStream(method, new Metadata.Headers(), listener);
OkHttpClientStream stream = streams.get(3);
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8)); InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
assertEquals(12, input.available()); assertEquals(12, input.available());
stream.writeMessage(input); stream.writeMessage(input);
@ -372,9 +365,9 @@ public class OkHttpClientTransportTest {
MockStreamListener listener2 = new MockStreamListener(); MockStreamListener listener2 = new MockStreamListener();
clientTransport.newStream(method,new Metadata.Headers(), listener1).request(2); clientTransport.newStream(method,new Metadata.Headers(), listener1).request(2);
clientTransport.newStream(method,new Metadata.Headers(), listener2).request(2); clientTransport.newStream(method,new Metadata.Headers(), listener2).request(2);
assertEquals(2, streams.size()); assertEquals(2, activeStreamCount());
OkHttpClientStream stream1 = streams.get(3); OkHttpClientStream stream1 = getStream(3);
OkHttpClientStream stream2 = streams.get(5); OkHttpClientStream stream2 = getStream(5);
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
@ -429,7 +422,6 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1); clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
OkHttpClientStream stream = streams.get(3);
int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1; int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1;
byte[] fakeMessage = new byte[messageLength]; byte[] fakeMessage = new byte[messageLength];
@ -442,7 +434,7 @@ public class OkHttpClientTransportTest {
// We return the bytes for the stream window as we read the message. // We return the bytes for the stream window as we read the message.
verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(eq(3), eq(messageFrameLength)); verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(eq(3), eq(messageFrameLength));
stream.cancel(Status.CANCELLED); getStream(3).cancel(Status.CANCELLED);
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
@ -535,17 +527,17 @@ public class OkHttpClientTransportTest {
= clientTransport.newStream(method, new Metadata.Headers(), listener1); = clientTransport.newStream(method, new Metadata.Headers(), listener1);
OkHttpClientStream stream2 OkHttpClientStream stream2
= clientTransport.newStream(method, new Metadata.Headers(), listener2); = clientTransport.newStream(method, new Metadata.Headers(), listener2);
assertEquals(2, streams.size()); assertEquals(2, activeStreamCount());
clientTransport.shutdown(); clientTransport.shutdown();
verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any());
assertEquals(2, streams.size()); assertEquals(2, activeStreamCount());
verify(transportListener).transportShutdown(); verify(transportListener).transportShutdown();
stream1.cancel(Status.CANCELLED); stream1.cancel(Status.CANCELLED);
stream2.cancel(Status.CANCELLED); stream2.cancel(Status.CANCELLED);
listener1.waitUntilStreamClosed(); listener1.waitUntilStreamClosed();
listener2.waitUntilStreamClosed(); listener2.waitUntilStreamClosed();
assertEquals(0, streams.size()); assertEquals(0, activeStreamCount());
assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode()); assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode());
assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
@ -559,7 +551,7 @@ public class OkHttpClientTransportTest {
MockStreamListener listener2 = new MockStreamListener(); MockStreamListener listener2 = new MockStreamListener();
clientTransport.newStream(method,new Metadata.Headers(), listener1).request(1); clientTransport.newStream(method,new Metadata.Headers(), listener1).request(1);
clientTransport.newStream(method,new Metadata.Headers(), listener2).request(1); clientTransport.newStream(method,new Metadata.Headers(), listener2).request(1);
assertEquals(2, streams.size()); assertEquals(2, activeStreamCount());
// Receive goAway, max good id is 3. // Receive goAway, max good id is 3.
frameHandler().goAway(3, ErrorCode.CANCEL, null); frameHandler().goAway(3, ErrorCode.CANCEL, null);
@ -570,7 +562,7 @@ public class OkHttpClientTransportTest {
// Stream 2 should be closed. // Stream 2 should be closed.
listener2.waitUntilStreamClosed(); listener2.waitUntilStreamClosed();
assertEquals(1, streams.size()); assertEquals(1, activeStreamCount());
assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
// New stream should be failed. // New stream should be failed.
@ -578,9 +570,8 @@ public class OkHttpClientTransportTest {
// But stream 1 should be able to send. // But stream 1 should be able to send.
final String sentMessage = "Should I also go away?"; final String sentMessage = "Should I also go away?";
OkHttpClientStream stream = streams.get(3); OkHttpClientStream stream = getStream(3);
InputStream input = InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
assertEquals(22, input.available()); assertEquals(22, input.available());
stream.writeMessage(input); stream.writeMessage(input);
stream.flush(); stream.flush();
@ -615,7 +606,7 @@ public class OkHttpClientTransportTest {
assertNewStreamFail(); assertNewStreamFail();
streams.get(startId).cancel(Status.CANCELLED); getStream(startId).cancel(Status.CANCELLED);
listener1.waitUntilStreamClosed(); listener1.waitUntilStreamClosed();
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL)); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL));
verify(transportListener).transportShutdown(); verify(transportListener).transportShutdown();
@ -641,16 +632,16 @@ public class OkHttpClientTransportTest {
} }
}).start(); }).start();
waitForStreamPending(1); waitForStreamPending(1);
assertEquals(1, streams.size()); assertEquals(1, activeStreamCount());
assertEquals(3, (int) stream1.id()); assertEquals(3, (int) stream1.id());
// Finish the first stream // Finish the first stream
stream1.cancel(Status.CANCELLED); stream1.cancel(Status.CANCELLED);
assertTrue("newStream() call is still blocking", assertTrue("newStream() call is still blocking",
newStreamReturn.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)); newStreamReturn.await(TIME_OUT_MS, TimeUnit.MILLISECONDS));
assertEquals(1, streams.size()); assertEquals(1, activeStreamCount());
assertEquals(0, clientTransport.getPendingStreamSize()); assertEquals(0, clientTransport.getPendingStreamSize());
OkHttpClientStream stream2 = streams.get(5); OkHttpClientStream stream2 = getStream(5);
assertNotNull(stream2); assertNotNull(stream2);
stream2.cancel(Status.CANCELLED); stream2.cancel(Status.CANCELLED);
} }
@ -739,7 +730,7 @@ public class OkHttpClientTransportTest {
}).start(); }).start();
waitForStreamPending(2); waitForStreamPending(2);
assertEquals(1, streams.size()); assertEquals(1, activeStreamCount());
assertEquals(startId, (int) stream1.id()); assertEquals(startId, (int) stream1.id());
// Now finish stream1, stream2 should be started and exhaust the id, // Now finish stream1, stream2 should be started and exhaust the id,
@ -753,8 +744,8 @@ public class OkHttpClientTransportTest {
listener3.waitUntilStreamClosed(); listener3.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener3.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener3.status.getCode());
assertEquals(0, clientTransport.getPendingStreamSize()); assertEquals(0, clientTransport.getPendingStreamSize());
assertEquals(1, streams.size()); assertEquals(1, activeStreamCount());
OkHttpClientStream stream2 = streams.get(startId + 2); OkHttpClientStream stream2 = getStream(startId + 2);
assertNotNull(stream2); assertNotNull(stream2);
stream2.cancel(Status.CANCELLED); stream2.cancel(Status.CANCELLED);
} }
@ -917,7 +908,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = clientTransport.newStream( OkHttpClientStream stream = clientTransport.newStream(
method,new Metadata.Headers(), listener); method, new Metadata.Headers(), listener);
assertTrue(stream.isReady()); assertTrue(stream.isReady());
assertTrue(listener.isOnReadyCalled()); assertTrue(listener.isOnReadyCalled());
stream.cancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
@ -1129,6 +1120,17 @@ public class OkHttpClientTransportTest {
stream.cancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
} }
private int activeStreamCount() {
return clientTransport.getActiveStreams().length;
}
private OkHttpClientStream getStream(int streamId) {
return clientTransport.getStream(streamId);
}
void assertContainStream(int streamId) {
assertNotNull(clientTransport.getStream(streamId));
}
private ClientFrameHandler frameHandler() throws Exception { private ClientFrameHandler frameHandler() throws Exception {
if (frameHandler == null) { if (frameHandler == null) {