Move okhttp to AbstractStream2

This commit is contained in:
Xiao Hang 2017-03-14 15:28:22 -07:00
parent ffc64b70bb
commit 55e3b71888
10 changed files with 277 additions and 228 deletions

View File

@ -190,7 +190,6 @@ public abstract class AbstractClientStream2 extends AbstractStream2
private Runnable deliveryStalledTask; private Runnable deliveryStalledTask;
private boolean headersReceived;
/** /**
* Whether the stream is closed from the transport's perspective. This can differ from {@link * Whether the stream is closed from the transport's perspective. This can differ from {@link
* #listenerClosed} because there may still be messages buffered to deliver to the application. * #listenerClosed} because there may still be messages buffered to deliver to the application.
@ -233,7 +232,6 @@ public abstract class AbstractClientStream2 extends AbstractStream2
*/ */
protected void inboundHeadersReceived(Metadata headers) { protected void inboundHeadersReceived(Metadata headers) {
Preconditions.checkState(!statusReported, "Received headers on closed stream"); Preconditions.checkState(!statusReported, "Received headers on closed stream");
headersReceived = true;
listener().headersRead(headers); listener().headersRead(headers);
} }
@ -250,12 +248,6 @@ public abstract class AbstractClientStream2 extends AbstractStream2
log.log(Level.INFO, "Received data on closed stream"); log.log(Level.INFO, "Received data on closed stream");
return; return;
} }
if (!headersReceived) {
transportReportStatus(
Status.INTERNAL.withDescription("headers not received before payload"),
false, new Metadata());
return;
}
needToCloseFrame = false; needToCloseFrame = false;
deframe(frame, false); deframe(frame, false);

View File

@ -35,7 +35,7 @@ import io.grpc.Compressor;
import java.io.InputStream; import java.io.InputStream;
/** Interface for framing gRPC messages. */ /** Interface for framing gRPC messages. */
interface Framer { public interface Framer {
/** /**
* Writes out a payload message. * Writes out a payload message.
* *

View File

@ -147,6 +147,12 @@ public abstract class Http2ClientStreamTransportState extends AbstractClientStre
http2ProcessingFailed(transportError, transportErrorMetadata); http2ProcessingFailed(transportError, transportErrorMetadata);
} }
} else { } else {
if (!headersReceived) {
http2ProcessingFailed(
Status.INTERNAL.withDescription("headers not received before payload"),
new Metadata());
return;
}
inboundDataReceived(frame); inboundDataReceived(frame);
if (endOfStream) { if (endOfStream) {
// This is a protocol violation as we expect to receive trailers. // This is a protocol violation as we expect to receive trailers.

View File

@ -32,7 +32,6 @@
package io.grpc.internal; package io.grpc.internal;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -161,17 +160,6 @@ public class AbstractClientStream2Test {
state.inboundDataReceived(null); state.inboundDataReceived(null);
} }
@Test
public void inboundDataReceived_failsOnNoHeaders() {
AbstractClientStream2 stream = new BaseAbstractClientStream(allocator, statsTraceCtx);
stream.start(mockListener);
stream.transportState().inboundDataReceived(ReadableBuffers.empty());
verify(mockListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode());
}
@Test @Test
public void inboundHeadersReceived_notifiesListener() { public void inboundHeadersReceived_notifiesListener() {
AbstractClientStream2 stream = new BaseAbstractClientStream(allocator, statsTraceCtx); AbstractClientStream2 stream = new BaseAbstractClientStream(allocator, statsTraceCtx);

View File

@ -200,6 +200,17 @@ public class Http2ClientStreamTransportStateTest {
assertTrue(statusCaptor.getValue().getDescription().contains(testString)); assertTrue(statusCaptor.getValue().getDescription().contains(testString));
} }
@Test
public void transportDataReceived_noHeaderReceived() {
BaseTransportState state = new BaseTransportState();
state.setListener(mockListener);
String testString = "This is a test";
state.transportDataReceived(ReadableBuffers.wrap(testString.getBytes(US_ASCII)), true);
verify(mockListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode());
}
@Test @Test
public void transportDataReceived_debugData() { public void transportDataReceived_debugData() {
BaseTransportState state = new BaseTransportState(); BaseTransportState state = new BaseTransportState();

View File

@ -38,9 +38,9 @@ import io.grpc.Attributes;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.ClientStreamListener; import io.grpc.internal.AbstractClientStream2;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2ClientStream; import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBuffer;
import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.ErrorCode;
@ -54,38 +54,23 @@ import okio.Buffer;
/** /**
* Client stream for the okhttp transport. * Client stream for the okhttp transport.
*/ */
class OkHttpClientStream extends Http2ClientStream { class OkHttpClientStream extends AbstractClientStream2 {
private static final int WINDOW_UPDATE_THRESHOLD = Utils.DEFAULT_WINDOW_SIZE / 2; private static final int WINDOW_UPDATE_THRESHOLD = Utils.DEFAULT_WINDOW_SIZE / 2;
private static final Buffer EMPTY_BUFFER = new Buffer(); private static final Buffer EMPTY_BUFFER = new Buffer();
@GuardedBy("lock") public static final int ABSENT_ID = -1;
private int window = Utils.DEFAULT_WINDOW_SIZE;
@GuardedBy("lock")
private int processedWindow = Utils.DEFAULT_WINDOW_SIZE;
private final MethodDescriptor<?, ?> method; private final MethodDescriptor<?, ?> method;
/** {@code null} iff start has been called. */
private Metadata headers;
private final AsyncFrameWriter frameWriter;
private final OutboundFlowController outboundFlow;
private final OkHttpClientTransport transport;
private final Object lock;
private final String userAgent; private final String userAgent;
private final StatsTraceContext statsTraceCtx; private final StatsTraceContext statsTraceCtx;
private String authority; private String authority;
private Object outboundFlowState; private Object outboundFlowState;
private volatile int id = ABSENT_ID; private volatile int id = ABSENT_ID;
@GuardedBy("lock") private final TransportState state;
private List<Header> requestHeaders; private final Sink sink = new Sink();
/**
* Null iff {@link #requestHeaders} is null. Non-null iff neither {@link #sendCancel} nor
* {@link #start(int)} have been called.
*/
@GuardedBy("lock")
private Queue<PendingData> pendingData = new ArrayDeque<PendingData>();
@GuardedBy("lock")
private boolean cancelSent = false;
OkHttpClientStream( OkHttpClientStream(
MethodDescriptor<?, ?> method, MethodDescriptor<?, ?> method,
@ -98,16 +83,23 @@ class OkHttpClientStream extends Http2ClientStream {
String authority, String authority,
String userAgent, String userAgent,
StatsTraceContext statsTraceCtx) { StatsTraceContext statsTraceCtx) {
super(new OkHttpWritableBufferAllocator(), maxMessageSize, statsTraceCtx); super(new OkHttpWritableBufferAllocator(), statsTraceCtx, headers, false);
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.method = method; this.method = method;
this.headers = headers;
this.frameWriter = frameWriter;
this.transport = transport;
this.outboundFlow = outboundFlow;
this.lock = lock;
this.authority = authority; this.authority = authority;
this.userAgent = userAgent; this.userAgent = userAgent;
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, frameWriter, outboundFlow,
transport);
}
@Override
protected TransportState transportState() {
return state;
}
@Override
protected Sink abstractClientStreamSink() {
return sink;
} }
/** /**
@ -117,21 +109,12 @@ class OkHttpClientStream extends Http2ClientStream {
return method.getType(); return method.getType();
} }
@Override
public void request(final int numMessages) {
synchronized (lock) {
requestMessagesFromDeframer(numMessages);
}
}
@Override
public int id() { public int id() {
return id; return id;
} }
@Override @Override
public void setAuthority(String authority) { public void setAuthority(String authority) {
checkState(listener() == null, "must be call before start");
this.authority = checkNotNull(authority, "authority"); this.authority = checkNotNull(authority, "authority");
} }
@ -140,24 +123,91 @@ class OkHttpClientStream extends Http2ClientStream {
return Attributes.EMPTY; return Attributes.EMPTY;
} }
class Sink implements AbstractClientStream2.Sink {
@Override @Override
public void start(ClientStreamListener listener) { public void writeHeaders(Metadata metadata, byte[] payload) {
super.start(listener);
String defaultPath = "/" + method.getFullMethodName(); String defaultPath = "/" + method.getFullMethodName();
headers.discardAll(GrpcUtil.USER_AGENT_KEY); metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
List<Header> requestHeaders = synchronized (state.lock) {
Headers.createRequestHeaders(headers, defaultPath, authority, userAgent); state.streamReady(metadata, defaultPath);
headers = null;
synchronized (lock) {
this.requestHeaders = requestHeaders;
transport.streamReadyToStart(this);
} }
} }
@Override
public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
Buffer buffer;
if (frame == null) {
buffer = EMPTY_BUFFER;
} else {
buffer = ((OkHttpWritableBuffer) frame).buffer();
int size = (int) buffer.size();
if (size > 0) {
onSendingBytes(size);
}
}
synchronized (state.lock) {
state.sendBuffer(buffer, endOfStream, flush);
}
}
@Override
public void request(final int numMessages) {
synchronized (state.lock) {
state.requestMessagesFromDeframer(numMessages);
}
}
@Override
public void cancel(Status reason) {
synchronized (state.lock) {
state.cancel(reason, null);
}
}
}
class TransportState extends Http2ClientStreamTransportState {
private final Object lock;
@GuardedBy("lock") @GuardedBy("lock")
public void start(int id) { private List<Header> requestHeaders;
checkState(this.id == ABSENT_ID, "the stream has been started with id %s", this.id); /**
this.id = id; * Null iff {@link #requestHeaders} is null. Non-null iff neither {@link #sendCancel} nor
* {@link #start(int)} have been called.
*/
@GuardedBy("lock")
private Queue<PendingData> pendingData = new ArrayDeque<PendingData>();
@GuardedBy("lock")
private boolean cancelSent = false;
@GuardedBy("lock")
private int window = Utils.DEFAULT_WINDOW_SIZE;
@GuardedBy("lock")
private int processedWindow = Utils.DEFAULT_WINDOW_SIZE;
@GuardedBy("lock")
private final AsyncFrameWriter frameWriter;
@GuardedBy("lock")
private final OutboundFlowController outboundFlow;
@GuardedBy("lock")
private final OkHttpClientTransport transport;
public TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
Object lock,
AsyncFrameWriter frameWriter,
OutboundFlowController outboundFlow,
OkHttpClientTransport transport) {
super(maxMessageSize, statsTraceCtx);
this.lock = checkNotNull(lock, "lock");
this.frameWriter = frameWriter;
this.outboundFlow = outboundFlow;
this.transport = transport;
}
@GuardedBy("lock")
public void start(int streamId) {
checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId);
id = streamId;
state.onStreamAllocated();
if (pendingData != null) { if (pendingData != null) {
// Only happens when the stream has neither been started nor cancelled. // Only happens when the stream has neither been started nor cancelled.
@ -180,18 +230,34 @@ class OkHttpClientStream extends Http2ClientStream {
} }
} }
/** @GuardedBy("lock")
* Notification that this stream was allocated for the connection. This means the stream has @Override
* passed through any delay caused by MAX_CONCURRENT_STREAMS. protected void onStreamAllocated() {
*/ super.onStreamAllocated();
public void allocated() {
// Now that the stream has actually been initialized, call the listener's onReady callback if
// appropriate.
onStreamAllocated();
} }
void onStreamSentBytes(int numBytes) { @GuardedBy("lock")
onSentBytes(numBytes); @Override
protected void http2ProcessingFailed(Status status, Metadata trailers) {
cancel(status, trailers);
}
@GuardedBy("lock")
@Override
protected void deframeFailed(Throwable cause) {
http2ProcessingFailed(Status.fromThrowable(cause), new Metadata());
}
@GuardedBy("lock")
@Override
public void bytesRead(int processedBytes) {
processedWindow -= processedBytes;
if (processedWindow <= WINDOW_UPDATE_THRESHOLD) {
int delta = Utils.DEFAULT_WINDOW_SIZE - processedWindow;
window += delta;
processedWindow += delta;
frameWriter.windowUpdate(id(), delta);
}
} }
/** /**
@ -201,6 +267,7 @@ class OkHttpClientStream extends Http2ClientStream {
public void transportHeadersReceived(List<Header> headers, boolean endOfStream) { public void transportHeadersReceived(List<Header> headers, boolean endOfStream) {
if (endOfStream) { if (endOfStream) {
transportTrailersReceived(Utils.convertTrailers(headers)); transportTrailersReceived(Utils.convertTrailers(headers));
onEndOfStream();
} else { } else {
transportHeadersReceived(Utils.convertHeaders(headers)); transportHeadersReceived(Utils.convertHeaders(headers));
} }
@ -216,26 +283,52 @@ class OkHttpClientStream extends Http2ClientStream {
if (window < 0) { if (window < 0) {
frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR); frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
transport.finishStream(id(), Status.INTERNAL.withDescription( transport.finishStream(id(), Status.INTERNAL.withDescription(
"Received data size exceeded our receiving window size"), null); "Received data size exceeded our receiving window size"), null, null);
return; return;
} }
super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
if (endOfStream) {
onEndOfStream();
}
} }
@Override @GuardedBy("lock")
protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { private void onEndOfStream() {
Buffer buffer; if (!framer().isClosed()) {
if (frame == null) { // If server's end-of-stream is received before client sends end-of-stream, we just send a
buffer = EMPTY_BUFFER; // reset to server to fully close the server side stream.
transport.finishStream(id(), null, ErrorCode.CANCEL, null);
} else { } else {
buffer = ((OkHttpWritableBuffer) frame).buffer(); transport.finishStream(id(), null, null, null);
int size = (int) buffer.size();
if (size > 0) {
onSendingBytes(size);
} }
} }
synchronized (lock) {
@GuardedBy("lock")
private void cancel(Status reason, Metadata trailers) {
if (cancelSent) {
return;
}
cancelSent = true;
if (pendingData != null) {
// stream is pending.
transport.removePendingStream(OkHttpClientStream.this);
// release holding data, so they can be GCed or returned to pool earlier.
requestHeaders = null;
for (PendingData data : pendingData) {
data.buffer.clear();
}
pendingData = null;
transportReportStatus(reason, true, trailers != null ? trailers : new Metadata());
} else {
// If pendingData is null, start must have already been called, which means synStream has
// been called as well.
transport.finishStream(id(), reason, ErrorCode.CANCEL, trailers);
}
}
@GuardedBy("lock")
private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) {
if (cancelSent) { if (cancelSent) {
return; return;
} }
@ -249,56 +342,14 @@ class OkHttpClientStream extends Http2ClientStream {
outboundFlow.data(endOfStream, id(), buffer, flush); outboundFlow.data(endOfStream, id(), buffer, flush);
} }
} }
}
@Override @GuardedBy("lock")
protected void returnProcessedBytes(int processedBytes) { private void streamReady(Metadata metadata, String path) {
synchronized (lock) { requestHeaders =
processedWindow -= processedBytes; Headers.createRequestHeaders(metadata, path, authority, userAgent);
if (processedWindow <= WINDOW_UPDATE_THRESHOLD) { transport.streamReadyToStart(OkHttpClientStream.this);
int delta = Utils.DEFAULT_WINDOW_SIZE - processedWindow;
window += delta;
processedWindow += delta;
frameWriter.windowUpdate(id(), delta);
} }
} }
}
@Override
protected void sendCancel(Status reason) {
synchronized (lock) {
if (cancelSent) {
return;
}
cancelSent = true;
if (pendingData != null) {
// stream is pending.
transport.removePendingStream(this);
// release holding data, so they can be GCed or returned to pool earlier.
requestHeaders = null;
for (PendingData data : pendingData) {
data.buffer.clear();
}
pendingData = null;
transportReportStatus(reason, true, new Metadata());
} else {
// If pendingData is null, start must have already been called, which means synStream has
// been called as well.
transport.finishStream(id(), reason, ErrorCode.CANCEL);
}
}
}
@Override
public void remoteEndClosed() {
super.remoteEndClosed();
if (canSend()) {
// If server's end-of-stream is received before client sends end-of-stream, we just send a
// reset to server to fully close the server side stream.
frameWriter.rstStream(id(), ErrorCode.CANCEL);
}
transport.finishStream(id(), null, null);
}
void setOutboundFlowState(Object outboundFlowState) { void setOutboundFlowState(Object outboundFlowState) {
this.outboundFlowState = outboundFlowState; this.outboundFlowState = outboundFlowState;

View File

@ -309,7 +309,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
void streamReadyToStart(OkHttpClientStream clientStream) { void streamReadyToStart(OkHttpClientStream clientStream) {
synchronized (lock) { synchronized (lock) {
if (goAwayStatus != null) { if (goAwayStatus != null) {
clientStream.transportReportStatus(goAwayStatus, true, new Metadata()); clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata());
} else if (streams.size() >= maxConcurrentStreams) { } else if (streams.size() >= maxConcurrentStreams) {
pendingStreams.add(clientStream); pendingStreams.add(clientStream);
setInUse(); setInUse();
@ -325,8 +325,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned"); stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
streams.put(nextStreamId, stream); streams.put(nextStreamId, stream);
setInUse(); setInUse();
stream.start(nextStreamId); stream.transportState().start(nextStreamId);
stream.allocated();
// For unary and server streaming, there will be a data frame soon, no need to flush the header. // For unary and server streaming, there will be a data frame soon, no need to flush the header.
if (stream.getType() != MethodType.UNARY if (stream.getType() != MethodType.UNARY
&& stream.getType() != MethodType.SERVER_STREAMING) { && stream.getType() != MethodType.SERVER_STREAMING) {
@ -380,7 +379,6 @@ class OkHttpClientTransport implements ConnectionClientTransport {
frameWriter = new AsyncFrameWriter(this, serializingExecutor); frameWriter = new AsyncFrameWriter(this, serializingExecutor);
outboundFlow = new OutboundFlowController(this, frameWriter); outboundFlow = new OutboundFlowController(this, frameWriter);
// Connecting in the serializingExecutor, so that some stream operations like synStream // Connecting in the serializingExecutor, so that some stream operations like synStream
// will be executed after connected. // will be executed after connected.
serializingExecutor.execute(new Runnable() { serializingExecutor.execute(new Runnable() {
@ -620,11 +618,11 @@ class OkHttpClientTransport implements ConnectionClientTransport {
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<Integer, OkHttpClientStream> entry = it.next(); Map.Entry<Integer, OkHttpClientStream> entry = it.next();
it.remove(); it.remove();
entry.getValue().transportReportStatus(reason, false, new Metadata()); entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
} }
for (OkHttpClientStream stream : pendingStreams) { for (OkHttpClientStream stream : pendingStreams) {
stream.transportReportStatus(reason, true, new Metadata()); stream.transportState().transportReportStatus(reason, true, new Metadata());
} }
pendingStreams.clear(); pendingStreams.clear();
maybeClearInUse(); maybeClearInUse();
@ -694,12 +692,12 @@ class OkHttpClientTransport implements ConnectionClientTransport {
Map.Entry<Integer, OkHttpClientStream> entry = it.next(); Map.Entry<Integer, OkHttpClientStream> entry = it.next();
if (entry.getKey() > lastKnownStreamId) { if (entry.getKey() > lastKnownStreamId) {
it.remove(); it.remove();
entry.getValue().transportReportStatus(status, false, new Metadata()); entry.getValue().transportState().transportReportStatus(status, false, new Metadata());
} }
} }
for (OkHttpClientStream stream : pendingStreams) { for (OkHttpClientStream stream : pendingStreams) {
stream.transportReportStatus(status, true, new Metadata()); stream.transportState().transportReportStatus(status, true, new Metadata());
} }
pendingStreams.clear(); pendingStreams.clear();
maybeClearInUse(); maybeClearInUse();
@ -720,8 +718,10 @@ class OkHttpClientTransport implements ConnectionClientTransport {
* @param streamId the Id of the stream. * @param streamId the Id of the stream.
* @param status the final status of this stream, null means no need to report. * @param status the final status of this stream, null means no need to report.
* @param errorCode reset the stream with this ErrorCode if not null. * @param errorCode reset the stream with this ErrorCode if not null.
* @param trailers the trailers received if not null
*/ */
void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode errorCode) { void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode errorCode,
@Nullable Metadata trailers) {
synchronized (lock) { synchronized (lock) {
OkHttpClientStream stream = streams.remove(streamId); OkHttpClientStream stream = streams.remove(streamId);
if (stream != null) { if (stream != null) {
@ -731,7 +731,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
if (status != null) { if (status != null) {
boolean isCancelled = (status.getCode() == Code.CANCELLED boolean isCancelled = (status.getCode() == Code.CANCELLED
|| status.getCode() == Code.DEADLINE_EXCEEDED); || status.getCode() == Code.DEADLINE_EXCEEDED);
stream.transportReportStatus(status, isCancelled, new Metadata()); stream.transportState().transportReportStatus(status, isCancelled,
trailers != null ? trailers : new Metadata());
} }
if (!startPendingStreams()) { if (!startPendingStreams()) {
stopIfNecessary(); stopIfNecessary();
@ -909,7 +910,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
Buffer buf = new Buffer(); Buffer buf = new Buffer();
buf.write(in.buffer(), length); buf.write(in.buffer(), length);
synchronized (lock) { synchronized (lock) {
stream.transportDataReceived(buf, inFinished); stream.transportState().transportDataReceived(buf, inFinished);
} }
} }
@ -941,7 +942,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
unknownStream = true; unknownStream = true;
} }
} else { } else {
stream.transportHeadersReceived(headerBlock, inFinished); stream.transportState().transportHeadersReceived(headerBlock, inFinished);
} }
} }
if (unknownStream) { if (unknownStream) {
@ -952,7 +953,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Override @Override
public void rstStream(int streamId, ErrorCode errorCode) { public void rstStream(int streamId, ErrorCode errorCode) {
finishStream(streamId, toGrpcStatus(errorCode).augmentDescription("Rst Stream"), null); finishStream(streamId, toGrpcStatus(errorCode).augmentDescription("Rst Stream"), null, null);
} }
@Override @Override
@ -1037,7 +1038,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
onError(ErrorCode.PROTOCOL_ERROR, errorMsg); onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
} else { } else {
finishStream(streamId, finishStream(streamId,
Status.INTERNAL.withDescription(errorMsg), ErrorCode.PROTOCOL_ERROR); Status.INTERNAL.withDescription(errorMsg), ErrorCode.PROTOCOL_ERROR, null);
} }
return; return;
} }

View File

@ -396,7 +396,7 @@ class OutboundFlowController {
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
stream.onStreamSentBytes(bytesToWrite); stream.transportState().onSentBytes(bytesToWrite);
if (enqueued) { if (enqueued) {
// It's enqueued - remove it from the head of the pending write queue. // It's enqueued - remove it from the head of the pending write queue.

View File

@ -97,7 +97,7 @@ public class OkHttpClientStreamTest {
} }
@Test @Test
public void sendCancel_notStarted() { public void cancel_notStarted() {
final AtomicReference<Status> statusRef = new AtomicReference<Status>(); final AtomicReference<Status> statusRef = new AtomicReference<Status>();
stream.start(new BaseClientStreamListener() { stream.start(new BaseClientStreamListener() {
@Override @Override
@ -107,34 +107,34 @@ public class OkHttpClientStreamTest {
} }
}); });
stream.sendCancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
assertEquals(Status.Code.CANCELLED, statusRef.get().getCode()); assertEquals(Status.Code.CANCELLED, statusRef.get().getCode());
} }
@Test @Test
public void sendCancel_started() { public void cancel_started() {
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.start(1234); stream.transportState().start(1234);
Mockito.doAnswer(new Answer<Void>() { Mockito.doAnswer(new Answer<Void>() {
@Override @Override
public Void answer(InvocationOnMock invocation) throws Throwable { public Void answer(InvocationOnMock invocation) throws Throwable {
assertTrue(Thread.holdsLock(lock)); assertTrue(Thread.holdsLock(lock));
return null; return null;
} }
}).when(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL); }).when(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL, null);
stream.sendCancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
verify(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL); verify(transport).finishStream(1234, Status.CANCELLED, ErrorCode.CANCEL, null);
} }
@Test @Test
public void start_alreadyCancelled() { public void start_alreadyCancelled() {
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.sendCancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
stream.start(1234); stream.transportState().start(1234);
verifyNoMoreInteractions(frameWriter); verifyNoMoreInteractions(frameWriter);
} }
@ -147,7 +147,7 @@ public class OkHttpClientStreamTest {
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP); StatsTraceContext.NOOP);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.start(3); stream.transportState().start(3);
verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture()); verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture());
assertThat(headersCaptor.getValue()) assertThat(headersCaptor.getValue())
@ -162,7 +162,7 @@ public class OkHttpClientStreamTest {
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP); StatsTraceContext.NOOP);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.start(3); stream.transportState().start(3);
verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture()); verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture());
assertThat(headersCaptor.getValue()).containsExactly( assertThat(headersCaptor.getValue()).containsExactly(

View File

@ -827,7 +827,7 @@ public class OkHttpClientTransportTest {
Buffer sentFrame = captor.getValue(); Buffer sentFrame = captor.getValue();
assertEquals(createMessageFrame(sentMessage), sentFrame); assertEquals(createMessageFrame(sentMessage), sentFrame);
verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0)); verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0));
stream2.sendCancel(Status.CANCELLED); stream2.cancel(Status.CANCELLED);
shutdownAndVerify(); shutdownAndVerify();
} }
@ -839,9 +839,9 @@ public class OkHttpClientTransportTest {
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata()); OkHttpClientStream stream = clientTransport.newStream(method, new Metadata());
stream.start(listener); stream.start(listener);
waitForStreamPending(1); waitForStreamPending(1);
stream.sendCancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
// The second cancel should be an no-op. // The second cancel should be an no-op.
stream.sendCancel(Status.UNKNOWN); stream.cancel(Status.UNKNOWN);
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(0, clientTransport.getPendingStreamSize()); assertEquals(0, clientTransport.getPendingStreamSize());
assertEquals(Status.CANCELLED.getCode(), listener.status.getCode()); assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
@ -872,7 +872,7 @@ public class OkHttpClientTransportTest {
// active stream should not be affected. // active stream should not be affected.
assertEquals(1, activeStreamCount()); assertEquals(1, activeStreamCount());
getStream(3).sendCancel(Status.CANCELLED); getStream(3).cancel(Status.CANCELLED);
shutdownAndVerify(); shutdownAndVerify();
} }
@ -891,7 +891,7 @@ public class OkHttpClientTransportTest {
verify(frameWriter, timeout(TIME_OUT_MS)) verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
assertEquals(1, activeStreamCount()); assertEquals(1, activeStreamCount());
stream.sendCancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
shutdownAndVerify(); shutdownAndVerify();
} }
@ -1013,7 +1013,7 @@ public class OkHttpClientTransportTest {
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
assertEquals(0, listener.messages.size()); assertEquals(0, listener.messages.size());
shutdownAndVerify(); shutdownAndVerify();
} }
@ -1034,7 +1034,7 @@ public class OkHttpClientTransportTest {
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
assertEquals(0, listener.messages.size()); assertEquals(0, listener.messages.size());
shutdownAndVerify(); shutdownAndVerify();
} }
@ -1054,7 +1054,7 @@ public class OkHttpClientTransportTest {
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
assertEquals(0, listener.messages.size()); assertEquals(0, listener.messages.size());
shutdownAndVerify(); shutdownAndVerify();
} }