netty,okhttp: handle ENHANCE_YOUR_CALM and too_many_pings

Any ENHANCE_YOUR_CALM is good reason to log, but too_many_pings also
triggers increase of the keepalive time for later connections.
This commit is contained in:
Eric Anderson 2017-04-17 14:25:16 -07:00 committed by GitHub
parent 6618f9739e
commit 393ebf7cdd
10 changed files with 391 additions and 32 deletions

View File

@ -0,0 +1,89 @@
/*
* Copyright 2017, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
/**
* A {@code long} atomically updated due to errors caused by the value being too small.
*/
@ThreadSafe
public final class AtomicBackoff {
private static final Logger log = Logger.getLogger(AtomicBackoff.class.getName());
private final String name;
private final AtomicLong value = new AtomicLong();
/** Construct an atomic with initial value {@code value}. {@code name} is used for logging. */
public AtomicBackoff(String name, long value) {
Preconditions.checkArgument(value > 0, "value must be positive");
this.name = name;
this.value.set(value);
}
/** Returns the current state. The state instance's value does not change of time. */
public State getState() {
return new State(value.get());
}
@ThreadSafe
public final class State {
private final long savedValue;
private State(long value) {
this.savedValue = value;
}
public long get() {
return savedValue;
}
/**
* Causes future invocations of {@link AtomicBackoff#getState} to have a value at least double
* this state's value. Subsequent calls to this method will not increase the value further.
*/
public void backoff() {
// Use max to handle overflow
long newValue = Math.max(savedValue * 2, savedValue);
boolean swapped = value.compareAndSet(savedValue, newValue);
// Even if swapped is false, the current value should be at least as large as newValue
assert value.get() >= newValue;
if (swapped) {
log.log(Level.WARNING, "Increased {0} to {1}", new Object[] {name, newValue});
}
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2017, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link AtomicBackoff}. */
@RunWith(JUnit4.class)
public class AtomicBackoffTest {
@Test(expected = IllegalArgumentException.class)
public void mustBePositive() {
new AtomicBackoff("test", 0);
}
@Test
public void backoff_doesNotChangeStateGet() {
AtomicBackoff backoff = new AtomicBackoff("test", 8);
AtomicBackoff.State state = backoff.getState();
assertEquals(8, state.get());
state.backoff();
assertEquals(8, state.get());
}
@Test
public void backoff_doubles() {
AtomicBackoff backoff = new AtomicBackoff("test", 3);
backoff.getState().backoff();
assertEquals(6, backoff.getState().get());
backoff.getState().backoff();
assertEquals(12, backoff.getState().get());
backoff.getState().backoff();
assertEquals(24, backoff.getState().get());
backoff = new AtomicBackoff("test", 13);
backoff.getState().backoff();
assertEquals(26, backoff.getState().get());
}
@Test
public void backoff_oncePerState() {
AtomicBackoff backoff = new AtomicBackoff("test", 8);
AtomicBackoff.State state = backoff.getState();
state.backoff();
assertEquals(8, state.get());
assertEquals(16, backoff.getState().get());
state.backoff(); // Nothing happens the second time
assertEquals(8, state.get());
assertEquals(16, backoff.getState().get());
}
@Test
public void backoff_twiceForEquivalentState_noChange() {
AtomicBackoff backoff = new AtomicBackoff("test", 8);
AtomicBackoff.State state1 = backoff.getState();
AtomicBackoff.State state2 = backoff.getState();
state1.backoff();
state2.backoff();
assertEquals(16, backoff.getState().get());
}
@Test
public void backoff_delayed() {
AtomicBackoff backoff = new AtomicBackoff("test", 8);
AtomicBackoff.State state = backoff.getState();
backoff.getState().backoff();
backoff.getState().backoff();
assertEquals(32, backoff.getState().get());
// Shouldn't decrease value
state.backoff();
assertEquals(32, backoff.getState().get());
}
@Test
public void largeLong() {
// Don't use MAX_VALUE because it is easy to check for explicitly
long tooLarge = Long.MAX_VALUE - 100;
AtomicBackoff backoff = new AtomicBackoff("test", tooLarge);
backoff.getState().backoff();
// It would also be okay if it became MAX_VALUE
assertEquals(tooLarge, backoff.getState().get());
}
}

View File

@ -45,6 +45,7 @@ import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
import io.grpc.NameResolver;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AtomicBackoff;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil;
@ -451,7 +452,7 @@ public final class NettyChannelBuilder
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
private final long keepAliveTimeNanos;
private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
@ -481,7 +482,7 @@ public final class NettyChannelBuilder
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
this.keepAliveTimeNanos = keepAliveTimeNanos;
this.keepAliveTimeNanos = new AtomicBackoff("keepalive time nanos", keepAliveTimeNanos);
this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
usingSharedGroup = group == null;
@ -501,11 +502,19 @@ public final class NettyChannelBuilder
TransportCreationParamsFilter dparams =
transportCreationParamsFilterFactory.create(serverAddress, authority, userAgent);
final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState();
Runnable tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
keepAliveTimeNanosState.backoff();
}
};
NettyClientTransport transport = new NettyClientTransport(
dparams.getTargetServerAddress(), channelType, channelOptions, group,
dparams.getProtocolNegotiator(), flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent());
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
tooManyPingsRunnable);
return transport;
}

View File

@ -114,7 +114,7 @@ class NettyClientHandler extends AbstractNettyHandler {
static NettyClientHandler newHandler(ClientTransportLifecycleManager lifecycleManager,
@Nullable KeepAliveManager keepAliveManager, int flowControlWindow, int maxHeaderListSize,
Ticker ticker) {
Ticker ticker, Runnable tooManyPingsRunnable) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@ -122,20 +122,21 @@ class NettyClientHandler extends AbstractNettyHandler {
Http2Connection connection = new DefaultHttp2Connection(false);
return newHandler(connection, frameReader, frameWriter, lifecycleManager, keepAliveManager,
flowControlWindow, maxHeaderListSize, ticker);
flowControlWindow, maxHeaderListSize, ticker, tooManyPingsRunnable);
}
@VisibleForTesting
static NettyClientHandler newHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager, int flowControlWindow, int maxHeaderListSize,
Ticker ticker) {
Ticker ticker, Runnable tooManyPingsRunnable) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Preconditions.checkNotNull(ticker, "ticker");
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
@ -159,12 +160,12 @@ class NettyClientHandler extends AbstractNettyHandler {
settings.maxHeaderListSize(maxHeaderListSize);
return new NettyClientHandler(decoder, encoder, settings, lifecycleManager, keepAliveManager,
ticker);
ticker, tooManyPingsRunnable);
}
private NettyClientHandler(Http2ConnectionDecoder decoder, StreamBufferingEncoder encoder,
Http2Settings settings, ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager, Ticker ticker) {
KeepAliveManager keepAliveManager, Ticker ticker, final Runnable tooManyPingsRunnable) {
super(decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
@ -179,7 +180,16 @@ class NettyClientHandler extends AbstractNettyHandler {
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
goingAway(statusFromGoAway(errorCode, ByteBufUtil.getBytes(debugData)));
byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
goingAway(statusFromGoAway(errorCode, debugDataBytes));
if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
String data = new String(debugDataBytes, UTF_8);
logger.log(
Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {1}", data);
if ("too_many_pings".equals(data)) {
tooManyPingsRunnable.run();
}
}
}
@Override

View File

@ -87,6 +87,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final long keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
private final Runnable tooManyPingsRunnable;
private ProtocolNegotiator.Handler negotiationHandler;
private NettyClientHandler handler;
@ -103,7 +104,8 @@ class NettyClientTransport implements ConnectionClientTransport {
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent) {
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
@ -117,6 +119,8 @@ class NettyClientTransport implements ConnectionClientTransport {
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
this.authority = new AsciiString(authority);
this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
}
@Override
@ -184,7 +188,7 @@ class NettyClientTransport implements ConnectionClientTransport {
}
handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
maxHeaderListSize, Ticker.systemTicker());
maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
HandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);

View File

@ -89,6 +89,7 @@ import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
@ -116,6 +117,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
private List<String> setKeepaliveManagerFor = ImmutableList.of("cancelShouldSucceed",
"sendFrameShouldSucceed", "channelShutdownShouldCancelBufferedStreams",
"createIncrementsIdsForActualAndBufferdStreams", "dataPingAckIsRecognized");
private Runnable tooManyPingsRunnable = new Runnable() {
@Override public void run() {}
};
@Rule
public TestName testNameRule = new TestName();
@ -348,6 +352,48 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
status.getDescription());
}
// This test is not as useful as it looks, because the HTTP/2 Netty code catches and doesn't
// propagate exceptions during the onGoAwayReceived callback.
@Test
public void receivedGoAway_notUtf8() throws Exception {
// 0xFF is never permitted in UTF-8. 0xF0 should have 3 continuations following, and 0x0a isn't
// a continuation.
channelRead(goAwayFrame(0, 11 /* ENHANCE_YOUR_CALM */,
Unpooled.copiedBuffer(new byte[] {(byte) 0xFF, (byte) 0xF0, (byte) 0x0a})));
}
@Test
public void receivedGoAway_enhanceYourCalmWithoutTooManyPings() throws Exception {
final AtomicBoolean b = new AtomicBoolean();
tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
b.set(true);
}
};
setUp();
channelRead(goAwayFrame(0, 11 /* ENHANCE_YOUR_CALM */,
Unpooled.copiedBuffer("not_many_pings", UTF_8)));
assertFalse(b.get());
}
@Test
public void receivedGoAway_enhanceYourCalmWithTooManyPings() throws Exception {
final AtomicBoolean b = new AtomicBoolean();
tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
b.set(true);
}
};
setUp();
channelRead(goAwayFrame(0, 11 /* ENHANCE_YOUR_CALM */,
Unpooled.copiedBuffer("too_many_pings", UTF_8)));
assertTrue(b.get());
}
@Test
public void cancelStreamShouldCreateAndThenFailBufferedStream() throws Exception {
receiveMaxConcurrentStreams(0);
@ -575,7 +621,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
};
return NettyClientHandler.newHandler(connection, frameReader(), frameWriter(),
lifecycleManager, mockKeepAliveManager, flowControlWindow, maxHeaderListSize, ticker);
lifecycleManager, mockKeepAliveManager, flowControlWindow, maxHeaderListSize, ticker,
tooManyPingsRunnable);
}
@Override

View File

@ -113,6 +113,10 @@ public class NettyClientTransportTest {
private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final EchoServerListener serverListener = new EchoServerListener();
private Runnable tooManyPingsRunnable = new Runnable() {
// Throwing is useless in this method, because Netty doesn't propagate the exception
@Override public void run() {}
};
private InetSocketAddress address;
private String authority;
@ -173,7 +177,8 @@ public class NettyClientTransportTest {
NettyClientTransport transport = new NettyClientTransport(
address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */);
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
tooManyPingsRunnable);
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
@ -300,7 +305,7 @@ public class NettyClientTransportTest {
address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null);
null, tooManyPingsRunnable);
transports.add(transport);
// Should not throw
@ -466,7 +471,7 @@ public class NettyClientTransportTest {
NettyClientTransport transport = new NettyClientTransport(
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, 1L, false, authority,
userAgent);
userAgent, tooManyPingsRunnable);
transports.add(transport);
return transport;
}

View File

@ -45,6 +45,7 @@ import io.grpc.ExperimentalApi;
import io.grpc.Internal;
import io.grpc.NameResolver;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AtomicBackoff;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil;
@ -360,7 +361,7 @@ public class OkHttpChannelBuilder extends
private final ConnectionSpec connectionSpec;
private final int maxMessageSize;
private final boolean enableKeepAlive;
private final long keepAliveTimeNanos;
private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
private boolean closed;
@ -377,7 +378,7 @@ public class OkHttpChannelBuilder extends
this.connectionSpec = connectionSpec;
this.maxMessageSize = maxMessageSize;
this.enableKeepAlive = enableKeepAlive;
this.keepAliveTimeNanos = keepAliveTimeNanos;
this.keepAliveTimeNanos = new AtomicBackoff("keepalive time nanos", keepAliveTimeNanos);
this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
@ -406,13 +407,20 @@ public class OkHttpChannelBuilder extends
}
proxyAddress = new InetSocketAddress(parts[0], port);
}
final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState();
Runnable tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
keepAliveTimeNanosState.backoff();
}
};
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
OkHttpClientTransport transport = new OkHttpClientTransport(inetSocketAddr, authority,
userAgent, executor, socketFactory, Utils.convertSpec(connectionSpec), maxMessageSize,
proxyAddress, null, null);
proxyAddress, null, null, tooManyPingsRunnable);
if (enableKeepAlive) {
transport.enableKeepAlive(
true, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls);
true, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls);
}
return transport;
}

View File

@ -190,6 +190,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
private final String proxyUsername;
@Nullable
private final String proxyPassword;
private final Runnable tooManyPingsRunnable;
// The following fields should only be used for test.
Runnable connectingCallback;
@ -198,7 +199,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent,
Executor executor, @Nullable SSLSocketFactory sslSocketFactory, ConnectionSpec connectionSpec,
int maxMessageSize, @Nullable InetSocketAddress proxyAddress, @Nullable String proxyUsername,
@Nullable String proxyPassword) {
@Nullable String proxyPassword, Runnable tooManyPingsRunnable) {
this.address = Preconditions.checkNotNull(address, "address");
this.defaultAuthority = authority;
this.maxMessageSize = maxMessageSize;
@ -214,6 +215,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
this.proxyAddress = proxyAddress;
this.proxyUsername = proxyUsername;
this.proxyPassword = proxyPassword;
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
}
/**
@ -223,7 +226,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
OkHttpClientTransport(String userAgent, Executor executor, FrameReader frameReader,
FrameWriter testFrameWriter, int nextStreamId, Socket socket, Ticker ticker,
@Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture,
int maxMessageSize) {
int maxMessageSize, Runnable tooManyPingsRunnable) {
address = null;
this.maxMessageSize = maxMessageSize;
defaultAuthority = "notarealauthority:80";
@ -241,6 +244,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
this.proxyAddress = null;
this.proxyUsername = null;
this.proxyPassword = null;
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
}
/**
@ -1014,9 +1019,17 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Override
public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
String data = debugData.utf8();
log.log(Level.WARNING, String.format(
"%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
if ("too_many_pings".equals(data)) {
tooManyPingsRunnable.run();
}
}
Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
.augmentDescription("Received Goaway");
if (debugData != null && debugData.size() > 0) {
if (debugData.size() > 0) {
// If a debug message was provided, use it.
status = status.augmentDescription(debugData.utf8());
}

View File

@ -99,8 +99,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import okio.Buffer;
import okio.ByteString;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -140,6 +142,11 @@ public class OkHttpClientTransportTest {
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
private SettableFuture<Void> connectedFuture;
private DelayConnectedCallback delayConnectedCallback;
private Runnable tooManyPingsRunnable = new Runnable() {
@Override public void run() {
throw new AssertionError();
}
};
/** Set up for test. */
@Before
@ -179,7 +186,7 @@ public class OkHttpClientTransportTest {
};
clientTransport = new OkHttpClientTransport(userAgent, executor, frameReader,
frameWriter, startId, new MockSocket(frameReader), ticker, connectingCallback,
connectedFuture, maxMessageSize);
connectedFuture, maxMessageSize, tooManyPingsRunnable);
clientTransport.start(transportListener);
if (waitingForConnected) {
connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
@ -192,7 +199,7 @@ public class OkHttpClientTransportTest {
clientTransport = new OkHttpClientTransport(
address, "hostname", null /* agent */, executor, null,
Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC), DEFAULT_MAX_MESSAGE_SIZE,
null, null, null);
null, null, null, tooManyPingsRunnable);
String s = clientTransport.toString();
assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport"));
assertTrue("Unexpected: " + s, s.contains(address.toString()));
@ -715,7 +722,7 @@ public class OkHttpClientTransportTest {
assertEquals(2, activeStreamCount());
// Receive goAway, max good id is 3.
frameHandler().goAway(3, ErrorCode.CANCEL, null);
frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY);
// Transport should be in STOPPING state.
verify(transportListener).transportShutdown(isA(Status.class));
@ -864,7 +871,7 @@ public class OkHttpClientTransportTest {
assertEquals(1, activeStreamCount());
// Receives GO_AWAY.
frameHandler().goAway(99, ErrorCode.CANCEL, null);
frameHandler().goAway(99, ErrorCode.CANCEL, ByteString.EMPTY);
listener2.waitUntilStreamClosed();
assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
@ -1339,7 +1346,8 @@ public class OkHttpClientTransportTest {
DEFAULT_MAX_MESSAGE_SIZE,
null,
null,
null);
null,
tooManyPingsRunnable);
String host = clientTransport.getOverridenHost();
int port = clientTransport.getOverridenPort();
@ -1360,7 +1368,8 @@ public class OkHttpClientTransportTest {
DEFAULT_MAX_MESSAGE_SIZE,
null,
null,
null);
null,
tooManyPingsRunnable);
ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
clientTransport.start(listener);
@ -1389,7 +1398,8 @@ public class OkHttpClientTransportTest {
DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(),
null,
null);
null,
tooManyPingsRunnable);
clientTransport.start(transportListener);
Socket sock = serverSocket.accept();
@ -1437,7 +1447,8 @@ public class OkHttpClientTransportTest {
DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(),
null,
null);
null,
tooManyPingsRunnable);
clientTransport.start(transportListener);
Socket sock = serverSocket.accept();
@ -1484,7 +1495,8 @@ public class OkHttpClientTransportTest {
DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(),
null,
null);
null,
tooManyPingsRunnable);
clientTransport.start(transportListener);
Socket sock = serverSocket.accept();
@ -1501,6 +1513,49 @@ public class OkHttpClientTransportTest {
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}
@Test
public void goAway_notUtf8() throws Exception {
initTransport();
// 0xFF is never permitted in UTF-8. 0xF0 should have 3 continuations following, and 0x0a isn't
// a continuation.
frameHandler().goAway(
0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.of((byte) 0xFF, (byte) 0xF0, (byte) 0x0a));
shutdownAndVerify();
}
@Test
public void goAway_notTooManyPings() throws Exception {
final AtomicBoolean run = new AtomicBoolean();
tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
run.set(true);
}
};
initTransport();
frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("not_many_pings"));
assertFalse(run.get());
shutdownAndVerify();
}
@Test
public void goAway_tooManyPings() throws Exception {
final AtomicBoolean run = new AtomicBoolean();
tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
run.set(true);
}
};
initTransport();
frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("too_many_pings"));
assertTrue(run.get());
shutdownAndVerify();
}
private int activeStreamCount() {
return clientTransport.getActiveStreams().length;
}