diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 555d63468e..8b2bcf199b 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -16,6 +16,7 @@ package io.grpc.internal; +import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; @@ -86,7 +87,6 @@ public abstract class AbstractClientStream extends AbstractStream void cancel(Status status); } - @Nullable // okhttp does not support transportTracer yet private final TransportTracer transportTracer; private final Framer framer; private boolean useGet; @@ -102,11 +102,11 @@ public abstract class AbstractClientStream extends AbstractStream protected AbstractClientStream( WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx, - @Nullable TransportTracer transportTracer, + TransportTracer transportTracer, Metadata headers, boolean useGet) { - Preconditions.checkNotNull(headers, "headers"); - this.transportTracer = transportTracer; + checkNotNull(headers, "headers"); + this.transportTracer = checkNotNull(transportTracer, "transportTracer"); this.useGet = useGet; if (!useGet) { framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); @@ -217,9 +217,9 @@ public abstract class AbstractClientStream extends AbstractStream protected TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, - @Nullable TransportTracer transportTracer) { + TransportTracer transportTracer) { super(maxMessageSize, statsTraceCtx, transportTracer); - this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); + this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); } private void setFullStreamDecompression(boolean fullStreamDecompression) { @@ -229,13 +229,13 @@ public abstract class AbstractClientStream extends AbstractStream private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { Preconditions.checkState(this.listener == null, "Already called start"); this.decompressorRegistry = - Preconditions.checkNotNull(decompressorRegistry, "decompressorRegistry"); + checkNotNull(decompressorRegistry, "decompressorRegistry"); } @VisibleForTesting public final void setListener(ClientStreamListener listener) { Preconditions.checkState(this.listener == null, "Already called setListener"); - this.listener = Preconditions.checkNotNull(listener, "listener"); + this.listener = checkNotNull(listener, "listener"); } @Override @@ -308,7 +308,7 @@ public abstract class AbstractClientStream extends AbstractStream * @param frame the received data frame. Its ownership is transferred to this method. */ protected void inboundDataReceived(ReadableBuffer frame) { - Preconditions.checkNotNull(frame, "frame"); + checkNotNull(frame, "frame"); boolean needToCloseFrame = true; try { if (statusReported) { @@ -332,8 +332,8 @@ public abstract class AbstractClientStream extends AbstractStream * @param status the status extracted from the trailers */ protected void inboundTrailersReceived(Metadata trailers, Status status) { - Preconditions.checkNotNull(status, "status"); - Preconditions.checkNotNull(trailers, "trailers"); + checkNotNull(status, "status"); + checkNotNull(trailers, "trailers"); if (statusReported) { log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}", new Object[]{status, trailers}); @@ -356,8 +356,8 @@ public abstract class AbstractClientStream extends AbstractStream */ public final void transportReportStatus(final Status status, boolean stopDelivery, final Metadata trailers) { - Preconditions.checkNotNull(status, "status"); - Preconditions.checkNotNull(trailers, "trailers"); + checkNotNull(status, "status"); + checkNotNull(trailers, "trailers"); // If stopDelivery, we continue in case previous invocation is waiting for stall if (statusReported && !stopDelivery) { return; @@ -404,8 +404,8 @@ public abstract class AbstractClientStream extends AbstractStream private byte[] payload; public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) { - this.headers = Preconditions.checkNotNull(headers, "headers"); - this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); + this.headers = checkNotNull(headers, "headers"); + this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); } @Override diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 8bb9df5fa8..7f6ec15f77 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -24,7 +24,6 @@ import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.Decompressor; import java.io.InputStream; -import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; /** @@ -108,7 +107,6 @@ public abstract class AbstractStream implements Stream { private Deframer deframer; private final Object onReadyLock = new Object(); private final StatsTraceContext statsTraceCtx; - @Nullable // okhttp transports don't trace yet private final TransportTracer transportTracer; /** @@ -133,9 +131,9 @@ public abstract class AbstractStream implements Stream { protected TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, - @Nullable TransportTracer transportTracer) { // nullable: okhttp transports don't trace yet + TransportTracer transportTracer) { this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); - this.transportTracer = transportTracer; + this.transportTracer = checkNotNull(transportTracer, "transportTracer"); deframer = new MessageDeframer( this, Codec.Identity.NONE, @@ -234,9 +232,7 @@ public abstract class AbstractStream implements Stream { allocated = true; } notifyIfReady(); - if (transportTracer != null) { - transportTracer.reportStreamStarted(); - } + transportTracer.reportStreamStarted(); } /** diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index 6fb09f4ca2..a941b86ff6 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -88,8 +88,6 @@ public class MessageDeframer implements Closeable, Deframer { private Listener listener; private int maxInboundMessageSize; private final StatsTraceContext statsTraceCtx; - // transportTracer is nullable until it is integrated with client transports - @Nullable private final TransportTracer transportTracer; private final String debugString; private Decompressor decompressor; @@ -123,13 +121,13 @@ public class MessageDeframer implements Closeable, Deframer { Decompressor decompressor, int maxMessageSize, StatsTraceContext statsTraceCtx, - @Nullable TransportTracer transportTracer, + TransportTracer transportTracer, String debugString) { this.listener = checkNotNull(listener, "sink"); this.decompressor = checkNotNull(decompressor, "decompressor"); this.maxInboundMessageSize = maxMessageSize; this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); - this.transportTracer = transportTracer; + this.transportTracer = checkNotNull(transportTracer, "transportTracer"); this.debugString = debugString; } @@ -395,9 +393,7 @@ public class MessageDeframer implements Closeable, Deframer { currentMessageSeqNo++; statsTraceCtx.inboundMessage(currentMessageSeqNo); - if (transportTracer != null) { - transportTracer.reportMessageReceived(); - } + transportTracer.reportMessageReceived(); // Continue reading the frame body. state = State.BODY; } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java index 3dd05a6a03..41ffc0e1ce 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java @@ -38,6 +38,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.ProxyParameters; import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder.Resource; +import io.grpc.internal.TransportTracer; import io.grpc.okhttp.internal.Platform; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -127,6 +128,13 @@ public class OkHttpChannelBuilder extends } } + @VisibleForTesting + final OkHttpChannelBuilder setTransportTracerFactory( + TransportTracer.Factory transportTracerFactory) { + this.transportTracerFactory = transportTracerFactory; + return this; + } + /** * Override the default executor necessary for internal transport use. * @@ -310,7 +318,8 @@ public class OkHttpChannelBuilder extends boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED; return new OkHttpTransportFactory(transportExecutor, createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(), - enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls); + enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, + transportTracerFactory); } @Override @@ -376,6 +385,7 @@ public class OkHttpChannelBuilder extends static final class OkHttpTransportFactory implements ClientTransportFactory { private final Executor executor; private final boolean usingSharedExecutor; + private final TransportTracer.Factory transportTracerFactory; @Nullable private final SSLSocketFactory socketFactory; @Nullable @@ -398,7 +408,8 @@ public class OkHttpChannelBuilder extends boolean enableKeepAlive, long keepAliveTimeNanos, long keepAliveTimeoutNanos, - boolean keepAliveWithoutCalls) { + boolean keepAliveWithoutCalls, + TransportTracer.Factory transportTracerFactory) { this.socketFactory = socketFactory; this.hostnameVerifier = hostnameVerifier; this.connectionSpec = connectionSpec; @@ -409,6 +420,8 @@ public class OkHttpChannelBuilder extends this.keepAliveWithoutCalls = keepAliveWithoutCalls; usingSharedExecutor = executor == null; + this.transportTracerFactory = + Preconditions.checkNotNull(transportTracerFactory, "transportTracerFactory"); if (usingSharedExecutor) { // The executor was unspecified, using the shared executor. this.executor = SharedResourceHolder.get(SHARED_EXECUTOR); @@ -444,7 +457,8 @@ public class OkHttpChannelBuilder extends proxy == null ? null : proxy.proxyAddress, proxy == null ? null : proxy.username, proxy == null ? null : proxy.password, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracerFactory.create()); if (enableKeepAlive) { transport.enableKeepAlive( true, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls); diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index ee4a2216c5..8349fd6ad3 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -27,6 +27,7 @@ import io.grpc.Status; import io.grpc.internal.AbstractClientStream; import io.grpc.internal.Http2ClientStreamTransportState; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.Header; @@ -69,11 +70,12 @@ class OkHttpClientStream extends AbstractClientStream { int maxMessageSize, String authority, String userAgent, - StatsTraceContext statsTraceCtx) { + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { super( new OkHttpWritableBufferAllocator(), statsTraceCtx, - /*transportTracer=*/ null, + transportTracer, headers, method.isSafe()); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); @@ -152,6 +154,7 @@ class OkHttpClientStream extends AbstractClientStream { synchronized (state.lock) { state.sendBuffer(buffer, endOfStream, flush); + getTransportTracer().reportMessageSent(numMessages); } } @@ -200,7 +203,7 @@ class OkHttpClientStream extends AbstractClientStream { AsyncFrameWriter frameWriter, OutboundFlowController outboundFlow, OkHttpClientTransport transport) { - super(maxMessageSize, statsTraceCtx, /*transportTracer=*/ null); + super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer()); this.lock = checkNotNull(lock, "lock"); this.frameWriter = frameWriter; this.outboundFlow = outboundFlow; diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 579aa71c5f..2e4e6ad504 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -46,6 +46,7 @@ import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; import io.grpc.internal.SerializingExecutor; import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.FrameReader; @@ -181,6 +182,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { @Nullable private final String proxyPassword; private final Runnable tooManyPingsRunnable; + @GuardedBy("lock") + private final TransportTracer transportTracer; // The following fields should only be used for test. Runnable connectingCallback; @@ -190,7 +193,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { Executor executor, @Nullable SSLSocketFactory sslSocketFactory, @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, int maxMessageSize, @Nullable InetSocketAddress proxyAddress, @Nullable String proxyUsername, - @Nullable String proxyPassword, Runnable tooManyPingsRunnable) { + @Nullable String proxyPassword, Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { this.address = Preconditions.checkNotNull(address, "address"); this.defaultAuthority = authority; this.maxMessageSize = maxMessageSize; @@ -209,6 +213,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { this.proxyPassword = proxyPassword; this.tooManyPingsRunnable = Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); + this.transportTracer = Preconditions.checkNotNull(transportTracer); + initTransportTracer(); } /** @@ -226,7 +232,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { @Nullable Runnable connectingCallback, SettableFuture connectedFuture, int maxMessageSize, - Runnable tooManyPingsRunnable) { + Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { address = null; this.maxMessageSize = maxMessageSize; defaultAuthority = "notarealauthority:80"; @@ -246,6 +253,23 @@ class OkHttpClientTransport implements ConnectionClientTransport { this.proxyPassword = null; this.tooManyPingsRunnable = Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); + this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); + initTransportTracer(); + } + + private void initTransportTracer() { + synchronized (lock) { // to make @GuardedBy linter happy + transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { + @Override + public TransportTracer.FlowControlWindows read() { + synchronized (lock) { + long local = -1; // okhttp does not track the local window size + long remote = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0); + return new TransportTracer.FlowControlWindows(local, remote); + } + } + }); + } } /** @@ -286,6 +310,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { stopwatch.start(); p = ping = new Http2Ping(data, stopwatch); writePing = true; + transportTracer.reportKeepAliveSent(); } } if (writePing) { @@ -302,8 +327,18 @@ class OkHttpClientTransport implements ConnectionClientTransport { Preconditions.checkNotNull(method, "method"); Preconditions.checkNotNull(headers, "headers"); StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers); - return new OkHttpClientStream(method, headers, frameWriter, OkHttpClientTransport.this, - outboundFlow, lock, maxMessageSize, defaultAuthority, userAgent, statsTraceCtx); + return new OkHttpClientStream( + method, + headers, + frameWriter, + OkHttpClientTransport.this, + outboundFlow, + lock, + maxMessageSize, + defaultAuthority, + userAgent, + statsTraceCtx, + transportTracer); } @GuardedBy("lock") @@ -858,9 +893,11 @@ class OkHttpClientTransport implements ConnectionClientTransport { @Override public Future getTransportStats() { - SettableFuture ret = SettableFuture.create(); - ret.set(null); - return ret; + synchronized (lock) { + SettableFuture ret = SettableFuture.create(); + ret.set(transportTracer.getStats()); + return ret; + } } /** diff --git a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java index 983f24ab2c..71cdfe36f1 100644 --- a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java +++ b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java @@ -23,7 +23,9 @@ import java.util.List; public final class AccessProtectedHack { public static InternalServer serverBuilderBuildTransportServer( AbstractServerImplBuilder builder, - List streamTracerFactories) { + List streamTracerFactories, + TransportTracer.Factory transportTracerFactory) { + builder.transportTracerFactory = transportTracerFactory; return builder.buildTransportServer(streamTracerFactories); } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java index 1a249c4f28..007e35f643 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java @@ -33,6 +33,7 @@ import io.grpc.Status; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.GrpcUtil; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.Header; import java.io.ByteArrayInputStream; @@ -62,6 +63,7 @@ public class OkHttpClientStreamTest { @Captor private ArgumentCaptor> headersCaptor; private final Object lock = new Object(); + private final TransportTracer transportTracer = new TransportTracer(); private MethodDescriptor methodDescriptor; private OkHttpClientStream stream; @@ -76,8 +78,18 @@ public class OkHttpClientStreamTest { .setResponseMarshaller(marshaller) .build(); - stream = new OkHttpClientStream(methodDescriptor, new Metadata(), frameWriter, transport, - flowController, lock, MAX_MESSAGE_SIZE, "localhost", "userAgent", StatsTraceContext.NOOP); + stream = new OkHttpClientStream( + methodDescriptor, + new Metadata(), + frameWriter, + transport, + flowController, + lock, + MAX_MESSAGE_SIZE, + "localhost", + "userAgent", + StatsTraceContext.NOOP, + transportTracer); } @Test @@ -134,7 +146,7 @@ public class OkHttpClientStreamTest { metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", - StatsTraceContext.NOOP); + StatsTraceContext.NOOP, transportTracer); stream.start(new BaseClientStreamListener()); stream.transportState().start(3); @@ -149,7 +161,7 @@ public class OkHttpClientStreamTest { metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", - StatsTraceContext.NOOP); + StatsTraceContext.NOOP, transportTracer); stream.start(new BaseClientStreamListener()); stream.transportState().start(3); @@ -177,7 +189,7 @@ public class OkHttpClientStreamTest { .build(); stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport, flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", - StatsTraceContext.NOOP); + StatsTraceContext.NOOP, transportTracer); stream.start(new BaseClientStreamListener()); // GET streams send headers after halfClose is called. diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 5bc0ed5d37..2ecfc774eb 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -54,6 +54,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.grpc.CallOptions; import io.grpc.InternalStatus; +import io.grpc.InternalTransportStats; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -65,6 +66,7 @@ import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.TransportTracer; import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.framed.ErrorCode; @@ -92,6 +94,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLSocketFactory; import okio.Buffer; import okio.ByteString; import org.junit.After; @@ -127,9 +131,16 @@ public class OkHttpClientTransportTest { @Mock private ManagedClientTransport.Listener transportListener; + + private final SSLSocketFactory sslSocketFactory = null; + private final HostnameVerifier hostnameVerifier = null; + private final InetSocketAddress proxyAddr = null; + private final String proxyUser = null; + private final String proxyPassword = null; + private final TransportTracer transportTracer = new TransportTracer(); private OkHttpClientTransport clientTransport; private MockFrameReader frameReader; - private ExecutorService executor; + private ExecutorService executor = Executors.newCachedThreadPool(); private long nanoTime; // backs a ticker, for testing ping round-trip time measurement private SettableFuture connectedFuture; private DelayConnectedCallback delayConnectedCallback; @@ -143,7 +154,6 @@ public class OkHttpClientTransportTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - executor = Executors.newCachedThreadPool(); when(frameWriter.maxDataLength()).thenReturn(Integer.MAX_VALUE); frameReader = new MockFrameReader(); } @@ -192,7 +202,8 @@ public class OkHttpClientTransportTest { connectingCallback, connectedFuture, maxMessageSize, - tooManyPingsRunnable); + tooManyPingsRunnable, + new TransportTracer()); clientTransport.start(transportListener); if (waitingForConnected) { connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS); @@ -203,10 +214,19 @@ public class OkHttpClientTransportTest { public void testToString() throws Exception { InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); clientTransport = new OkHttpClientTransport( - address, "hostname", null /* agent */, executor, /* sslSocketFactory */ null, - /* hostnameVerifier */null, - Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC), DEFAULT_MAX_MESSAGE_SIZE, - null, null, null, tooManyPingsRunnable); + address, + "hostname", + /*agent=*/ null, + executor, + sslSocketFactory, + hostnameVerifier, + Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC), + DEFAULT_MAX_MESSAGE_SIZE, + proxyAddr, + proxyUser, + proxyPassword, + tooManyPingsRunnable, + transportTracer); String s = clientTransport.toString(); assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport")); assertTrue("Unexpected: " + s, s.contains(address.toString())); @@ -524,6 +544,30 @@ public class OkHttpClientTransportTest { shutdownAndVerify(); } + @Test + public void transportTracer_windowSizeDefault() throws Exception { + initTransport(); + InternalTransportStats stats = clientTransport.getTransportStats().get(); + assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); + // okhttp does not track local window sizes + assertEquals(-1, stats.localFlowControlWindow); + } + + @Test + public void transportTracer_windowSize_remote() throws Exception { + initTransport(); + InternalTransportStats before = clientTransport.getTransportStats().get(); + assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); + // okhttp does not track local window sizes + assertEquals(-1, before.localFlowControlWindow); + + frameHandler().windowUpdate(0, 1000); + InternalTransportStats after = clientTransport.getTransportStats().get(); + assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow); + // okhttp does not track local window sizes + assertEquals(-1, after.localFlowControlWindow); + } + @Test public void windowUpdate() throws Exception { initTransport(); @@ -1234,9 +1278,11 @@ public class OkHttpClientTransportTest { initTransport(); PingCallbackImpl callback1 = new PingCallbackImpl(); clientTransport.ping(callback1, MoreExecutors.directExecutor()); + assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); // add'l ping will be added as listener to outstanding operation PingCallbackImpl callback2 = new PingCallbackImpl(); clientTransport.ping(callback2, MoreExecutors.directExecutor()); + assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); ArgumentCaptor captor1 = ArgumentCaptor.forClass(int.class); ArgumentCaptor captor2 = ArgumentCaptor.forClass(int.class); @@ -1269,6 +1315,7 @@ public class OkHttpClientTransportTest { // now that previous ping is done, next request returns a different future callback1 = new PingCallbackImpl(); clientTransport.ping(callback1, MoreExecutors.directExecutor()); + assertEquals(2, clientTransport.getTransportStats().get().keepAlivesSent); assertEquals(0, callback1.invocationCount); shutdownAndVerify(); } @@ -1278,6 +1325,7 @@ public class OkHttpClientTransportTest { initTransport(); PingCallbackImpl callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); + assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); assertEquals(0, callback.invocationCount); clientTransport.shutdown(SHUTDOWN_REASON); @@ -1289,6 +1337,7 @@ public class OkHttpClientTransportTest { // now that handler is in terminal state, all future pings fail immediately callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); + assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); assertEquals(1, callback.invocationCount); assertTrue(callback.failureCause instanceof StatusException); assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); @@ -1300,6 +1349,7 @@ public class OkHttpClientTransportTest { initTransport(); PingCallbackImpl callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); + assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); assertEquals(0, callback.invocationCount); clientTransport.onException(new IOException()); @@ -1312,6 +1362,7 @@ public class OkHttpClientTransportTest { // now that handler is in terminal state, all future pings fail immediately callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); + assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); assertEquals(1, callback.invocationCount); assertTrue(callback.failureCause instanceof StatusException); assertEquals(Status.Code.UNAVAILABLE, @@ -1392,14 +1443,15 @@ public class OkHttpClientTransportTest { "invalid_authority", "userAgent", executor, - null, - null, + sslSocketFactory, + hostnameVerifier, ConnectionSpec.CLEARTEXT, DEFAULT_MAX_MESSAGE_SIZE, - null, - null, - null, - tooManyPingsRunnable); + proxyAddr, + proxyUser, + proxyPassword, + tooManyPingsRunnable, + transportTracer); String host = clientTransport.getOverridenHost(); int port = clientTransport.getOverridenPort(); @@ -1415,14 +1467,15 @@ public class OkHttpClientTransportTest { "authority", "userAgent", executor, - null, - null, + sslSocketFactory, + hostnameVerifier, ConnectionSpec.CLEARTEXT, DEFAULT_MAX_MESSAGE_SIZE, - null, - null, - null, - tooManyPingsRunnable); + proxyAddr, + proxyUser, + proxyPassword, + tooManyPingsRunnable, + new TransportTracer()); ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); clientTransport.start(listener); @@ -1446,14 +1499,15 @@ public class OkHttpClientTransportTest { "authority", "userAgent", executor, - null, - null, + sslSocketFactory, + hostnameVerifier, ConnectionSpec.CLEARTEXT, DEFAULT_MAX_MESSAGE_SIZE, (InetSocketAddress) serverSocket.getLocalSocketAddress(), - null, - null, - tooManyPingsRunnable); + proxyUser, + proxyPassword, + tooManyPingsRunnable, + transportTracer); clientTransport.start(transportListener); Socket sock = serverSocket.accept(); @@ -1496,14 +1550,15 @@ public class OkHttpClientTransportTest { "authority", "userAgent", executor, - null, - null, + sslSocketFactory, + hostnameVerifier, ConnectionSpec.CLEARTEXT, DEFAULT_MAX_MESSAGE_SIZE, (InetSocketAddress) serverSocket.getLocalSocketAddress(), - null, - null, - tooManyPingsRunnable); + proxyUser, + proxyPassword, + tooManyPingsRunnable, + transportTracer); clientTransport.start(transportListener); Socket sock = serverSocket.accept(); @@ -1545,14 +1600,15 @@ public class OkHttpClientTransportTest { "authority", "userAgent", executor, - null, - null, + sslSocketFactory, + hostnameVerifier, ConnectionSpec.CLEARTEXT, DEFAULT_MAX_MESSAGE_SIZE, (InetSocketAddress) serverSocket.getLocalSocketAddress(), - null, - null, - tooManyPingsRunnable); + proxyUser, + proxyPassword, + tooManyPingsRunnable, + transportTracer); clientTransport.start(transportListener); Socket sock = serverSocket.accept(); @@ -1576,14 +1632,15 @@ public class OkHttpClientTransportTest { "authority", "userAgent", executor, - null, - null, + sslSocketFactory, + hostnameVerifier, ConnectionSpec.CLEARTEXT, DEFAULT_MAX_MESSAGE_SIZE, InetSocketAddress.createUnresolved("unresolvedproxy", 80), - null, - null, - tooManyPingsRunnable); + proxyUser, + proxyPassword, + tooManyPingsRunnable, + transportTracer); clientTransport.start(transportListener); ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java index 7609d0ecbd..7797cd57c4 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java @@ -19,12 +19,15 @@ package io.grpc.okhttp; import io.grpc.ServerStreamTracer; import io.grpc.internal.AccessProtectedHack; import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.FakeClock; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.AbstractTransportTest; import io.grpc.netty.NettyServerBuilder; import java.net.InetSocketAddress; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Ignore; import org.junit.Test; @@ -34,10 +37,19 @@ import org.junit.runners.JUnit4; /** Unit tests for OkHttp transport. */ @RunWith(JUnit4.class) public class OkHttpTransportTest extends AbstractTransportTest { + private final FakeClock fakeClock = new FakeClock(); + private final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory( + new TransportTracer.TimeProvider() { + @Override + public long currentTimeMillis() { + return fakeClock.currentTimeMillis(); + } + }); private ClientTransportFactory clientFactory = OkHttpChannelBuilder // Although specified here, address is ignored because we never call build. .forAddress("localhost", 0) .negotiationType(NegotiationType.PLAINTEXT) + .setTransportTracerFactory(fakeClockTransportTracer) .buildTransportFactory(); @After @@ -51,7 +63,8 @@ public class OkHttpTransportTest extends AbstractTransportTest { NettyServerBuilder .forPort(0) .flowControlWindow(65 * 1024), - streamTracerFactories); + streamTracerFactories, + fakeClockTransportTracer); } @Override @@ -62,7 +75,8 @@ public class OkHttpTransportTest extends AbstractTransportTest { NettyServerBuilder .forPort(port) .flowControlWindow(65 * 1024), - streamTracerFactories); + streamTracerFactories, + fakeClockTransportTracer); } @Override @@ -80,9 +94,24 @@ public class OkHttpTransportTest extends AbstractTransportTest { null /* proxy */); } + @Override + protected void advanceClock(long offset, TimeUnit unit) { + fakeClock.forwardNanos(unit.toNanos(offset)); + } + + @Override + protected long currentTimeMillis() { + return fakeClock.currentTimeMillis(); + } + // TODO(ejona): Flaky/Broken @Test @Ignore @Override public void flowControlPushBack() {} + + @Override + protected boolean haveTransportTracer() { + return true; + } }