okhttp: Optimize memory usage by mergeing buffers (#5023)

okhttp: Optimize memory usage by mergeing buffers

OkHttp transport's memory useage by merging Buffers for each pending data.
- OutboundFlowController, OkHttpClientStream

NOTE: Buffer by default allocate 4k memory.
This commit is contained in:
Jihun Cho 2018-11-06 11:01:20 -08:00 committed by GitHub
parent e2e990b01a
commit 80c973cbd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 71 additions and 185 deletions

View File

@ -33,9 +33,7 @@ import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import javax.annotation.concurrent.GuardedBy;
import okio.Buffer;
@ -194,12 +192,10 @@ class OkHttpClientStream extends AbstractClientStream {
private final Object lock;
@GuardedBy("lock")
private List<Header> requestHeaders;
/**
* Null iff {@link #requestHeaders} is null. Non-null iff neither {@link #cancel} nor
* {@link #start(int)} have been called.
*/
@GuardedBy("lock")
private Queue<PendingData> pendingData = new ArrayDeque<PendingData>();
private Buffer pendingData = new Buffer();
private boolean pendingDataHasEndOfStream = false;
private boolean flushPendingData = false;
@GuardedBy("lock")
private boolean cancelSent = false;
@GuardedBy("lock")
@ -212,6 +208,9 @@ class OkHttpClientStream extends AbstractClientStream {
private final OutboundFlowController outboundFlow;
@GuardedBy("lock")
private final OkHttpClientTransport transport;
/** True iff neither {@link #cancel} nor {@link #start(int)} have been called. */
@GuardedBy("lock")
private boolean canStart = true;
public TransportState(
int maxMessageSize,
@ -237,24 +236,16 @@ class OkHttpClientStream extends AbstractClientStream {
id = streamId;
state.onStreamAllocated();
if (pendingData != null) {
if (canStart) {
// Only happens when the stream has neither been started nor cancelled.
frameWriter.synStream(useGet, false, id, 0, requestHeaders);
statsTraceCtx.clientOutboundHeaders();
requestHeaders = null;
boolean flush = false;
while (!pendingData.isEmpty()) {
PendingData data = pendingData.poll();
outboundFlow.data(data.endOfStream, id, data.buffer, false);
if (data.flush) {
flush = true;
}
if (pendingData.size() > 0) {
outboundFlow.data(pendingDataHasEndOfStream, id, pendingData, flushPendingData);
}
if (flush) {
outboundFlow.flush();
}
pendingData = null;
canStart = false;
}
}
@ -354,15 +345,13 @@ class OkHttpClientStream extends AbstractClientStream {
return;
}
cancelSent = true;
if (pendingData != null) {
if (canStart) {
// stream is pending.
transport.removePendingStream(OkHttpClientStream.this);
// release holding data, so they can be GCed or returned to pool earlier.
requestHeaders = null;
for (PendingData data : pendingData) {
data.buffer.clear();
}
pendingData = null;
pendingData.clear();
canStart = false;
transportReportStatus(reason, true, trailers != null ? trailers : new Metadata());
} else {
// If pendingData is null, start must have already been called, which means synStream has
@ -377,9 +366,12 @@ class OkHttpClientStream extends AbstractClientStream {
if (cancelSent) {
return;
}
if (pendingData != null) {
if (canStart) {
// Stream is pending start, queue the data.
pendingData.add(new PendingData(buffer, endOfStream, flush));
int dataSize = (int) buffer.size();
pendingData.write(buffer, dataSize);
pendingDataHasEndOfStream |= endOfStream;
flushPendingData |= flush;
} else {
checkState(id() != ABSENT_ID, "streamId should be set");
// If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is
@ -402,16 +394,4 @@ class OkHttpClientStream extends AbstractClientStream {
Object getOutboundFlowState() {
return outboundFlowState;
}
private static class PendingData {
Buffer buffer;
boolean endOfStream;
boolean flush;
PendingData(Buffer buffer, boolean endOfStream, boolean flush) {
this.buffer = buffer;
this.endOfStream = endOfStream;
this.flush = flush;
}
}
}

View File

@ -24,8 +24,6 @@ import static java.lang.Math.min;
import com.google.common.base.Preconditions;
import io.grpc.okhttp.internal.framed.FrameWriter;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import javax.annotation.Nullable;
import okio.Buffer;
@ -119,31 +117,21 @@ class OutboundFlowController {
OutboundFlowState state = state(stream);
int window = state.writableWindow();
boolean framesAlreadyQueued = state.hasFrame();
boolean framesAlreadyQueued = state.hasPendingData();
int size = (int) source.size();
OutboundFlowState.Frame frame = state.newFrame(source, outFinished);
if (!framesAlreadyQueued && window >= frame.size()) {
if (!framesAlreadyQueued && window >= size) {
// Window size is large enough to send entire data frame
frame.write();
if (flush) {
flush();
state.write(source, size, outFinished);
} else {
// send partial data
if (!framesAlreadyQueued && window > 0) {
state.write(source, window, false);
}
return;
// Queue remaining data in the buffer
state.enqueue(source, (int) source.size(), outFinished);
}
// Enqueue the frame to be written when the window size permits.
frame.enqueue();
if (framesAlreadyQueued || window <= 0) {
// Stream already has frames pending or is stalled, don't send anything now.
if (flush) {
flush();
}
return;
}
// Create and send a partial frame up to the window size.
frame.split(window).write();
if (flush) {
flush();
}
@ -228,17 +216,17 @@ class OutboundFlowController {
* The outbound flow control state for a single stream.
*/
private final class OutboundFlowState {
final Queue<Frame> pendingWriteQueue;
final Buffer pendingWriteBuffer;
final int streamId;
int queuedBytes;
int window;
int allocatedBytes;
OkHttpClientStream stream;
boolean pendingBufferHasEndOfStream = false;
OutboundFlowState(int streamId, int initialWindowSize) {
this.streamId = streamId;
window = initialWindowSize;
pendingWriteQueue = new ArrayDeque<>(2);
pendingWriteBuffer = new Buffer();
}
OutboundFlowState(OkHttpClientStream stream, int initialWindowSize) {
@ -287,28 +275,14 @@ class OutboundFlowController {
}
int streamableBytes() {
return max(0, min(window, queuedBytes));
}
/**
* Creates a new frame with the given values but does not add it to the pending queue.
*/
Frame newFrame(Buffer data, boolean endStream) {
return new Frame(data, endStream);
return max(0, min(window, (int) pendingWriteBuffer.size()));
}
/**
* Indicates whether or not there are frames in the pending queue.
*/
boolean hasFrame() {
return !pendingWriteQueue.isEmpty();
}
/**
* Returns the head of the pending queue, or {@code null} if empty.
*/
private Frame peek() {
return pendingWriteQueue.peek();
boolean hasPendingData() {
return pendingWriteBuffer.size() > 0;
}
/**
@ -317,26 +291,16 @@ class OutboundFlowController {
int writeBytes(int bytes, WriteStatus writeStatus) {
int bytesAttempted = 0;
int maxBytes = min(bytes, writableWindow());
while (hasFrame()) {
Frame pendingWrite = peek();
if (maxBytes >= pendingWrite.size()) {
while (hasPendingData() && maxBytes > 0) {
if (maxBytes >= pendingWriteBuffer.size()) {
// Window size is large enough to send entire data frame
writeStatus.incrementNumWrites();
bytesAttempted += pendingWrite.size();
pendingWrite.write();
} else if (maxBytes <= 0) {
// No data from the current frame can be written - we're done.
// We purposely check this after first testing the size of the
// pending frame to properly handle zero-length frame.
break;
bytesAttempted += (int) pendingWriteBuffer.size();
write(pendingWriteBuffer, (int) pendingWriteBuffer.size(), pendingBufferHasEndOfStream);
} else {
// We can send a partial frame
Frame partialFrame = pendingWrite.split(maxBytes);
writeStatus.incrementNumWrites();
bytesAttempted += partialFrame.size();
partialFrame.write();
bytesAttempted += maxBytes;
write(pendingWriteBuffer, maxBytes, false);
}
writeStatus.incrementNumWrites();
// Update the threshold.
maxBytes = min(bytes - bytesAttempted, writableWindow());
}
@ -344,95 +308,38 @@ class OutboundFlowController {
}
/**
* A wrapper class around the content of a data frame.
* Writes the frame and decrements the stream and connection window sizes. If the frame is in
* the pending queue, the written bytes are removed from this branch of the priority tree. If
* the window size is smaller than the frame, it sends partial frame.
*/
private final class Frame {
final Buffer data;
final boolean endStream;
boolean enqueued;
void write(Buffer buffer, int bytesToSend, boolean endOfStream) {
int bytesToWrite = bytesToSend;
// Using a do/while loop because if the buffer is empty we still need to call
// the writer once to send the empty frame.
do {
int frameBytes = min(bytesToWrite, frameWriter.maxDataLength());
connectionState.incrementStreamWindow(-frameBytes);
incrementStreamWindow(-frameBytes);
try {
// endOfStream is set for the last chunk of data marked as endOfStream
boolean isEndOfStream = buffer.size() == frameBytes && endOfStream;
// AsyncFrameWriter drains buffer in executor. To avoid race, copy to temp.
// TODO(jihuncho) remove temp buff when async logic is moved to AsyncSink.
Buffer temp = new Buffer();
temp.write(buffer, frameBytes);
Frame(Buffer data, boolean endStream) {
this.data = data;
this.endStream = endStream;
}
/**
* Gets the total size (in bytes) of this frame including the data and padding.
*/
int size() {
return (int) data.size();
}
void enqueue() {
if (!enqueued) {
enqueued = true;
pendingWriteQueue.offer(this);
// Increment the number of pending bytes for this stream.
queuedBytes += size();
frameWriter.data(isEndOfStream, streamId, temp, frameBytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
stream.transportState().onSentBytes(frameBytes);
bytesToWrite -= frameBytes;
} while (bytesToWrite > 0);
}
/**
* Writes the frame and decrements the stream and connection window sizes. If the frame is in
* the pending queue, the written bytes are removed from this branch of the priority tree.
*/
void write() {
// Using a do/while loop because if the buffer is empty we still need to call
// the writer once to send the empty frame.
do {
int bytesToWrite = size();
int frameBytes = min(bytesToWrite, frameWriter.maxDataLength());
if (frameBytes == bytesToWrite) {
// All the bytes fit into a single HTTP/2 frame, just send it all.
connectionState.incrementStreamWindow(-bytesToWrite);
incrementStreamWindow(-bytesToWrite);
try {
frameWriter.data(endStream, streamId, data, bytesToWrite);
} catch (IOException e) {
throw new RuntimeException(e);
}
stream.transportState().onSentBytes(bytesToWrite);
if (enqueued) {
// It's enqueued - remove it from the head of the pending write queue.
queuedBytes -= bytesToWrite;
pendingWriteQueue.remove(this);
}
return;
}
// Split a chunk that will fit into a single HTTP/2 frame and write it.
Frame frame = split(frameBytes);
frame.write();
} while (size() > 0);
}
/**
* Creates a new frame that is a view of this frame's data. The {@code maxBytes} are first
* split from the data buffer. If not all the requested bytes are available, the remaining
* bytes are then split from the padding (if available).
*
* @param maxBytes the maximum number of bytes that is allowed in the created frame.
* @return the partial frame.
*/
Frame split(int maxBytes) {
// The requested maxBytes should always be less than the size of this frame.
assert maxBytes < size() : "Attempting to split a frame for the full size.";
// Get the portion of the data buffer to be split. Limit to the readable bytes.
int dataSplit = min(maxBytes, (int) data.size());
Buffer splitSlice = new Buffer();
splitSlice.write(data, dataSplit);
Frame frame = new Frame(splitSlice, false);
if (enqueued) {
queuedBytes -= dataSplit;
}
return frame;
}
void enqueue(Buffer buffer, int size, boolean endOfStream) {
this.pendingWriteBuffer.write(buffer, size);
this.pendingBufferHasEndOfStream |= endOfStream;
}
}
}
}

View File

@ -978,10 +978,9 @@ public class OkHttpClientTransportTest {
assertEquals(0, clientTransport.getPendingStreamSize());
ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(5), captor.capture(), eq(5 + HEADER_LENGTH));
.data(eq(true), eq(5), captor.capture(), eq(5 + HEADER_LENGTH));
Buffer sentFrame = captor.getValue();
assertEquals(createMessageFrame(sentMessage), sentFrame);
verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0));
stream2.cancel(Status.CANCELLED);
shutdownAndVerify();
}