okhttp: Limit number of outstanding client-induced control frames

This commit is contained in:
Eric Anderson 2022-06-12 15:11:51 -07:00
parent e96d04774b
commit bc50adf4b4
5 changed files with 200 additions and 14 deletions

View File

@ -21,6 +21,9 @@ import static com.google.common.base.Preconditions.checkState;
import io.grpc.internal.SerializingExecutor;
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.FrameWriter;
import io.grpc.okhttp.internal.framed.Settings;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import java.io.IOException;
@ -42,6 +45,7 @@ final class AsyncSink implements Sink {
private final Buffer buffer = new Buffer();
private final SerializingExecutor serializingExecutor;
private final TransportExceptionHandler transportExceptionHandler;
private final int maxQueuedControlFrames;
@GuardedBy("lock")
private boolean writeEnqueued = false;
@ -52,15 +56,26 @@ final class AsyncSink implements Sink {
private Sink sink;
@Nullable
private Socket socket;
private boolean controlFramesExceeded;
private int controlFramesInWrite;
@GuardedBy("lock")
private int queuedControlFrames;
private AsyncSink(SerializingExecutor executor, TransportExceptionHandler exceptionHandler) {
private AsyncSink(SerializingExecutor executor, TransportExceptionHandler exceptionHandler,
int maxQueuedControlFrames) {
this.serializingExecutor = checkNotNull(executor, "executor");
this.transportExceptionHandler = checkNotNull(exceptionHandler, "exceptionHandler");
this.maxQueuedControlFrames = maxQueuedControlFrames;
}
/**
* {@code maxQueuedControlFrames} is only effective for frames written with
* {@link #limitControlFramesWriter(FrameWriter)}.
*/
static AsyncSink sink(
SerializingExecutor executor, TransportExceptionHandler exceptionHandler) {
return new AsyncSink(executor, exceptionHandler);
SerializingExecutor executor, TransportExceptionHandler exceptionHandler,
int maxQueuedControlFrames) {
return new AsyncSink(executor, exceptionHandler, maxQueuedControlFrames);
}
/**
@ -75,6 +90,10 @@ final class AsyncSink implements Sink {
this.socket = checkNotNull(socket, "socket");
}
FrameWriter limitControlFramesWriter(FrameWriter delegate) {
return new LimitControlFramesWriter(delegate);
}
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkNotNull(source, "source");
@ -83,12 +102,29 @@ final class AsyncSink implements Sink {
}
PerfMark.startTask("AsyncSink.write");
try {
boolean closeSocket = false;
synchronized (lock) {
buffer.write(source, byteCount);
if (writeEnqueued || flushEnqueued || buffer.completeSegmentByteCount() <= 0) {
return;
queuedControlFrames += controlFramesInWrite;
controlFramesInWrite = 0;
if (!controlFramesExceeded && queuedControlFrames > maxQueuedControlFrames) {
controlFramesExceeded = true;
closeSocket = true;
} else {
if (writeEnqueued || flushEnqueued || buffer.completeSegmentByteCount() <= 0) {
return;
}
writeEnqueued = true;
}
writeEnqueued = true;
}
if (closeSocket) {
try {
socket.close();
} catch (IOException e) {
transportExceptionHandler.onException(e);
}
return;
}
serializingExecutor.execute(new WriteRunnable() {
final Link link = PerfMark.linkOut();
@ -98,11 +134,18 @@ final class AsyncSink implements Sink {
PerfMark.linkIn(link);
Buffer buf = new Buffer();
try {
int writingControlFrames;
synchronized (lock) {
buf.write(buffer, buffer.completeSegmentByteCount());
writeEnqueued = false;
// Imprecise because we only tranfer complete segments, but not by much and error
// won't accumulate over time
writingControlFrames = queuedControlFrames;
}
sink.write(buf, buf.size());
synchronized (lock) {
queuedControlFrames -= writingControlFrames;
}
} finally {
PerfMark.stopTask("WriteRunnable.runWrite");
}
@ -205,4 +248,30 @@ final class AsyncSink implements Sink {
public abstract void doRun() throws IOException;
}
private class LimitControlFramesWriter extends ForwardingFrameWriter {
public LimitControlFramesWriter(FrameWriter delegate) {
super(delegate);
}
@Override
public void ackSettings(Settings peerSettings) throws IOException {
controlFramesInWrite++;
super.ackSettings(peerSettings);
}
@Override
public void rstStream(int streamId, ErrorCode errorCode) throws IOException {
controlFramesInWrite++;
super.rstStream(streamId, errorCode);
}
@Override
public void ping(boolean ack, int payload1, int payload2) throws IOException {
if (ack) {
controlFramesInWrite++;
}
super.ping(ack, payload1, payload2);
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.okhttp;
import com.google.common.base.Preconditions;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.FrameWriter;
import io.grpc.okhttp.internal.framed.Header;
import io.grpc.okhttp.internal.framed.Settings;
import java.io.IOException;
import java.util.List;
import okio.Buffer;
/** FrameWriter that forwards all calls to a delegate. */
abstract class ForwardingFrameWriter implements FrameWriter {
private final FrameWriter delegate;
public ForwardingFrameWriter(FrameWriter delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "delegate");
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
public void connectionPreface() throws IOException {
delegate.connectionPreface();
}
@Override
public void ackSettings(Settings peerSettings) throws IOException {
delegate.ackSettings(peerSettings);
}
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException {
delegate.pushPromise(streamId, promisedStreamId, requestHeaders);
}
@Override
public void flush() throws IOException {
delegate.flush();
}
@Override
public void synStream(boolean outFinished, boolean inFinished, int streamId,
int associatedStreamId, List<Header> headerBlock) throws IOException {
delegate.synStream(outFinished, inFinished, streamId, associatedStreamId, headerBlock);
}
@Override
public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
throws IOException {
delegate.synReply(outFinished, streamId, headerBlock);
}
@Override
public void headers(int streamId, List<Header> headerBlock) throws IOException {
delegate.headers(streamId, headerBlock);
}
@Override
public void rstStream(int streamId, ErrorCode errorCode) throws IOException {
delegate.rstStream(streamId, errorCode);
}
@Override
public int maxDataLength() {
return delegate.maxDataLength();
}
@Override
public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
throws IOException {
delegate.data(outFinished, streamId, source, byteCount);
}
@Override
public void settings(Settings okHttpSettings) throws IOException {
delegate.settings(okHttpSettings);
}
@Override
public void ping(boolean ack, int payload1, int payload2) throws IOException {
delegate.ping(ack, payload1, payload2);
}
@Override
public void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData)
throws IOException {
delegate.goAway(lastGoodStreamId, errorCode, debugData);
}
@Override
public void windowUpdate(int streamId, long windowSizeIncrement) throws IOException {
delegate.windowUpdate(streamId, windowSizeIncrement);
}
}

View File

@ -480,8 +480,10 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
keepAliveManager.onTransportStarted();
}
final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this);
FrameWriter rawFrameWriter = variant.newWriter(Okio.buffer(asyncSink), true);
int maxQueuedControlFrames = 10000;
final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
variant.newWriter(Okio.buffer(asyncSink), true));
synchronized (lock) {
// Handle FrameWriter exceptions centrally, since there are many callers. Note that errors

View File

@ -149,9 +149,11 @@ final class OkHttpServerTransport implements ServerTransport,
Socket socket = result.socket;
this.attributes = result.attributes;
AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this);
int maxQueuedControlFrames = 10000;
AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
asyncSink.becomeConnected(Okio.sink(socket), socket);
FrameWriter rawFrameWriter = variant.newWriter(Okio.buffer(asyncSink), false);
FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
variant.newWriter(Okio.buffer(asyncSink), false));
synchronized (lock) {
this.securityInfo = result.securityInfo;
@ -809,7 +811,6 @@ final class OkHttpServerTransport implements ServerTransport,
// The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
// writes due to update in settings must be sent after SETTINGS acknowledgment frame,
// otherwise it will cause a stream error (RST_STREAM).
// FIXME: limit number of queued control frames
frameWriter.ackSettings(settings);
frameWriter.flush();
if (!receivedSettings) {
@ -830,7 +831,6 @@ final class OkHttpServerTransport implements ServerTransport,
if (!ack) {
frameLogger.logPing(OkHttpFrameLogger.Direction.INBOUND, payload);
synchronized (lock) {
// FIXME: limit number of queued control frames
frameWriter.ping(true, payload1, payload2);
frameWriter.flush();
}
@ -927,7 +927,6 @@ final class OkHttpServerTransport implements ServerTransport,
Level.FINE, "Responding with RST_STREAM {0}: {1}", new Object[] {errorCode, reason});
}
synchronized (lock) {
// FIXME: limit number of queued control frames
frameWriter.rstStream(streamId, errorCode);
frameWriter.flush();
StreamState stream = streams.get(streamId);

View File

@ -56,7 +56,7 @@ public class AsyncSinkTest {
private final QueueingExecutor queueingExecutor = new QueueingExecutor();
private final TransportExceptionHandler exceptionHandler = mock(TransportExceptionHandler.class);
private final AsyncSink sink =
AsyncSink.sink(new SerializingExecutor(queueingExecutor), exceptionHandler);
AsyncSink.sink(new SerializingExecutor(queueingExecutor), exceptionHandler, 10000);
@Test
public void noCoalesceRequired() throws IOException {