diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 49e170b4fe..4d8b3f5798 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -489,7 +489,9 @@ public final class GrpcUtil { */ public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) { if (IS_RESTRICTED_APPENGINE) { - return MoreExecutors.platformThreadFactory(); + @SuppressWarnings("BetaApi") + ThreadFactory factory = MoreExecutors.platformThreadFactory(); + return factory; } else { return new ThreadFactoryBuilder() .setDaemon(daemon) diff --git a/core/src/main/java/io/grpc/internal/IoUtils.java b/core/src/main/java/io/grpc/internal/IoUtils.java index 7801a7c518..67d903c2e6 100644 --- a/core/src/main/java/io/grpc/internal/IoUtils.java +++ b/core/src/main/java/io/grpc/internal/IoUtils.java @@ -16,27 +16,41 @@ package io.grpc.internal; +import static com.google.common.base.Preconditions.checkNotNull; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; /** Common IoUtils for thrift and nanopb to convert inputstream to bytes. */ public final class IoUtils { /** maximum buffer to be read is 16 KB. */ - private static final int MAX_BUFFER_LENGTH = 16384; - + private static final int MAX_BUFFER_LENGTH = 16384; + /** Returns the byte array. */ - public static byte[] toByteArray(InputStream is) throws IOException { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - int nRead; - byte[] bytes = new byte[MAX_BUFFER_LENGTH]; - - while ((nRead = is.read(bytes, 0, bytes.length)) != -1) { - buffer.write(bytes, 0, nRead); - } - - buffer.flush(); - return buffer.toByteArray(); + public static byte[] toByteArray(InputStream in) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + copy(in, out); + return out.toByteArray(); } -} \ No newline at end of file + + /** Copies the data from input stream to output stream. */ + public static long copy(InputStream from, OutputStream to) throws IOException { + // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) + checkNotNull(from); + checkNotNull(to); + byte[] buf = new byte[MAX_BUFFER_LENGTH]; + long total = 0; + while (true) { + int r = from.read(buf); + if (r == -1) { + break; + } + to.write(buf, 0, r); + total += r; + } + return total; + } +} diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index ccb44a5c0e..665f84eeea 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.lang.Math.min; -import com.google.common.io.ByteStreams; import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.Drainable; @@ -251,7 +250,7 @@ public class MessageFramer implements Framer { } else { // This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we // expect performance-critical code to support flushTo(). - long written = ByteStreams.copy(message, outputStream); + long written = IoUtils.copy(message, outputStream); checkArgument(written <= Integer.MAX_VALUE, "Message size overflow: %s", written); return (int) written; } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 8ff60f4375..28657b9988 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -24,6 +24,7 @@ import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Attributes; import io.grpc.Codec; @@ -37,7 +38,6 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.Status; import java.io.InputStream; -import java.util.List; import java.util.logging.Logger; final class ServerCallImpl extends ServerCall { @@ -90,9 +90,9 @@ final class ServerCallImpl extends ServerCall { } else { if (messageAcceptEncoding != null) { // TODO(carl-mastrangelo): remove the string allocation. - List acceptedEncodingsList = ACCEPT_ENCODING_SPLITTER.splitToList( - new String(messageAcceptEncoding, GrpcUtil.US_ASCII)); - if (!acceptedEncodingsList.contains(compressor.getMessageEncoding())) { + if (!Iterables.contains( + ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)), + compressor.getMessageEncoding())) { // resort to using no compression. compressor = Codec.Identity.NONE; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index afef5f2686..aea4643ed5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -22,7 +22,7 @@ import static io.netty.util.CharsetUtil.UTF_8; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; +import com.google.common.base.Supplier; import io.grpc.Attributes; import io.grpc.Metadata; import io.grpc.Status; @@ -95,14 +95,19 @@ class NettyClientHandler extends AbstractNettyHandler { private final Http2Connection.PropertyKey streamKey; private final ClientTransportLifecycleManager lifecycleManager; private final KeepAliveManager keepAliveManager; - private final Ticker ticker; + // Returns new unstarted stopwatches + private final Supplier stopwatchFactory; private WriteQueue clientWriteQueue; private Http2Ping ping; private Attributes attributes = Attributes.EMPTY; - static NettyClientHandler newHandler(ClientTransportLifecycleManager lifecycleManager, - @Nullable KeepAliveManager keepAliveManager, int flowControlWindow, int maxHeaderListSize, - Ticker ticker, Runnable tooManyPingsRunnable) { + static NettyClientHandler newHandler( + ClientTransportLifecycleManager lifecycleManager, + @Nullable KeepAliveManager keepAliveManager, + int flowControlWindow, + int maxHeaderListSize, + Supplier stopwatchFactory, + Runnable tooManyPingsRunnable) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -114,21 +119,35 @@ class NettyClientHandler extends AbstractNettyHandler { new DefaultHttp2RemoteFlowController(connection, dist); connection.remote().flowController(controller); - return newHandler(connection, frameReader, frameWriter, lifecycleManager, keepAliveManager, - flowControlWindow, maxHeaderListSize, ticker, tooManyPingsRunnable); + return newHandler( + connection, + frameReader, + frameWriter, + lifecycleManager, + keepAliveManager, + flowControlWindow, + maxHeaderListSize, + stopwatchFactory, + tooManyPingsRunnable); } @VisibleForTesting - static NettyClientHandler newHandler(Http2Connection connection, Http2FrameReader frameReader, - Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager, - KeepAliveManager keepAliveManager, int flowControlWindow, int maxHeaderListSize, - Ticker ticker, Runnable tooManyPingsRunnable) { + static NettyClientHandler newHandler( + Http2Connection connection, + Http2FrameReader frameReader, + Http2FrameWriter frameWriter, + ClientTransportLifecycleManager lifecycleManager, + KeepAliveManager keepAliveManager, + int flowControlWindow, + int maxHeaderListSize, + Supplier stopwatchFactory, + Runnable tooManyPingsRunnable) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); - Preconditions.checkNotNull(ticker, "ticker"); + Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory"); Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class); @@ -151,17 +170,28 @@ class NettyClientHandler extends AbstractNettyHandler { settings.maxConcurrentStreams(0); settings.maxHeaderListSize(maxHeaderListSize); - return new NettyClientHandler(decoder, encoder, settings, lifecycleManager, keepAliveManager, - ticker, tooManyPingsRunnable); + return new NettyClientHandler( + decoder, + encoder, + settings, + lifecycleManager, + keepAliveManager, + stopwatchFactory, + tooManyPingsRunnable); } - private NettyClientHandler(Http2ConnectionDecoder decoder, StreamBufferingEncoder encoder, - Http2Settings settings, ClientTransportLifecycleManager lifecycleManager, - KeepAliveManager keepAliveManager, Ticker ticker, final Runnable tooManyPingsRunnable) { + private NettyClientHandler( + Http2ConnectionDecoder decoder, + StreamBufferingEncoder encoder, + Http2Settings settings, + ClientTransportLifecycleManager lifecycleManager, + KeepAliveManager keepAliveManager, + Supplier stopwatchFactory, + final Runnable tooManyPingsRunnable) { super(decoder, encoder, settings); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; - this.ticker = ticker; + this.stopwatchFactory = stopwatchFactory; // Set the frame listener on the decoder. decoder().frameListener(new FrameListener()); @@ -509,7 +539,8 @@ class NettyClientHandler extends AbstractNettyHandler { long data = USER_PING_PAYLOAD; ByteBuf buffer = ctx.alloc().buffer(8); buffer.writeLong(data); - Stopwatch stopwatch = Stopwatch.createStarted(ticker); + Stopwatch stopwatch = stopwatchFactory.get(); + stopwatch.start(); ping = new Http2Ping(data, stopwatch); ping.addCallback(callback, executor); // and then write the ping diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 189b06a482..3b02bce5ca 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -21,7 +21,6 @@ import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Metadata; @@ -167,8 +166,13 @@ class NettyClientTransport implements ConnectionClientTransport { keepAliveWithoutCalls); } - handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow, - maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable); + handler = NettyClientHandler.newHandler( + lifecycleManager, + keepAliveManager, + flowControlWindow, + maxHeaderListSize, + GrpcUtil.STOPWATCH_SUPPLIER, + tooManyPingsRunnable); NettyHandlerSettings.setAutoWindow(handler); negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index b5082b00a4..66b9889e31 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -42,6 +42,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; @@ -633,15 +635,27 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase stopwatchSupplier = new Supplier() { + @Override + public Stopwatch get() { + return Stopwatch.createUnstarted(ticker); + } + }; + return NettyClientHandler.newHandler( + connection, + frameReader(), + frameWriter(), + lifecycleManager, + mockKeepAliveManager, + flowControlWindow, + maxHeaderListSize, + stopwatchSupplier, tooManyPingsRunnable); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 7c9920cb93..0e71faa180 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -22,7 +22,7 @@ import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; +import com.google.common.base.Supplier; import com.google.common.util.concurrent.SettableFuture; import com.squareup.okhttp.Credentials; import com.squareup.okhttp.HttpUrl; @@ -124,7 +124,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { private final String defaultAuthority; private final String userAgent; private final Random random = new Random(); - private final Ticker ticker; + // Returns new unstarted stopwatches + private final Supplier stopwatchFactory; private Listener listener; private FrameReader testFrameReader; private AsyncFrameWriter frameWriter; @@ -199,7 +200,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { this.sslSocketFactory = sslSocketFactory; this.hostnameVerifier = hostnameVerifier; this.connectionSpec = Preconditions.checkNotNull(connectionSpec, "connectionSpec"); - this.ticker = Ticker.systemTicker(); + this.stopwatchFactory = GrpcUtil.STOPWATCH_SUPPLIER; this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent); this.proxyAddress = proxyAddress; this.proxyUsername = proxyUsername; @@ -212,10 +213,18 @@ class OkHttpClientTransport implements ConnectionClientTransport { * Create a transport connected to a fake peer for test. */ @VisibleForTesting - OkHttpClientTransport(String userAgent, Executor executor, FrameReader frameReader, - FrameWriter testFrameWriter, int nextStreamId, Socket socket, Ticker ticker, - @Nullable Runnable connectingCallback, SettableFuture connectedFuture, - int maxMessageSize, Runnable tooManyPingsRunnable) { + OkHttpClientTransport( + String userAgent, + Executor executor, + FrameReader frameReader, + FrameWriter testFrameWriter, + int nextStreamId, + Socket socket, + Supplier stopwatchFactory, + @Nullable Runnable connectingCallback, + SettableFuture connectedFuture, + int maxMessageSize, + Runnable tooManyPingsRunnable) { address = null; this.maxMessageSize = maxMessageSize; defaultAuthority = "notarealauthority:80"; @@ -226,7 +235,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter, "testFrameWriter"); this.socket = Preconditions.checkNotNull(socket, "socket"); this.nextStreamId = nextStreamId; - this.ticker = ticker; + this.stopwatchFactory = stopwatchFactory; this.connectionSpec = null; this.connectingCallback = connectingCallback; this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture"); @@ -271,7 +280,9 @@ class OkHttpClientTransport implements ConnectionClientTransport { } else { // set outstanding operation and then write the ping after releasing lock data = random.nextLong(); - p = ping = new Http2Ping(data, Stopwatch.createStarted(ticker)); + Stopwatch stopwatch = stopwatchFactory.get(); + stopwatch.start(); + p = ping = new Http2Ping(data, stopwatch); writePing = true; } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index df37ec3549..ff79d8caa3 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -45,6 +45,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; @@ -168,15 +170,30 @@ public class OkHttpClientTransportTest { private void startTransport(int startId, @Nullable Runnable connectingCallback, boolean waitingForConnected, int maxMessageSize, String userAgent) throws Exception { connectedFuture = SettableFuture.create(); - Ticker ticker = new Ticker() { + final Ticker ticker = new Ticker() { @Override public long read() { return nanoTime; } }; - clientTransport = new OkHttpClientTransport(userAgent, executor, frameReader, - frameWriter, startId, new MockSocket(frameReader), ticker, connectingCallback, - connectedFuture, maxMessageSize, tooManyPingsRunnable); + Supplier stopwatchSupplier = new Supplier() { + @Override + public Stopwatch get() { + return Stopwatch.createUnstarted(ticker); + } + }; + clientTransport = new OkHttpClientTransport( + userAgent, + executor, + frameReader, + frameWriter, + startId, + new MockSocket(frameReader), + stopwatchSupplier, + connectingCallback, + connectedFuture, + maxMessageSize, + tooManyPingsRunnable); clientTransport.start(transportListener); if (waitingForConnected) { connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS); diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java index 47263bbbca..e5743627e3 100644 --- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java +++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java @@ -16,7 +16,6 @@ package io.grpc.protobuf.lite; -import com.google.common.io.ByteStreams; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; @@ -53,7 +52,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength { message.writeTo(target); message = null; } else if (partial != null) { - written = (int) ByteStreams.copy(partial, target); + written = (int) ProtoLiteUtils.copy(partial, target); partial = null; } else { written = 0; diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java index 2a88a3f891..fc578b2c28 100644 --- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java +++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java @@ -32,6 +32,7 @@ import io.grpc.Status; import io.grpc.internal.GrpcUtil; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.lang.ref.Reference; import java.lang.ref.WeakReference; @@ -44,6 +45,8 @@ public class ProtoLiteUtils { private static volatile ExtensionRegistryLite globalRegistry = ExtensionRegistryLite.getEmptyRegistry(); + private static final int BUF_SIZE = 8192; + /** * Sets the global registry for proto marshalling shared across all servers and clients. * @@ -202,6 +205,24 @@ public class ProtoLiteUtils { }; } + /** Copies the data from input stream to output stream. */ + static long copy(InputStream from, OutputStream to) throws IOException { + // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) + checkNotNull(from); + checkNotNull(to); + byte[] buf = new byte[BUF_SIZE]; + long total = 0; + while (true) { + int r = from.read(buf); + if (r == -1) { + break; + } + to.write(buf, 0, r); + total += r; + } + return total; + } + private ProtoLiteUtils() { } }