/* * Copyright 2014 The Netty Project * * The Netty Project licenses this file to you under the Apache License, version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under * the License. */ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import static java.lang.Math.max; import static java.lang.Math.min; import io.netty.channel.ChannelHandlerContext; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import io.prometheus.client.Histogram; import java.util.ArrayDeque; import java.util.Deque; /** * Basic implementation of {@link Http2RemoteFlowController}. * *
This class is NOT thread safe. The assumption is all methods must be invoked * from a single thread. Typically this thread is the event loop thread for the {@link * ChannelHandlerContext} managed by this class. */ @UnstableApi public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class); private static final int MIN_WRITABLE_CHUNK = 32 * 1024; private final Http2Connection connection; private final Http2Connection.PropertyKey stateKey; private final StreamByteDistributor streamByteDistributor; private final FlowState connectionState; private int initialWindowSize = DEFAULT_WINDOW_SIZE; private WritabilityMonitor monitor; private ChannelHandlerContext ctx; public static final Histogram byteDistributedDuration = Histogram.build() .name("http2_byte_distributed_duration_seconds") .help("The duration of byte distributed to streams.") .register(); public DefaultHttp2RemoteFlowController(Http2Connection connection) { this(connection, (Listener) null); } public DefaultHttp2RemoteFlowController( Http2Connection connection, StreamByteDistributor streamByteDistributor) { this(connection, streamByteDistributor, null); } public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) { this(connection, new WeightedFairQueueByteDistributor(connection), listener); } public DefaultHttp2RemoteFlowController( Http2Connection connection, StreamByteDistributor streamByteDistributor, final Listener listener) { this.connection = checkNotNull(connection, "connection"); this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor"); // Add a flow state for the connection. stateKey = connection.newKey(); connectionState = new FlowState(connection.connectionStream()); connection.connectionStream().setProperty(stateKey, connectionState); // Monitor may depend upon connectionState, and so initialize after connectionState listener(listener); monitor.windowSize(connectionState, initialWindowSize); // Register for notification of new streams. connection.addListener( new Http2ConnectionAdapter() { @Override public void onStreamAdded(Http2Stream stream) { // If the stream state is not open then the stream is not yet eligible for flow // controlled frames and // only requires the ReducedFlowState. Otherwise the full amount of memory is required. stream.setProperty(stateKey, new FlowState(stream)); } @Override public void onStreamActive(Http2Stream stream) { // If the object was previously created, but later activated then we have to ensure the // proper // initialWindowSize is used. monitor.windowSize(state(stream), initialWindowSize); } @Override public void onStreamClosed(Http2Stream stream) { // Any pending frames can never be written, cancel and // write errors for any pending frames. state(stream).cancel(STREAM_CLOSED, null); } @Override public void onStreamHalfClosed(Http2Stream stream) { if (HALF_CLOSED_LOCAL == stream.state()) { /** * When this method is called there should not be any pending frames left if the API * is used correctly. However, it is possible that a erroneous application can sneak * in a frame even after having already written a frame with the END_STREAM flag set, * as the stream state might not transition immediately to HALF_CLOSED_LOCAL / CLOSED * due to flow control delaying the write. * *
This is to cancel any such illegal writes. */ state(stream).cancel(STREAM_CLOSED, null); } } }); } /** * {@inheritDoc} * *
Any queued {@link FlowControlled} objects will be sent.
*/
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
this.ctx = checkNotNull(ctx, "ctx");
// Writing the pending bytes will not check writability change and instead a writability change
// notification
// to be provided by an explicit call.
channelWritabilityChanged();
// Don't worry about cleaning up queued frames here if ctx is null. It is expected that all
// streams will be
// closed and the queue cleanup will occur when the stream state transitions occur.
// If any frames have been queued up, we should send them now that we have a channel context.
if (isChannelWritable()) {
writePendingBytes();
}
}
@Override
public ChannelHandlerContext channelHandlerContext() {
return ctx;
}
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
monitor.initialWindowSize(newWindowSize);
}
@Override
public int initialWindowSize() {
return initialWindowSize;
}
@Override
public int windowSize(Http2Stream stream) {
return state(stream).windowSize();
}
@Override
public boolean isWritable(Http2Stream stream) {
return monitor.isWritable(state(stream));
}
@Override
public void channelWritabilityChanged() throws Http2Exception {
monitor.channelWritabilityChange();
}
@Override
public void updateDependencyTree(
int childStreamId, int parentStreamId, short weight, boolean exclusive) {
// It is assumed there are all validated at a higher level. For example in the Http2FrameReader.
assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
assert childStreamId != parentStreamId : "A stream cannot depend on itself";
assert childStreamId > 0 && parentStreamId >= 0
: "childStreamId must be > 0. parentStreamId must be >= 0.";
streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
}
private boolean isChannelWritable() {
return ctx != null && isChannelWritable0();
}
private boolean isChannelWritable0() {
return ctx.channel().isWritable();
}
@Override
public void listener(Listener listener) {
monitor =
listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
}
@Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
monitor.incrementWindowSize(state(stream), delta);
}
@Override
public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
// The context can be null assuming the frame will be queued and send later when the context is
// set.
assert ctx == null || ctx.executor().inEventLoop();
checkNotNull(frame, "frame");
try {
monitor.enqueueFrame(state(stream), frame);
} catch (Throwable t) {
frame.error(ctx, t);
}
}
@Override
public boolean hasFlowControlled(Http2Stream stream) {
return state(stream).hasFrame();
}
private FlowState state(Http2Stream stream) {
return (FlowState) stream.getProperty(stateKey);
}
/** Returns the flow control window for the entire connection. */
private int connectionWindowSize() {
return connectionState.windowSize();
}
private int minUsableChannelBytes() {
// The current allocation algorithm values "fairness" and doesn't give any consideration to
// "goodput". It
// is possible that 1 byte will be allocated to many streams. In an effort to try to make
// "goodput"
// reasonable with the current allocation algorithm we have this "cheap" check up front to
// ensure there is
// an "adequate" amount of connection window before allocation is attempted. This is not
// foolproof as if the
// number of streams is >= this minimal number then we may still have the issue, but the idea is
// to narrow the
// circumstances in which this can happen without rewriting the allocation algorithm.
return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
}
private int maxUsableChannelBytes() {
// If the channel isWritable, allow at least minUsableChannelBytes.
int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
int usableBytes =
channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
// Clip the usable bytes by the connection window.
return min(connectionState.windowSize(), usableBytes);
}
/**
* The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel}
* without queuing "too-much".
*/
private int writableBytes() {
return min(connectionWindowSize(), maxUsableChannelBytes());
}
@Override
public void writePendingBytes() throws Http2Exception {
monitor.writePendingBytes();
}
/** The remote flow control state for a single stream. */
private final class FlowState implements StreamByteDistributor.StreamState {
private final Http2Stream stream;
private final Deque
* Connection Window - Total Queued Bytes > 0 &&
* Stream Window - Bytes Queued for Stream > 0 &&
* isChannelWritable()
*
*/
private final class ListenerWritabilityMonitor extends WritabilityMonitor
implements Http2StreamVisitor {
private final Listener listener;
ListenerWritabilityMonitor(Listener listener) {
this.listener = listener;
}
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
FlowState state = state(stream);
if (isWritable(state) != state.markedWritability()) {
notifyWritabilityChanged(state);
}
return true;
}
@Override
void windowSize(FlowState state, int initialWindowSize) {
super.windowSize(state, initialWindowSize);
try {
checkStateWritability(state);
} catch (Http2Exception e) {
throw new RuntimeException("Caught unexpected exception from window", e);
}
}
@Override
void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
super.incrementWindowSize(state, delta);
checkStateWritability(state);
}
@Override
void initialWindowSize(int newWindowSize) throws Http2Exception {
super.initialWindowSize(newWindowSize);
if (isWritableConnection()) {
// If the write operation does not occur we still need to check all streams because they
// may have transitioned from writable to not writable.
checkAllWritabilityChanged();
}
}
@Override
void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
super.enqueueFrame(state, frame);
checkConnectionThenStreamWritabilityChanged(state);
}
@Override
void stateCancelled(FlowState state) {
try {
checkConnectionThenStreamWritabilityChanged(state);
} catch (Http2Exception e) {
throw new RuntimeException(
"Caught unexpected exception from checkAllWritabilityChanged", e);
}
}
@Override
void channelWritabilityChange() throws Http2Exception {
if (connectionState.markedWritability() != isChannelWritable()) {
checkAllWritabilityChanged();
}
}
private void checkStateWritability(FlowState state) throws Http2Exception {
if (isWritable(state) != state.markedWritability()) {
if (state == connectionState) {
checkAllWritabilityChanged();
} else {
notifyWritabilityChanged(state);
}
}
}
private void notifyWritabilityChanged(FlowState state) {
state.markedWritability(!state.markedWritability());
try {
listener.writabilityChanged(state.stream);
} catch (Throwable cause) {
logger.error("Caught Throwable from listener.writabilityChanged", cause);
}
}
private void checkConnectionThenStreamWritabilityChanged(FlowState state)
throws Http2Exception {
// It is possible that the connection window and/or the individual stream writability could
// change.
if (isWritableConnection() != connectionState.markedWritability()) {
checkAllWritabilityChanged();
} else if (isWritable(state) != state.markedWritability()) {
notifyWritabilityChanged(state);
}
}
private void checkAllWritabilityChanged() throws Http2Exception {
// Make sure we mark that we have notified as a result of this change.
connectionState.markedWritability(isWritableConnection());
connection.forEachActiveStream(this);
}
}
}