diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 29f24ce330..782cbbc1db 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -33,9 +33,7 @@ import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.Header; -import java.util.ArrayDeque; import java.util.List; -import java.util.Queue; import javax.annotation.concurrent.GuardedBy; import okio.Buffer; @@ -194,12 +192,10 @@ class OkHttpClientStream extends AbstractClientStream { private final Object lock; @GuardedBy("lock") private List
requestHeaders; - /** - * Null iff {@link #requestHeaders} is null. Non-null iff neither {@link #cancel} nor - * {@link #start(int)} have been called. - */ @GuardedBy("lock") - private Queue pendingData = new ArrayDeque(); + private Buffer pendingData = new Buffer(); + private boolean pendingDataHasEndOfStream = false; + private boolean flushPendingData = false; @GuardedBy("lock") private boolean cancelSent = false; @GuardedBy("lock") @@ -212,6 +208,9 @@ class OkHttpClientStream extends AbstractClientStream { private final OutboundFlowController outboundFlow; @GuardedBy("lock") private final OkHttpClientTransport transport; + /** True iff neither {@link #cancel} nor {@link #start(int)} have been called. */ + @GuardedBy("lock") + private boolean canStart = true; public TransportState( int maxMessageSize, @@ -237,24 +236,16 @@ class OkHttpClientStream extends AbstractClientStream { id = streamId; state.onStreamAllocated(); - if (pendingData != null) { + if (canStart) { // Only happens when the stream has neither been started nor cancelled. frameWriter.synStream(useGet, false, id, 0, requestHeaders); statsTraceCtx.clientOutboundHeaders(); requestHeaders = null; - boolean flush = false; - while (!pendingData.isEmpty()) { - PendingData data = pendingData.poll(); - outboundFlow.data(data.endOfStream, id, data.buffer, false); - if (data.flush) { - flush = true; - } + if (pendingData.size() > 0) { + outboundFlow.data(pendingDataHasEndOfStream, id, pendingData, flushPendingData); } - if (flush) { - outboundFlow.flush(); - } - pendingData = null; + canStart = false; } } @@ -354,15 +345,13 @@ class OkHttpClientStream extends AbstractClientStream { return; } cancelSent = true; - if (pendingData != null) { + if (canStart) { // stream is pending. transport.removePendingStream(OkHttpClientStream.this); // release holding data, so they can be GCed or returned to pool earlier. requestHeaders = null; - for (PendingData data : pendingData) { - data.buffer.clear(); - } - pendingData = null; + pendingData.clear(); + canStart = false; transportReportStatus(reason, true, trailers != null ? trailers : new Metadata()); } else { // If pendingData is null, start must have already been called, which means synStream has @@ -377,9 +366,12 @@ class OkHttpClientStream extends AbstractClientStream { if (cancelSent) { return; } - if (pendingData != null) { + if (canStart) { // Stream is pending start, queue the data. - pendingData.add(new PendingData(buffer, endOfStream, flush)); + int dataSize = (int) buffer.size(); + pendingData.write(buffer, dataSize); + pendingDataHasEndOfStream |= endOfStream; + flushPendingData |= flush; } else { checkState(id() != ABSENT_ID, "streamId should be set"); // If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is @@ -402,16 +394,4 @@ class OkHttpClientStream extends AbstractClientStream { Object getOutboundFlowState() { return outboundFlowState; } - - private static class PendingData { - Buffer buffer; - boolean endOfStream; - boolean flush; - - PendingData(Buffer buffer, boolean endOfStream, boolean flush) { - this.buffer = buffer; - this.endOfStream = endOfStream; - this.flush = flush; - } - } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java index d51e831477..e3cad95313 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java @@ -24,8 +24,6 @@ import static java.lang.Math.min; import com.google.common.base.Preconditions; import io.grpc.okhttp.internal.framed.FrameWriter; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; import javax.annotation.Nullable; import okio.Buffer; @@ -119,31 +117,21 @@ class OutboundFlowController { OutboundFlowState state = state(stream); int window = state.writableWindow(); - boolean framesAlreadyQueued = state.hasFrame(); + boolean framesAlreadyQueued = state.hasPendingData(); + int size = (int) source.size(); - OutboundFlowState.Frame frame = state.newFrame(source, outFinished); - if (!framesAlreadyQueued && window >= frame.size()) { + if (!framesAlreadyQueued && window >= size) { // Window size is large enough to send entire data frame - frame.write(); - if (flush) { - flush(); + state.write(source, size, outFinished); + } else { + // send partial data + if (!framesAlreadyQueued && window > 0) { + state.write(source, window, false); } - return; + // Queue remaining data in the buffer + state.enqueue(source, (int) source.size(), outFinished); } - // Enqueue the frame to be written when the window size permits. - frame.enqueue(); - - if (framesAlreadyQueued || window <= 0) { - // Stream already has frames pending or is stalled, don't send anything now. - if (flush) { - flush(); - } - return; - } - - // Create and send a partial frame up to the window size. - frame.split(window).write(); if (flush) { flush(); } @@ -228,17 +216,17 @@ class OutboundFlowController { * The outbound flow control state for a single stream. */ private final class OutboundFlowState { - final Queue pendingWriteQueue; + final Buffer pendingWriteBuffer; final int streamId; - int queuedBytes; int window; int allocatedBytes; OkHttpClientStream stream; + boolean pendingBufferHasEndOfStream = false; OutboundFlowState(int streamId, int initialWindowSize) { this.streamId = streamId; window = initialWindowSize; - pendingWriteQueue = new ArrayDeque<>(2); + pendingWriteBuffer = new Buffer(); } OutboundFlowState(OkHttpClientStream stream, int initialWindowSize) { @@ -287,28 +275,14 @@ class OutboundFlowController { } int streamableBytes() { - return max(0, min(window, queuedBytes)); - } - - /** - * Creates a new frame with the given values but does not add it to the pending queue. - */ - Frame newFrame(Buffer data, boolean endStream) { - return new Frame(data, endStream); + return max(0, min(window, (int) pendingWriteBuffer.size())); } /** * Indicates whether or not there are frames in the pending queue. */ - boolean hasFrame() { - return !pendingWriteQueue.isEmpty(); - } - - /** - * Returns the head of the pending queue, or {@code null} if empty. - */ - private Frame peek() { - return pendingWriteQueue.peek(); + boolean hasPendingData() { + return pendingWriteBuffer.size() > 0; } /** @@ -317,26 +291,16 @@ class OutboundFlowController { int writeBytes(int bytes, WriteStatus writeStatus) { int bytesAttempted = 0; int maxBytes = min(bytes, writableWindow()); - while (hasFrame()) { - Frame pendingWrite = peek(); - if (maxBytes >= pendingWrite.size()) { + while (hasPendingData() && maxBytes > 0) { + if (maxBytes >= pendingWriteBuffer.size()) { // Window size is large enough to send entire data frame - writeStatus.incrementNumWrites(); - bytesAttempted += pendingWrite.size(); - pendingWrite.write(); - } else if (maxBytes <= 0) { - // No data from the current frame can be written - we're done. - // We purposely check this after first testing the size of the - // pending frame to properly handle zero-length frame. - break; + bytesAttempted += (int) pendingWriteBuffer.size(); + write(pendingWriteBuffer, (int) pendingWriteBuffer.size(), pendingBufferHasEndOfStream); } else { - // We can send a partial frame - Frame partialFrame = pendingWrite.split(maxBytes); - writeStatus.incrementNumWrites(); - bytesAttempted += partialFrame.size(); - partialFrame.write(); + bytesAttempted += maxBytes; + write(pendingWriteBuffer, maxBytes, false); } - + writeStatus.incrementNumWrites(); // Update the threshold. maxBytes = min(bytes - bytesAttempted, writableWindow()); } @@ -344,95 +308,38 @@ class OutboundFlowController { } /** - * A wrapper class around the content of a data frame. + * Writes the frame and decrements the stream and connection window sizes. If the frame is in + * the pending queue, the written bytes are removed from this branch of the priority tree. If + * the window size is smaller than the frame, it sends partial frame. */ - private final class Frame { - final Buffer data; - final boolean endStream; - boolean enqueued; + void write(Buffer buffer, int bytesToSend, boolean endOfStream) { + int bytesToWrite = bytesToSend; + // Using a do/while loop because if the buffer is empty we still need to call + // the writer once to send the empty frame. + do { + int frameBytes = min(bytesToWrite, frameWriter.maxDataLength()); + connectionState.incrementStreamWindow(-frameBytes); + incrementStreamWindow(-frameBytes); + try { + // endOfStream is set for the last chunk of data marked as endOfStream + boolean isEndOfStream = buffer.size() == frameBytes && endOfStream; + // AsyncFrameWriter drains buffer in executor. To avoid race, copy to temp. + // TODO(jihuncho) remove temp buff when async logic is moved to AsyncSink. + Buffer temp = new Buffer(); + temp.write(buffer, frameBytes); - Frame(Buffer data, boolean endStream) { - this.data = data; - this.endStream = endStream; - } - - /** - * Gets the total size (in bytes) of this frame including the data and padding. - */ - int size() { - return (int) data.size(); - } - - void enqueue() { - if (!enqueued) { - enqueued = true; - pendingWriteQueue.offer(this); - - // Increment the number of pending bytes for this stream. - queuedBytes += size(); + frameWriter.data(isEndOfStream, streamId, temp, frameBytes); + } catch (IOException e) { + throw new RuntimeException(e); } - } + stream.transportState().onSentBytes(frameBytes); + bytesToWrite -= frameBytes; + } while (bytesToWrite > 0); + } - /** - * Writes the frame and decrements the stream and connection window sizes. If the frame is in - * the pending queue, the written bytes are removed from this branch of the priority tree. - */ - void write() { - // Using a do/while loop because if the buffer is empty we still need to call - // the writer once to send the empty frame. - do { - int bytesToWrite = size(); - int frameBytes = min(bytesToWrite, frameWriter.maxDataLength()); - if (frameBytes == bytesToWrite) { - // All the bytes fit into a single HTTP/2 frame, just send it all. - connectionState.incrementStreamWindow(-bytesToWrite); - incrementStreamWindow(-bytesToWrite); - try { - frameWriter.data(endStream, streamId, data, bytesToWrite); - } catch (IOException e) { - throw new RuntimeException(e); - } - stream.transportState().onSentBytes(bytesToWrite); - - if (enqueued) { - // It's enqueued - remove it from the head of the pending write queue. - queuedBytes -= bytesToWrite; - pendingWriteQueue.remove(this); - } - return; - } - - // Split a chunk that will fit into a single HTTP/2 frame and write it. - Frame frame = split(frameBytes); - frame.write(); - } while (size() > 0); - } - - /** - * Creates a new frame that is a view of this frame's data. The {@code maxBytes} are first - * split from the data buffer. If not all the requested bytes are available, the remaining - * bytes are then split from the padding (if available). - * - * @param maxBytes the maximum number of bytes that is allowed in the created frame. - * @return the partial frame. - */ - Frame split(int maxBytes) { - // The requested maxBytes should always be less than the size of this frame. - assert maxBytes < size() : "Attempting to split a frame for the full size."; - - // Get the portion of the data buffer to be split. Limit to the readable bytes. - int dataSplit = min(maxBytes, (int) data.size()); - - Buffer splitSlice = new Buffer(); - splitSlice.write(data, dataSplit); - - Frame frame = new Frame(splitSlice, false); - - if (enqueued) { - queuedBytes -= dataSplit; - } - return frame; - } + void enqueue(Buffer buffer, int size, boolean endOfStream) { + this.pendingWriteBuffer.write(buffer, size); + this.pendingBufferHasEndOfStream |= endOfStream; } } -} +} \ No newline at end of file diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index c2a7b60339..69c28beb74 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -978,10 +978,9 @@ public class OkHttpClientTransportTest { assertEquals(0, clientTransport.getPendingStreamSize()); ArgumentCaptor captor = ArgumentCaptor.forClass(Buffer.class); verify(frameWriter, timeout(TIME_OUT_MS)) - .data(eq(false), eq(5), captor.capture(), eq(5 + HEADER_LENGTH)); + .data(eq(true), eq(5), captor.capture(), eq(5 + HEADER_LENGTH)); Buffer sentFrame = captor.getValue(); assertEquals(createMessageFrame(sentMessage), sentFrame); - verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0)); stream2.cancel(Status.CANCELLED); shutdownAndVerify(); }