core,okhttp: add TransportTracer to okhttpclient (#3809)

This commit is contained in:
zpencer 2017-12-07 17:12:08 -08:00 committed by GitHub
parent 173ca5d332
commit 2b3885bbc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 235 additions and 89 deletions

View File

@ -16,6 +16,7 @@
package io.grpc.internal; package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
@ -86,7 +87,6 @@ public abstract class AbstractClientStream extends AbstractStream
void cancel(Status status); void cancel(Status status);
} }
@Nullable // okhttp does not support transportTracer yet
private final TransportTracer transportTracer; private final TransportTracer transportTracer;
private final Framer framer; private final Framer framer;
private boolean useGet; private boolean useGet;
@ -102,11 +102,11 @@ public abstract class AbstractClientStream extends AbstractStream
protected AbstractClientStream( protected AbstractClientStream(
WritableBufferAllocator bufferAllocator, WritableBufferAllocator bufferAllocator,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
@Nullable TransportTracer transportTracer, TransportTracer transportTracer,
Metadata headers, Metadata headers,
boolean useGet) { boolean useGet) {
Preconditions.checkNotNull(headers, "headers"); checkNotNull(headers, "headers");
this.transportTracer = transportTracer; this.transportTracer = checkNotNull(transportTracer, "transportTracer");
this.useGet = useGet; this.useGet = useGet;
if (!useGet) { if (!useGet) {
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
@ -217,9 +217,9 @@ public abstract class AbstractClientStream extends AbstractStream
protected TransportState( protected TransportState(
int maxMessageSize, int maxMessageSize,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
@Nullable TransportTracer transportTracer) { TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer); super(maxMessageSize, statsTraceCtx, transportTracer);
this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
} }
private void setFullStreamDecompression(boolean fullStreamDecompression) { private void setFullStreamDecompression(boolean fullStreamDecompression) {
@ -229,13 +229,13 @@ public abstract class AbstractClientStream extends AbstractStream
private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
Preconditions.checkState(this.listener == null, "Already called start"); Preconditions.checkState(this.listener == null, "Already called start");
this.decompressorRegistry = this.decompressorRegistry =
Preconditions.checkNotNull(decompressorRegistry, "decompressorRegistry"); checkNotNull(decompressorRegistry, "decompressorRegistry");
} }
@VisibleForTesting @VisibleForTesting
public final void setListener(ClientStreamListener listener) { public final void setListener(ClientStreamListener listener) {
Preconditions.checkState(this.listener == null, "Already called setListener"); Preconditions.checkState(this.listener == null, "Already called setListener");
this.listener = Preconditions.checkNotNull(listener, "listener"); this.listener = checkNotNull(listener, "listener");
} }
@Override @Override
@ -308,7 +308,7 @@ public abstract class AbstractClientStream extends AbstractStream
* @param frame the received data frame. Its ownership is transferred to this method. * @param frame the received data frame. Its ownership is transferred to this method.
*/ */
protected void inboundDataReceived(ReadableBuffer frame) { protected void inboundDataReceived(ReadableBuffer frame) {
Preconditions.checkNotNull(frame, "frame"); checkNotNull(frame, "frame");
boolean needToCloseFrame = true; boolean needToCloseFrame = true;
try { try {
if (statusReported) { if (statusReported) {
@ -332,8 +332,8 @@ public abstract class AbstractClientStream extends AbstractStream
* @param status the status extracted from the trailers * @param status the status extracted from the trailers
*/ */
protected void inboundTrailersReceived(Metadata trailers, Status status) { protected void inboundTrailersReceived(Metadata trailers, Status status) {
Preconditions.checkNotNull(status, "status"); checkNotNull(status, "status");
Preconditions.checkNotNull(trailers, "trailers"); checkNotNull(trailers, "trailers");
if (statusReported) { if (statusReported) {
log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}", log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
new Object[]{status, trailers}); new Object[]{status, trailers});
@ -356,8 +356,8 @@ public abstract class AbstractClientStream extends AbstractStream
*/ */
public final void transportReportStatus(final Status status, boolean stopDelivery, public final void transportReportStatus(final Status status, boolean stopDelivery,
final Metadata trailers) { final Metadata trailers) {
Preconditions.checkNotNull(status, "status"); checkNotNull(status, "status");
Preconditions.checkNotNull(trailers, "trailers"); checkNotNull(trailers, "trailers");
// If stopDelivery, we continue in case previous invocation is waiting for stall // If stopDelivery, we continue in case previous invocation is waiting for stall
if (statusReported && !stopDelivery) { if (statusReported && !stopDelivery) {
return; return;
@ -404,8 +404,8 @@ public abstract class AbstractClientStream extends AbstractStream
private byte[] payload; private byte[] payload;
public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) { public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
this.headers = Preconditions.checkNotNull(headers, "headers"); this.headers = checkNotNull(headers, "headers");
this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
} }
@Override @Override

View File

@ -24,7 +24,6 @@ import io.grpc.Codec;
import io.grpc.Compressor; import io.grpc.Compressor;
import io.grpc.Decompressor; import io.grpc.Decompressor;
import java.io.InputStream; import java.io.InputStream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
/** /**
@ -108,7 +107,6 @@ public abstract class AbstractStream implements Stream {
private Deframer deframer; private Deframer deframer;
private final Object onReadyLock = new Object(); private final Object onReadyLock = new Object();
private final StatsTraceContext statsTraceCtx; private final StatsTraceContext statsTraceCtx;
@Nullable // okhttp transports don't trace yet
private final TransportTracer transportTracer; private final TransportTracer transportTracer;
/** /**
@ -133,9 +131,9 @@ public abstract class AbstractStream implements Stream {
protected TransportState( protected TransportState(
int maxMessageSize, int maxMessageSize,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
@Nullable TransportTracer transportTracer) { // nullable: okhttp transports don't trace yet TransportTracer transportTracer) {
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = transportTracer; this.transportTracer = checkNotNull(transportTracer, "transportTracer");
deframer = new MessageDeframer( deframer = new MessageDeframer(
this, this,
Codec.Identity.NONE, Codec.Identity.NONE,
@ -234,9 +232,7 @@ public abstract class AbstractStream implements Stream {
allocated = true; allocated = true;
} }
notifyIfReady(); notifyIfReady();
if (transportTracer != null) { transportTracer.reportStreamStarted();
transportTracer.reportStreamStarted();
}
} }
/** /**

View File

@ -88,8 +88,6 @@ public class MessageDeframer implements Closeable, Deframer {
private Listener listener; private Listener listener;
private int maxInboundMessageSize; private int maxInboundMessageSize;
private final StatsTraceContext statsTraceCtx; private final StatsTraceContext statsTraceCtx;
// transportTracer is nullable until it is integrated with client transports
@Nullable
private final TransportTracer transportTracer; private final TransportTracer transportTracer;
private final String debugString; private final String debugString;
private Decompressor decompressor; private Decompressor decompressor;
@ -123,13 +121,13 @@ public class MessageDeframer implements Closeable, Deframer {
Decompressor decompressor, Decompressor decompressor,
int maxMessageSize, int maxMessageSize,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
@Nullable TransportTracer transportTracer, TransportTracer transportTracer,
String debugString) { String debugString) {
this.listener = checkNotNull(listener, "sink"); this.listener = checkNotNull(listener, "sink");
this.decompressor = checkNotNull(decompressor, "decompressor"); this.decompressor = checkNotNull(decompressor, "decompressor");
this.maxInboundMessageSize = maxMessageSize; this.maxInboundMessageSize = maxMessageSize;
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = transportTracer; this.transportTracer = checkNotNull(transportTracer, "transportTracer");
this.debugString = debugString; this.debugString = debugString;
} }
@ -395,9 +393,7 @@ public class MessageDeframer implements Closeable, Deframer {
currentMessageSeqNo++; currentMessageSeqNo++;
statsTraceCtx.inboundMessage(currentMessageSeqNo); statsTraceCtx.inboundMessage(currentMessageSeqNo);
if (transportTracer != null) { transportTracer.reportMessageReceived();
transportTracer.reportMessageReceived();
}
// Continue reading the frame body. // Continue reading the frame body.
state = State.BODY; state = State.BODY;
} }

View File

@ -38,6 +38,7 @@ import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ProxyParameters; import io.grpc.internal.ProxyParameters;
import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.Platform; import io.grpc.okhttp.internal.Platform;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -127,6 +128,13 @@ public class OkHttpChannelBuilder extends
} }
} }
@VisibleForTesting
final OkHttpChannelBuilder setTransportTracerFactory(
TransportTracer.Factory transportTracerFactory) {
this.transportTracerFactory = transportTracerFactory;
return this;
}
/** /**
* Override the default executor necessary for internal transport use. * Override the default executor necessary for internal transport use.
* *
@ -310,7 +318,8 @@ public class OkHttpChannelBuilder extends
boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED; boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED;
return new OkHttpTransportFactory(transportExecutor, return new OkHttpTransportFactory(transportExecutor,
createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(), createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(),
enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls); enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory);
} }
@Override @Override
@ -376,6 +385,7 @@ public class OkHttpChannelBuilder extends
static final class OkHttpTransportFactory implements ClientTransportFactory { static final class OkHttpTransportFactory implements ClientTransportFactory {
private final Executor executor; private final Executor executor;
private final boolean usingSharedExecutor; private final boolean usingSharedExecutor;
private final TransportTracer.Factory transportTracerFactory;
@Nullable @Nullable
private final SSLSocketFactory socketFactory; private final SSLSocketFactory socketFactory;
@Nullable @Nullable
@ -398,7 +408,8 @@ public class OkHttpChannelBuilder extends
boolean enableKeepAlive, boolean enableKeepAlive,
long keepAliveTimeNanos, long keepAliveTimeNanos,
long keepAliveTimeoutNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls) { boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory) {
this.socketFactory = socketFactory; this.socketFactory = socketFactory;
this.hostnameVerifier = hostnameVerifier; this.hostnameVerifier = hostnameVerifier;
this.connectionSpec = connectionSpec; this.connectionSpec = connectionSpec;
@ -409,6 +420,8 @@ public class OkHttpChannelBuilder extends
this.keepAliveWithoutCalls = keepAliveWithoutCalls; this.keepAliveWithoutCalls = keepAliveWithoutCalls;
usingSharedExecutor = executor == null; usingSharedExecutor = executor == null;
this.transportTracerFactory =
Preconditions.checkNotNull(transportTracerFactory, "transportTracerFactory");
if (usingSharedExecutor) { if (usingSharedExecutor) {
// The executor was unspecified, using the shared executor. // The executor was unspecified, using the shared executor.
this.executor = SharedResourceHolder.get(SHARED_EXECUTOR); this.executor = SharedResourceHolder.get(SHARED_EXECUTOR);
@ -444,7 +457,8 @@ public class OkHttpChannelBuilder extends
proxy == null ? null : proxy.proxyAddress, proxy == null ? null : proxy.proxyAddress,
proxy == null ? null : proxy.username, proxy == null ? null : proxy.username,
proxy == null ? null : proxy.password, proxy == null ? null : proxy.password,
tooManyPingsRunnable); tooManyPingsRunnable,
transportTracerFactory.create());
if (enableKeepAlive) { if (enableKeepAlive) {
transport.enableKeepAlive( transport.enableKeepAlive(
true, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls); true, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls);

View File

@ -27,6 +27,7 @@ import io.grpc.Status;
import io.grpc.internal.AbstractClientStream; import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.Http2ClientStreamTransportState; import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBuffer;
import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header; import io.grpc.okhttp.internal.framed.Header;
@ -69,11 +70,12 @@ class OkHttpClientStream extends AbstractClientStream {
int maxMessageSize, int maxMessageSize,
String authority, String authority,
String userAgent, String userAgent,
StatsTraceContext statsTraceCtx) { StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super( super(
new OkHttpWritableBufferAllocator(), new OkHttpWritableBufferAllocator(),
statsTraceCtx, statsTraceCtx,
/*transportTracer=*/ null, transportTracer,
headers, headers,
method.isSafe()); method.isSafe());
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
@ -152,6 +154,7 @@ class OkHttpClientStream extends AbstractClientStream {
synchronized (state.lock) { synchronized (state.lock) {
state.sendBuffer(buffer, endOfStream, flush); state.sendBuffer(buffer, endOfStream, flush);
getTransportTracer().reportMessageSent(numMessages);
} }
} }
@ -200,7 +203,7 @@ class OkHttpClientStream extends AbstractClientStream {
AsyncFrameWriter frameWriter, AsyncFrameWriter frameWriter,
OutboundFlowController outboundFlow, OutboundFlowController outboundFlow,
OkHttpClientTransport transport) { OkHttpClientTransport transport) {
super(maxMessageSize, statsTraceCtx, /*transportTracer=*/ null); super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer());
this.lock = checkNotNull(lock, "lock"); this.lock = checkNotNull(lock, "lock");
this.frameWriter = frameWriter; this.frameWriter = frameWriter;
this.outboundFlow = outboundFlow; this.outboundFlow = outboundFlow;

View File

@ -46,6 +46,7 @@ import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
import io.grpc.internal.SerializingExecutor; import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.FrameReader; import io.grpc.okhttp.internal.framed.FrameReader;
@ -181,6 +182,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Nullable @Nullable
private final String proxyPassword; private final String proxyPassword;
private final Runnable tooManyPingsRunnable; private final Runnable tooManyPingsRunnable;
@GuardedBy("lock")
private final TransportTracer transportTracer;
// The following fields should only be used for test. // The following fields should only be used for test.
Runnable connectingCallback; Runnable connectingCallback;
@ -190,7 +193,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
Executor executor, @Nullable SSLSocketFactory sslSocketFactory, Executor executor, @Nullable SSLSocketFactory sslSocketFactory,
@Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec,
int maxMessageSize, @Nullable InetSocketAddress proxyAddress, @Nullable String proxyUsername, int maxMessageSize, @Nullable InetSocketAddress proxyAddress, @Nullable String proxyUsername,
@Nullable String proxyPassword, Runnable tooManyPingsRunnable) { @Nullable String proxyPassword, Runnable tooManyPingsRunnable,
TransportTracer transportTracer) {
this.address = Preconditions.checkNotNull(address, "address"); this.address = Preconditions.checkNotNull(address, "address");
this.defaultAuthority = authority; this.defaultAuthority = authority;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
@ -209,6 +213,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
this.proxyPassword = proxyPassword; this.proxyPassword = proxyPassword;
this.tooManyPingsRunnable = this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
this.transportTracer = Preconditions.checkNotNull(transportTracer);
initTransportTracer();
} }
/** /**
@ -226,7 +232,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Nullable Runnable connectingCallback, @Nullable Runnable connectingCallback,
SettableFuture<Void> connectedFuture, SettableFuture<Void> connectedFuture,
int maxMessageSize, int maxMessageSize,
Runnable tooManyPingsRunnable) { Runnable tooManyPingsRunnable,
TransportTracer transportTracer) {
address = null; address = null;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
defaultAuthority = "notarealauthority:80"; defaultAuthority = "notarealauthority:80";
@ -246,6 +253,23 @@ class OkHttpClientTransport implements ConnectionClientTransport {
this.proxyPassword = null; this.proxyPassword = null;
this.tooManyPingsRunnable = this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
initTransportTracer();
}
private void initTransportTracer() {
synchronized (lock) { // to make @GuardedBy linter happy
transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
@Override
public TransportTracer.FlowControlWindows read() {
synchronized (lock) {
long local = -1; // okhttp does not track the local window size
long remote = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
return new TransportTracer.FlowControlWindows(local, remote);
}
}
});
}
} }
/** /**
@ -286,6 +310,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
stopwatch.start(); stopwatch.start();
p = ping = new Http2Ping(data, stopwatch); p = ping = new Http2Ping(data, stopwatch);
writePing = true; writePing = true;
transportTracer.reportKeepAliveSent();
} }
} }
if (writePing) { if (writePing) {
@ -302,8 +327,18 @@ class OkHttpClientTransport implements ConnectionClientTransport {
Preconditions.checkNotNull(method, "method"); Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers"); Preconditions.checkNotNull(headers, "headers");
StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers); StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
return new OkHttpClientStream(method, headers, frameWriter, OkHttpClientTransport.this, return new OkHttpClientStream(
outboundFlow, lock, maxMessageSize, defaultAuthority, userAgent, statsTraceCtx); method,
headers,
frameWriter,
OkHttpClientTransport.this,
outboundFlow,
lock,
maxMessageSize,
defaultAuthority,
userAgent,
statsTraceCtx,
transportTracer);
} }
@GuardedBy("lock") @GuardedBy("lock")
@ -858,9 +893,11 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Override @Override
public Future<InternalTransportStats> getTransportStats() { public Future<InternalTransportStats> getTransportStats() {
SettableFuture<InternalTransportStats> ret = SettableFuture.create(); synchronized (lock) {
ret.set(null); SettableFuture<InternalTransportStats> ret = SettableFuture.create();
return ret; ret.set(transportTracer.getStats());
return ret;
}
} }
/** /**

View File

@ -23,7 +23,9 @@ import java.util.List;
public final class AccessProtectedHack { public final class AccessProtectedHack {
public static InternalServer serverBuilderBuildTransportServer( public static InternalServer serverBuilderBuildTransportServer(
AbstractServerImplBuilder<?> builder, AbstractServerImplBuilder<?> builder,
List<ServerStreamTracer.Factory> streamTracerFactories) { List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory) {
builder.transportTracerFactory = transportTracerFactory;
return builder.buildTransportServer(streamTracerFactories); return builder.buildTransportServer(streamTracerFactories);
} }

View File

@ -33,6 +33,7 @@ import io.grpc.Status;
import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header; import io.grpc.okhttp.internal.framed.Header;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -62,6 +63,7 @@ public class OkHttpClientStreamTest {
@Captor private ArgumentCaptor<List<Header>> headersCaptor; @Captor private ArgumentCaptor<List<Header>> headersCaptor;
private final Object lock = new Object(); private final Object lock = new Object();
private final TransportTracer transportTracer = new TransportTracer();
private MethodDescriptor<?, ?> methodDescriptor; private MethodDescriptor<?, ?> methodDescriptor;
private OkHttpClientStream stream; private OkHttpClientStream stream;
@ -76,8 +78,18 @@ public class OkHttpClientStreamTest {
.setResponseMarshaller(marshaller) .setResponseMarshaller(marshaller)
.build(); .build();
stream = new OkHttpClientStream(methodDescriptor, new Metadata(), frameWriter, transport, stream = new OkHttpClientStream(
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "userAgent", StatsTraceContext.NOOP); methodDescriptor,
new Metadata(),
frameWriter,
transport,
flowController,
lock,
MAX_MESSAGE_SIZE,
"localhost",
"userAgent",
StatsTraceContext.NOOP,
transportTracer);
} }
@Test @Test
@ -134,7 +146,7 @@ public class OkHttpClientStreamTest {
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP); StatsTraceContext.NOOP, transportTracer);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.transportState().start(3); stream.transportState().start(3);
@ -149,7 +161,7 @@ public class OkHttpClientStreamTest {
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP); StatsTraceContext.NOOP, transportTracer);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.transportState().start(3); stream.transportState().start(3);
@ -177,7 +189,7 @@ public class OkHttpClientStreamTest {
.build(); .build();
stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport, stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP); StatsTraceContext.NOOP, transportTracer);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
// GET streams send headers after halfClose is called. // GET streams send headers after halfClose is called.

View File

@ -54,6 +54,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.InternalStatus; import io.grpc.InternalStatus;
import io.grpc.InternalTransportStats;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
@ -65,6 +66,7 @@ import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler; import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler;
import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.ErrorCode;
@ -92,6 +94,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSocketFactory;
import okio.Buffer; import okio.Buffer;
import okio.ByteString; import okio.ByteString;
import org.junit.After; import org.junit.After;
@ -127,9 +131,16 @@ public class OkHttpClientTransportTest {
@Mock @Mock
private ManagedClientTransport.Listener transportListener; private ManagedClientTransport.Listener transportListener;
private final SSLSocketFactory sslSocketFactory = null;
private final HostnameVerifier hostnameVerifier = null;
private final InetSocketAddress proxyAddr = null;
private final String proxyUser = null;
private final String proxyPassword = null;
private final TransportTracer transportTracer = new TransportTracer();
private OkHttpClientTransport clientTransport; private OkHttpClientTransport clientTransport;
private MockFrameReader frameReader; private MockFrameReader frameReader;
private ExecutorService executor; private ExecutorService executor = Executors.newCachedThreadPool();
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
private SettableFuture<Void> connectedFuture; private SettableFuture<Void> connectedFuture;
private DelayConnectedCallback delayConnectedCallback; private DelayConnectedCallback delayConnectedCallback;
@ -143,7 +154,6 @@ public class OkHttpClientTransportTest {
@Before @Before
public void setUp() { public void setUp() {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
executor = Executors.newCachedThreadPool();
when(frameWriter.maxDataLength()).thenReturn(Integer.MAX_VALUE); when(frameWriter.maxDataLength()).thenReturn(Integer.MAX_VALUE);
frameReader = new MockFrameReader(); frameReader = new MockFrameReader();
} }
@ -192,7 +202,8 @@ public class OkHttpClientTransportTest {
connectingCallback, connectingCallback,
connectedFuture, connectedFuture,
maxMessageSize, maxMessageSize,
tooManyPingsRunnable); tooManyPingsRunnable,
new TransportTracer());
clientTransport.start(transportListener); clientTransport.start(transportListener);
if (waitingForConnected) { if (waitingForConnected) {
connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS); connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
@ -203,10 +214,19 @@ public class OkHttpClientTransportTest {
public void testToString() throws Exception { public void testToString() throws Exception {
InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
clientTransport = new OkHttpClientTransport( clientTransport = new OkHttpClientTransport(
address, "hostname", null /* agent */, executor, /* sslSocketFactory */ null, address,
/* hostnameVerifier */null, "hostname",
Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC), DEFAULT_MAX_MESSAGE_SIZE, /*agent=*/ null,
null, null, null, tooManyPingsRunnable); executor,
sslSocketFactory,
hostnameVerifier,
Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC),
DEFAULT_MAX_MESSAGE_SIZE,
proxyAddr,
proxyUser,
proxyPassword,
tooManyPingsRunnable,
transportTracer);
String s = clientTransport.toString(); String s = clientTransport.toString();
assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport")); assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport"));
assertTrue("Unexpected: " + s, s.contains(address.toString())); assertTrue("Unexpected: " + s, s.contains(address.toString()));
@ -524,6 +544,30 @@ public class OkHttpClientTransportTest {
shutdownAndVerify(); shutdownAndVerify();
} }
@Test
public void transportTracer_windowSizeDefault() throws Exception {
initTransport();
InternalTransportStats stats = clientTransport.getTransportStats().get();
assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
// okhttp does not track local window sizes
assertEquals(-1, stats.localFlowControlWindow);
}
@Test
public void transportTracer_windowSize_remote() throws Exception {
initTransport();
InternalTransportStats before = clientTransport.getTransportStats().get();
assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
// okhttp does not track local window sizes
assertEquals(-1, before.localFlowControlWindow);
frameHandler().windowUpdate(0, 1000);
InternalTransportStats after = clientTransport.getTransportStats().get();
assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow);
// okhttp does not track local window sizes
assertEquals(-1, after.localFlowControlWindow);
}
@Test @Test
public void windowUpdate() throws Exception { public void windowUpdate() throws Exception {
initTransport(); initTransport();
@ -1234,9 +1278,11 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
PingCallbackImpl callback1 = new PingCallbackImpl(); PingCallbackImpl callback1 = new PingCallbackImpl();
clientTransport.ping(callback1, MoreExecutors.directExecutor()); clientTransport.ping(callback1, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
// add'l ping will be added as listener to outstanding operation // add'l ping will be added as listener to outstanding operation
PingCallbackImpl callback2 = new PingCallbackImpl(); PingCallbackImpl callback2 = new PingCallbackImpl();
clientTransport.ping(callback2, MoreExecutors.directExecutor()); clientTransport.ping(callback2, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class); ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class);
ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class); ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class);
@ -1269,6 +1315,7 @@ public class OkHttpClientTransportTest {
// now that previous ping is done, next request returns a different future // now that previous ping is done, next request returns a different future
callback1 = new PingCallbackImpl(); callback1 = new PingCallbackImpl();
clientTransport.ping(callback1, MoreExecutors.directExecutor()); clientTransport.ping(callback1, MoreExecutors.directExecutor());
assertEquals(2, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(0, callback1.invocationCount); assertEquals(0, callback1.invocationCount);
shutdownAndVerify(); shutdownAndVerify();
} }
@ -1278,6 +1325,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
PingCallbackImpl callback = new PingCallbackImpl(); PingCallbackImpl callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor()); clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(0, callback.invocationCount); assertEquals(0, callback.invocationCount);
clientTransport.shutdown(SHUTDOWN_REASON); clientTransport.shutdown(SHUTDOWN_REASON);
@ -1289,6 +1337,7 @@ public class OkHttpClientTransportTest {
// now that handler is in terminal state, all future pings fail immediately // now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl(); callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor()); clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, callback.invocationCount); assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException); assertTrue(callback.failureCause instanceof StatusException);
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
@ -1300,6 +1349,7 @@ public class OkHttpClientTransportTest {
initTransport(); initTransport();
PingCallbackImpl callback = new PingCallbackImpl(); PingCallbackImpl callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor()); clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(0, callback.invocationCount); assertEquals(0, callback.invocationCount);
clientTransport.onException(new IOException()); clientTransport.onException(new IOException());
@ -1312,6 +1362,7 @@ public class OkHttpClientTransportTest {
// now that handler is in terminal state, all future pings fail immediately // now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl(); callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor()); clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, callback.invocationCount); assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException); assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE, assertEquals(Status.Code.UNAVAILABLE,
@ -1392,14 +1443,15 @@ public class OkHttpClientTransportTest {
"invalid_authority", "invalid_authority",
"userAgent", "userAgent",
executor, executor,
null, sslSocketFactory,
null, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
null, proxyAddr,
null, proxyUser,
null, proxyPassword,
tooManyPingsRunnable); tooManyPingsRunnable,
transportTracer);
String host = clientTransport.getOverridenHost(); String host = clientTransport.getOverridenHost();
int port = clientTransport.getOverridenPort(); int port = clientTransport.getOverridenPort();
@ -1415,14 +1467,15 @@ public class OkHttpClientTransportTest {
"authority", "authority",
"userAgent", "userAgent",
executor, executor,
null, sslSocketFactory,
null, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
null, proxyAddr,
null, proxyUser,
null, proxyPassword,
tooManyPingsRunnable); tooManyPingsRunnable,
new TransportTracer());
ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
clientTransport.start(listener); clientTransport.start(listener);
@ -1446,14 +1499,15 @@ public class OkHttpClientTransportTest {
"authority", "authority",
"userAgent", "userAgent",
executor, executor,
null, sslSocketFactory,
null, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(), (InetSocketAddress) serverSocket.getLocalSocketAddress(),
null, proxyUser,
null, proxyPassword,
tooManyPingsRunnable); tooManyPingsRunnable,
transportTracer);
clientTransport.start(transportListener); clientTransport.start(transportListener);
Socket sock = serverSocket.accept(); Socket sock = serverSocket.accept();
@ -1496,14 +1550,15 @@ public class OkHttpClientTransportTest {
"authority", "authority",
"userAgent", "userAgent",
executor, executor,
null, sslSocketFactory,
null, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(), (InetSocketAddress) serverSocket.getLocalSocketAddress(),
null, proxyUser,
null, proxyPassword,
tooManyPingsRunnable); tooManyPingsRunnable,
transportTracer);
clientTransport.start(transportListener); clientTransport.start(transportListener);
Socket sock = serverSocket.accept(); Socket sock = serverSocket.accept();
@ -1545,14 +1600,15 @@ public class OkHttpClientTransportTest {
"authority", "authority",
"userAgent", "userAgent",
executor, executor,
null, sslSocketFactory,
null, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(), (InetSocketAddress) serverSocket.getLocalSocketAddress(),
null, proxyUser,
null, proxyPassword,
tooManyPingsRunnable); tooManyPingsRunnable,
transportTracer);
clientTransport.start(transportListener); clientTransport.start(transportListener);
Socket sock = serverSocket.accept(); Socket sock = serverSocket.accept();
@ -1576,14 +1632,15 @@ public class OkHttpClientTransportTest {
"authority", "authority",
"userAgent", "userAgent",
executor, executor,
null, sslSocketFactory,
null, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
InetSocketAddress.createUnresolved("unresolvedproxy", 80), InetSocketAddress.createUnresolved("unresolvedproxy", 80),
null, proxyUser,
null, proxyPassword,
tooManyPingsRunnable); tooManyPingsRunnable,
transportTracer);
clientTransport.start(transportListener); clientTransport.start(transportListener);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);

View File

@ -19,12 +19,15 @@ package io.grpc.okhttp;
import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer;
import io.grpc.internal.AccessProtectedHack; import io.grpc.internal.AccessProtectedHack;
import io.grpc.internal.ClientTransportFactory; import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.InternalServer; import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.AbstractTransportTest; import io.grpc.internal.testing.AbstractTransportTest;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After; import org.junit.After;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -34,10 +37,19 @@ import org.junit.runners.JUnit4;
/** Unit tests for OkHttp transport. */ /** Unit tests for OkHttp transport. */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class OkHttpTransportTest extends AbstractTransportTest { public class OkHttpTransportTest extends AbstractTransportTest {
private final FakeClock fakeClock = new FakeClock();
private final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
new TransportTracer.TimeProvider() {
@Override
public long currentTimeMillis() {
return fakeClock.currentTimeMillis();
}
});
private ClientTransportFactory clientFactory = OkHttpChannelBuilder private ClientTransportFactory clientFactory = OkHttpChannelBuilder
// Although specified here, address is ignored because we never call build. // Although specified here, address is ignored because we never call build.
.forAddress("localhost", 0) .forAddress("localhost", 0)
.negotiationType(NegotiationType.PLAINTEXT) .negotiationType(NegotiationType.PLAINTEXT)
.setTransportTracerFactory(fakeClockTransportTracer)
.buildTransportFactory(); .buildTransportFactory();
@After @After
@ -51,7 +63,8 @@ public class OkHttpTransportTest extends AbstractTransportTest {
NettyServerBuilder NettyServerBuilder
.forPort(0) .forPort(0)
.flowControlWindow(65 * 1024), .flowControlWindow(65 * 1024),
streamTracerFactories); streamTracerFactories,
fakeClockTransportTracer);
} }
@Override @Override
@ -62,7 +75,8 @@ public class OkHttpTransportTest extends AbstractTransportTest {
NettyServerBuilder NettyServerBuilder
.forPort(port) .forPort(port)
.flowControlWindow(65 * 1024), .flowControlWindow(65 * 1024),
streamTracerFactories); streamTracerFactories,
fakeClockTransportTracer);
} }
@Override @Override
@ -80,9 +94,24 @@ public class OkHttpTransportTest extends AbstractTransportTest {
null /* proxy */); null /* proxy */);
} }
@Override
protected void advanceClock(long offset, TimeUnit unit) {
fakeClock.forwardNanos(unit.toNanos(offset));
}
@Override
protected long currentTimeMillis() {
return fakeClock.currentTimeMillis();
}
// TODO(ejona): Flaky/Broken // TODO(ejona): Flaky/Broken
@Test @Test
@Ignore @Ignore
@Override @Override
public void flowControlPushBack() {} public void flowControlPushBack() {}
@Override
protected boolean haveTransportTracer() {
return true;
}
} }