From 55e3b718884dbc745cbd7cd3a90842487fe2580c Mon Sep 17 00:00:00 2001 From: Xiao Hang Date: Tue, 14 Mar 2017 15:28:22 -0700 Subject: [PATCH] Move okhttp to AbstractStream2 --- .../grpc/internal/AbstractClientStream2.java | 8 - .../main/java/io/grpc/internal/Framer.java | 2 +- .../Http2ClientStreamTransportState.java | 6 + .../internal/AbstractClientStream2Test.java | 12 - .../Http2ClientStreamTransportStateTest.java | 11 + .../io/grpc/okhttp/OkHttpClientStream.java | 397 ++++++++++-------- .../io/grpc/okhttp/OkHttpClientTransport.java | 29 +- .../grpc/okhttp/OutboundFlowController.java | 2 +- .../grpc/okhttp/OkHttpClientStreamTest.java | 22 +- .../okhttp/OkHttpClientTransportTest.java | 16 +- 10 files changed, 277 insertions(+), 228 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream2.java b/core/src/main/java/io/grpc/internal/AbstractClientStream2.java index f04a0538a0..c8e6a633a1 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream2.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream2.java @@ -190,7 +190,6 @@ public abstract class AbstractClientStream2 extends AbstractStream2 private Runnable deliveryStalledTask; - private boolean headersReceived; /** * Whether the stream is closed from the transport's perspective. This can differ from {@link * #listenerClosed} because there may still be messages buffered to deliver to the application. @@ -233,7 +232,6 @@ public abstract class AbstractClientStream2 extends AbstractStream2 */ protected void inboundHeadersReceived(Metadata headers) { Preconditions.checkState(!statusReported, "Received headers on closed stream"); - headersReceived = true; listener().headersRead(headers); } @@ -250,12 +248,6 @@ public abstract class AbstractClientStream2 extends AbstractStream2 log.log(Level.INFO, "Received data on closed stream"); return; } - if (!headersReceived) { - transportReportStatus( - Status.INTERNAL.withDescription("headers not received before payload"), - false, new Metadata()); - return; - } needToCloseFrame = false; deframe(frame, false); diff --git a/core/src/main/java/io/grpc/internal/Framer.java b/core/src/main/java/io/grpc/internal/Framer.java index 3239d7e5b8..b13e3bd441 100644 --- a/core/src/main/java/io/grpc/internal/Framer.java +++ b/core/src/main/java/io/grpc/internal/Framer.java @@ -35,7 +35,7 @@ import io.grpc.Compressor; import java.io.InputStream; /** Interface for framing gRPC messages. */ -interface Framer { +public interface Framer { /** * Writes out a payload message. * diff --git a/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java b/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java index cfab5f3a99..81478167da 100644 --- a/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java +++ b/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java @@ -147,6 +147,12 @@ public abstract class Http2ClientStreamTransportState extends AbstractClientStre http2ProcessingFailed(transportError, transportErrorMetadata); } } else { + if (!headersReceived) { + http2ProcessingFailed( + Status.INTERNAL.withDescription("headers not received before payload"), + new Metadata()); + return; + } inboundDataReceived(frame); if (endOfStream) { // This is a protocol violation as we expect to receive trailers. diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java b/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java index a63d753609..068855be49 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java @@ -32,7 +32,6 @@ package io.grpc.internal; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -161,17 +160,6 @@ public class AbstractClientStream2Test { state.inboundDataReceived(null); } - @Test - public void inboundDataReceived_failsOnNoHeaders() { - AbstractClientStream2 stream = new BaseAbstractClientStream(allocator, statsTraceCtx); - stream.start(mockListener); - - stream.transportState().inboundDataReceived(ReadableBuffers.empty()); - - verify(mockListener).closed(statusCaptor.capture(), any(Metadata.class)); - assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode()); - } - @Test public void inboundHeadersReceived_notifiesListener() { AbstractClientStream2 stream = new BaseAbstractClientStream(allocator, statsTraceCtx); diff --git a/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java b/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java index ce09427b2d..59af94c912 100644 --- a/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java +++ b/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java @@ -200,6 +200,17 @@ public class Http2ClientStreamTransportStateTest { assertTrue(statusCaptor.getValue().getDescription().contains(testString)); } + @Test + public void transportDataReceived_noHeaderReceived() { + BaseTransportState state = new BaseTransportState(); + state.setListener(mockListener); + String testString = "This is a test"; + state.transportDataReceived(ReadableBuffers.wrap(testString.getBytes(US_ASCII)), true); + + verify(mockListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode()); + } + @Test public void transportDataReceived_debugData() { BaseTransportState state = new BaseTransportState(); diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 465db93a45..e1977644aa 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -38,9 +38,9 @@ import io.grpc.Attributes; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; -import io.grpc.internal.ClientStreamListener; +import io.grpc.internal.AbstractClientStream2; import io.grpc.internal.GrpcUtil; -import io.grpc.internal.Http2ClientStream; +import io.grpc.internal.Http2ClientStreamTransportState; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.WritableBuffer; import io.grpc.okhttp.internal.framed.ErrorCode; @@ -54,38 +54,23 @@ import okio.Buffer; /** * Client stream for the okhttp transport. */ -class OkHttpClientStream extends Http2ClientStream { +class OkHttpClientStream extends AbstractClientStream2 { private static final int WINDOW_UPDATE_THRESHOLD = Utils.DEFAULT_WINDOW_SIZE / 2; private static final Buffer EMPTY_BUFFER = new Buffer(); - @GuardedBy("lock") - private int window = Utils.DEFAULT_WINDOW_SIZE; - @GuardedBy("lock") - private int processedWindow = Utils.DEFAULT_WINDOW_SIZE; + public static final int ABSENT_ID = -1; + private final MethodDescriptor method; - /** {@code null} iff start has been called. */ - private Metadata headers; - private final AsyncFrameWriter frameWriter; - private final OutboundFlowController outboundFlow; - private final OkHttpClientTransport transport; - private final Object lock; + private final String userAgent; private final StatsTraceContext statsTraceCtx; private String authority; private Object outboundFlowState; private volatile int id = ABSENT_ID; - @GuardedBy("lock") - private List
requestHeaders; - /** - * Null iff {@link #requestHeaders} is null. Non-null iff neither {@link #sendCancel} nor - * {@link #start(int)} have been called. - */ - @GuardedBy("lock") - private Queue pendingData = new ArrayDeque(); - @GuardedBy("lock") - private boolean cancelSent = false; + private final TransportState state; + private final Sink sink = new Sink(); OkHttpClientStream( MethodDescriptor method, @@ -98,16 +83,23 @@ class OkHttpClientStream extends Http2ClientStream { String authority, String userAgent, StatsTraceContext statsTraceCtx) { - super(new OkHttpWritableBufferAllocator(), maxMessageSize, statsTraceCtx); + super(new OkHttpWritableBufferAllocator(), statsTraceCtx, headers, false); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); this.method = method; - this.headers = headers; - this.frameWriter = frameWriter; - this.transport = transport; - this.outboundFlow = outboundFlow; - this.lock = lock; this.authority = authority; this.userAgent = userAgent; + this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, frameWriter, outboundFlow, + transport); + } + + @Override + protected TransportState transportState() { + return state; + } + + @Override + protected Sink abstractClientStreamSink() { + return sink; } /** @@ -117,21 +109,12 @@ class OkHttpClientStream extends Http2ClientStream { return method.getType(); } - @Override - public void request(final int numMessages) { - synchronized (lock) { - requestMessagesFromDeframer(numMessages); - } - } - - @Override public int id() { return id; } @Override public void setAuthority(String authority) { - checkState(listener() == null, "must be call before start"); this.authority = checkNotNull(authority, "authority"); } @@ -140,102 +123,212 @@ class OkHttpClientStream extends Http2ClientStream { return Attributes.EMPTY; } - @Override - public void start(ClientStreamListener listener) { - super.start(listener); - String defaultPath = "/" + method.getFullMethodName(); - headers.discardAll(GrpcUtil.USER_AGENT_KEY); - List
requestHeaders = - Headers.createRequestHeaders(headers, defaultPath, authority, userAgent); - headers = null; - synchronized (lock) { - this.requestHeaders = requestHeaders; - transport.streamReadyToStart(this); + class Sink implements AbstractClientStream2.Sink { + @Override + public void writeHeaders(Metadata metadata, byte[] payload) { + String defaultPath = "/" + method.getFullMethodName(); + metadata.discardAll(GrpcUtil.USER_AGENT_KEY); + synchronized (state.lock) { + state.streamReady(metadata, defaultPath); + } } - } - @GuardedBy("lock") - public void start(int id) { - checkState(this.id == ABSENT_ID, "the stream has been started with id %s", this.id); - this.id = id; - - if (pendingData != null) { - // Only happens when the stream has neither been started nor cancelled. - frameWriter.synStream(false, false, id, 0, requestHeaders); - statsTraceCtx.clientHeadersSent(); - requestHeaders = null; - - boolean flush = false; - while (!pendingData.isEmpty()) { - PendingData data = pendingData.poll(); - outboundFlow.data(data.endOfStream, id, data.buffer, false); - if (data.flush) { - flush = true; + @Override + public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + Buffer buffer; + if (frame == null) { + buffer = EMPTY_BUFFER; + } else { + buffer = ((OkHttpWritableBuffer) frame).buffer(); + int size = (int) buffer.size(); + if (size > 0) { + onSendingBytes(size); } } - if (flush) { - outboundFlow.flush(); - } - pendingData = null; - } - } - /** - * Notification that this stream was allocated for the connection. This means the stream has - * passed through any delay caused by MAX_CONCURRENT_STREAMS. - */ - public void allocated() { - // Now that the stream has actually been initialized, call the listener's onReady callback if - // appropriate. - onStreamAllocated(); - } - - void onStreamSentBytes(int numBytes) { - onSentBytes(numBytes); - } - - /** - * Must be called with holding the transport lock. - */ - @GuardedBy("lock") - public void transportHeadersReceived(List
headers, boolean endOfStream) { - if (endOfStream) { - transportTrailersReceived(Utils.convertTrailers(headers)); - } else { - transportHeadersReceived(Utils.convertHeaders(headers)); - } - } - - /** - * Must be called with holding the transport lock. - */ - @GuardedBy("lock") - public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { - long length = frame.size(); - window -= length; - if (window < 0) { - frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR); - transport.finishStream(id(), Status.INTERNAL.withDescription( - "Received data size exceeded our receiving window size"), null); - return; - } - super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); - } - - @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { - Buffer buffer; - if (frame == null) { - buffer = EMPTY_BUFFER; - } else { - buffer = ((OkHttpWritableBuffer) frame).buffer(); - int size = (int) buffer.size(); - if (size > 0) { - onSendingBytes(size); + synchronized (state.lock) { + state.sendBuffer(buffer, endOfStream, flush); } } - synchronized (lock) { + @Override + public void request(final int numMessages) { + synchronized (state.lock) { + state.requestMessagesFromDeframer(numMessages); + } + } + + @Override + public void cancel(Status reason) { + synchronized (state.lock) { + state.cancel(reason, null); + } + } + } + + class TransportState extends Http2ClientStreamTransportState { + private final Object lock; + @GuardedBy("lock") + private List
requestHeaders; + /** + * Null iff {@link #requestHeaders} is null. Non-null iff neither {@link #sendCancel} nor + * {@link #start(int)} have been called. + */ + @GuardedBy("lock") + private Queue pendingData = new ArrayDeque(); + @GuardedBy("lock") + private boolean cancelSent = false; + @GuardedBy("lock") + private int window = Utils.DEFAULT_WINDOW_SIZE; + @GuardedBy("lock") + private int processedWindow = Utils.DEFAULT_WINDOW_SIZE; + @GuardedBy("lock") + private final AsyncFrameWriter frameWriter; + @GuardedBy("lock") + private final OutboundFlowController outboundFlow; + @GuardedBy("lock") + private final OkHttpClientTransport transport; + + public TransportState( + int maxMessageSize, + StatsTraceContext statsTraceCtx, + Object lock, + AsyncFrameWriter frameWriter, + OutboundFlowController outboundFlow, + OkHttpClientTransport transport) { + super(maxMessageSize, statsTraceCtx); + this.lock = checkNotNull(lock, "lock"); + this.frameWriter = frameWriter; + this.outboundFlow = outboundFlow; + this.transport = transport; + } + + @GuardedBy("lock") + public void start(int streamId) { + checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId); + id = streamId; + state.onStreamAllocated(); + + if (pendingData != null) { + // Only happens when the stream has neither been started nor cancelled. + frameWriter.synStream(false, false, id, 0, requestHeaders); + statsTraceCtx.clientHeadersSent(); + requestHeaders = null; + + boolean flush = false; + while (!pendingData.isEmpty()) { + PendingData data = pendingData.poll(); + outboundFlow.data(data.endOfStream, id, data.buffer, false); + if (data.flush) { + flush = true; + } + } + if (flush) { + outboundFlow.flush(); + } + pendingData = null; + } + } + + @GuardedBy("lock") + @Override + protected void onStreamAllocated() { + super.onStreamAllocated(); + } + + @GuardedBy("lock") + @Override + protected void http2ProcessingFailed(Status status, Metadata trailers) { + cancel(status, trailers); + } + + @GuardedBy("lock") + @Override + protected void deframeFailed(Throwable cause) { + http2ProcessingFailed(Status.fromThrowable(cause), new Metadata()); + } + + @GuardedBy("lock") + @Override + public void bytesRead(int processedBytes) { + processedWindow -= processedBytes; + if (processedWindow <= WINDOW_UPDATE_THRESHOLD) { + int delta = Utils.DEFAULT_WINDOW_SIZE - processedWindow; + window += delta; + processedWindow += delta; + frameWriter.windowUpdate(id(), delta); + } + } + + /** + * Must be called with holding the transport lock. + */ + @GuardedBy("lock") + public void transportHeadersReceived(List
headers, boolean endOfStream) { + if (endOfStream) { + transportTrailersReceived(Utils.convertTrailers(headers)); + onEndOfStream(); + } else { + transportHeadersReceived(Utils.convertHeaders(headers)); + } + } + + /** + * Must be called with holding the transport lock. + */ + @GuardedBy("lock") + public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { + long length = frame.size(); + window -= length; + if (window < 0) { + frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR); + transport.finishStream(id(), Status.INTERNAL.withDescription( + "Received data size exceeded our receiving window size"), null, null); + return; + } + super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); + if (endOfStream) { + onEndOfStream(); + } + } + + @GuardedBy("lock") + private void onEndOfStream() { + if (!framer().isClosed()) { + // If server's end-of-stream is received before client sends end-of-stream, we just send a + // reset to server to fully close the server side stream. + transport.finishStream(id(), null, ErrorCode.CANCEL, null); + } else { + transport.finishStream(id(), null, null, null); + } + } + + + @GuardedBy("lock") + private void cancel(Status reason, Metadata trailers) { + if (cancelSent) { + return; + } + cancelSent = true; + if (pendingData != null) { + // stream is pending. + transport.removePendingStream(OkHttpClientStream.this); + // release holding data, so they can be GCed or returned to pool earlier. + requestHeaders = null; + for (PendingData data : pendingData) { + data.buffer.clear(); + } + pendingData = null; + transportReportStatus(reason, true, trailers != null ? trailers : new Metadata()); + } else { + // If pendingData is null, start must have already been called, which means synStream has + // been called as well. + transport.finishStream(id(), reason, ErrorCode.CANCEL, trailers); + } + } + + @GuardedBy("lock") + private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) { if (cancelSent) { return; } @@ -249,57 +342,15 @@ class OkHttpClientStream extends Http2ClientStream { outboundFlow.data(endOfStream, id(), buffer, flush); } } - } - @Override - protected void returnProcessedBytes(int processedBytes) { - synchronized (lock) { - processedWindow -= processedBytes; - if (processedWindow <= WINDOW_UPDATE_THRESHOLD) { - int delta = Utils.DEFAULT_WINDOW_SIZE - processedWindow; - window += delta; - processedWindow += delta; - frameWriter.windowUpdate(id(), delta); - } + @GuardedBy("lock") + private void streamReady(Metadata metadata, String path) { + requestHeaders = + Headers.createRequestHeaders(metadata, path, authority, userAgent); + transport.streamReadyToStart(OkHttpClientStream.this); } } - @Override - protected void sendCancel(Status reason) { - synchronized (lock) { - if (cancelSent) { - return; - } - cancelSent = true; - if (pendingData != null) { - // stream is pending. - transport.removePendingStream(this); - // release holding data, so they can be GCed or returned to pool earlier. - requestHeaders = null; - for (PendingData data : pendingData) { - data.buffer.clear(); - } - pendingData = null; - transportReportStatus(reason, true, new Metadata()); - } else { - // If pendingData is null, start must have already been called, which means synStream has - // been called as well. - transport.finishStream(id(), reason, ErrorCode.CANCEL); - } - } - } - - @Override - public void remoteEndClosed() { - super.remoteEndClosed(); - if (canSend()) { - // If server's end-of-stream is received before client sends end-of-stream, we just send a - // reset to server to fully close the server side stream. - frameWriter.rstStream(id(), ErrorCode.CANCEL); - } - transport.finishStream(id(), null, null); - } - void setOutboundFlowState(Object outboundFlowState) { this.outboundFlowState = outboundFlowState; } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index d0558284c7..dd93f75a9c 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -309,7 +309,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { void streamReadyToStart(OkHttpClientStream clientStream) { synchronized (lock) { if (goAwayStatus != null) { - clientStream.transportReportStatus(goAwayStatus, true, new Metadata()); + clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata()); } else if (streams.size() >= maxConcurrentStreams) { pendingStreams.add(clientStream); setInUse(); @@ -325,8 +325,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned"); streams.put(nextStreamId, stream); setInUse(); - stream.start(nextStreamId); - stream.allocated(); + stream.transportState().start(nextStreamId); // For unary and server streaming, there will be a data frame soon, no need to flush the header. if (stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING) { @@ -380,7 +379,6 @@ class OkHttpClientTransport implements ConnectionClientTransport { frameWriter = new AsyncFrameWriter(this, serializingExecutor); outboundFlow = new OutboundFlowController(this, frameWriter); - // Connecting in the serializingExecutor, so that some stream operations like synStream // will be executed after connected. serializingExecutor.execute(new Runnable() { @@ -620,11 +618,11 @@ class OkHttpClientTransport implements ConnectionClientTransport { while (it.hasNext()) { Map.Entry entry = it.next(); it.remove(); - entry.getValue().transportReportStatus(reason, false, new Metadata()); + entry.getValue().transportState().transportReportStatus(reason, false, new Metadata()); } for (OkHttpClientStream stream : pendingStreams) { - stream.transportReportStatus(reason, true, new Metadata()); + stream.transportState().transportReportStatus(reason, true, new Metadata()); } pendingStreams.clear(); maybeClearInUse(); @@ -694,12 +692,12 @@ class OkHttpClientTransport implements ConnectionClientTransport { Map.Entry entry = it.next(); if (entry.getKey() > lastKnownStreamId) { it.remove(); - entry.getValue().transportReportStatus(status, false, new Metadata()); + entry.getValue().transportState().transportReportStatus(status, false, new Metadata()); } } for (OkHttpClientStream stream : pendingStreams) { - stream.transportReportStatus(status, true, new Metadata()); + stream.transportState().transportReportStatus(status, true, new Metadata()); } pendingStreams.clear(); maybeClearInUse(); @@ -720,8 +718,10 @@ class OkHttpClientTransport implements ConnectionClientTransport { * @param streamId the Id of the stream. * @param status the final status of this stream, null means no need to report. * @param errorCode reset the stream with this ErrorCode if not null. + * @param trailers the trailers received if not null */ - void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode errorCode) { + void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode errorCode, + @Nullable Metadata trailers) { synchronized (lock) { OkHttpClientStream stream = streams.remove(streamId); if (stream != null) { @@ -731,7 +731,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { if (status != null) { boolean isCancelled = (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED); - stream.transportReportStatus(status, isCancelled, new Metadata()); + stream.transportState().transportReportStatus(status, isCancelled, + trailers != null ? trailers : new Metadata()); } if (!startPendingStreams()) { stopIfNecessary(); @@ -909,7 +910,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { Buffer buf = new Buffer(); buf.write(in.buffer(), length); synchronized (lock) { - stream.transportDataReceived(buf, inFinished); + stream.transportState().transportDataReceived(buf, inFinished); } } @@ -941,7 +942,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { unknownStream = true; } } else { - stream.transportHeadersReceived(headerBlock, inFinished); + stream.transportState().transportHeadersReceived(headerBlock, inFinished); } } if (unknownStream) { @@ -952,7 +953,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { @Override public void rstStream(int streamId, ErrorCode errorCode) { - finishStream(streamId, toGrpcStatus(errorCode).augmentDescription("Rst Stream"), null); + finishStream(streamId, toGrpcStatus(errorCode).augmentDescription("Rst Stream"), null, null); } @Override @@ -1037,7 +1038,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { onError(ErrorCode.PROTOCOL_ERROR, errorMsg); } else { finishStream(streamId, - Status.INTERNAL.withDescription(errorMsg), ErrorCode.PROTOCOL_ERROR); + Status.INTERNAL.withDescription(errorMsg), ErrorCode.PROTOCOL_ERROR, null); } return; } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java index a11ae0ed75..2827e1b444 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java @@ -396,7 +396,7 @@ class OutboundFlowController { } catch (IOException e) { throw new RuntimeException(e); } - stream.onStreamSentBytes(bytesToWrite); + stream.transportState().onSentBytes(bytesToWrite); if (enqueued) { // It's enqueued - remove it from the head of the pending write queue. diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java index 73406327a2..a95776aa5b 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java @@ -97,7 +97,7 @@ public class OkHttpClientStreamTest { } @Test - public void sendCancel_notStarted() { + public void cancel_notStarted() { final AtomicReference statusRef = new AtomicReference(); stream.start(new BaseClientStreamListener() { @Override @@ -107,34 +107,34 @@ public class OkHttpClientStreamTest { } }); - stream.sendCancel(Status.CANCELLED); + stream.cancel(Status.CANCELLED); assertEquals(Status.Code.CANCELLED, statusRef.get().getCode()); } @Test - public void sendCancel_started() { + public void cancel_started() { stream.start(new BaseClientStreamListener()); - stream.start(1234); + stream.transportState().start(1234); Mockito.doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { assertTrue(Thread.holdsLock(lock)); return null; } - }).when(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL); + }).when(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL, null); - stream.sendCancel(Status.CANCELLED); + stream.cancel(Status.CANCELLED); - verify(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL); + verify(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL, null); } @Test public void start_alreadyCancelled() { stream.start(new BaseClientStreamListener()); - stream.sendCancel(Status.CANCELLED); + stream.cancel(Status.CANCELLED); - stream.start(1234); + stream.transportState().start(1234); verifyNoMoreInteractions(frameWriter); } @@ -147,7 +147,7 @@ public class OkHttpClientStreamTest { flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", StatsTraceContext.NOOP); stream.start(new BaseClientStreamListener()); - stream.start(3); + stream.transportState().start(3); verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture()); assertThat(headersCaptor.getValue()) @@ -162,7 +162,7 @@ public class OkHttpClientStreamTest { flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", StatsTraceContext.NOOP); stream.start(new BaseClientStreamListener()); - stream.start(3); + stream.transportState().start(3); verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture()); assertThat(headersCaptor.getValue()).containsExactly( diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 4b0db01a2c..d9f08e0c03 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -827,7 +827,7 @@ public class OkHttpClientTransportTest { Buffer sentFrame = captor.getValue(); assertEquals(createMessageFrame(sentMessage), sentFrame); verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0)); - stream2.sendCancel(Status.CANCELLED); + stream2.cancel(Status.CANCELLED); shutdownAndVerify(); } @@ -839,9 +839,9 @@ public class OkHttpClientTransportTest { OkHttpClientStream stream = clientTransport.newStream(method, new Metadata()); stream.start(listener); waitForStreamPending(1); - stream.sendCancel(Status.CANCELLED); + stream.cancel(Status.CANCELLED); // The second cancel should be an no-op. - stream.sendCancel(Status.UNKNOWN); + stream.cancel(Status.UNKNOWN); listener.waitUntilStreamClosed(); assertEquals(0, clientTransport.getPendingStreamSize()); assertEquals(Status.CANCELLED.getCode(), listener.status.getCode()); @@ -872,7 +872,7 @@ public class OkHttpClientTransportTest { // active stream should not be affected. assertEquals(1, activeStreamCount()); - getStream(3).sendCancel(Status.CANCELLED); + getStream(3).cancel(Status.CANCELLED); shutdownAndVerify(); } @@ -891,7 +891,7 @@ public class OkHttpClientTransportTest { verify(frameWriter, timeout(TIME_OUT_MS)) .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); assertEquals(1, activeStreamCount()); - stream.sendCancel(Status.CANCELLED); + stream.cancel(Status.CANCELLED); shutdownAndVerify(); } @@ -1013,7 +1013,7 @@ public class OkHttpClientTransportTest { listener.waitUntilStreamClosed(); assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); - assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); + assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); assertEquals(0, listener.messages.size()); shutdownAndVerify(); } @@ -1034,7 +1034,7 @@ public class OkHttpClientTransportTest { listener.waitUntilStreamClosed(); assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); - assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); + assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); assertEquals(0, listener.messages.size()); shutdownAndVerify(); } @@ -1054,7 +1054,7 @@ public class OkHttpClientTransportTest { listener.waitUntilStreamClosed(); assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); - assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); + assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); assertEquals(0, listener.messages.size()); shutdownAndVerify(); }