core,netty,okhttp,protobuf-lite: avoid @Beta guava classes (#3463)

This commit is contained in:
zpencer 2017-09-14 19:39:06 -07:00 committed by GitHub
parent bb203657cb
commit 2b1363d586
11 changed files with 174 additions and 62 deletions

View File

@ -489,7 +489,9 @@ public final class GrpcUtil {
*/
public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) {
if (IS_RESTRICTED_APPENGINE) {
return MoreExecutors.platformThreadFactory();
@SuppressWarnings("BetaApi")
ThreadFactory factory = MoreExecutors.platformThreadFactory();
return factory;
} else {
return new ThreadFactoryBuilder()
.setDaemon(daemon)

View File

@ -16,27 +16,41 @@
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/** Common IoUtils for thrift and nanopb to convert inputstream to bytes. */
public final class IoUtils {
/** maximum buffer to be read is 16 KB. */
private static final int MAX_BUFFER_LENGTH = 16384;
private static final int MAX_BUFFER_LENGTH = 16384;
/** Returns the byte array. */
public static byte[] toByteArray(InputStream is) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int nRead;
byte[] bytes = new byte[MAX_BUFFER_LENGTH];
while ((nRead = is.read(bytes, 0, bytes.length)) != -1) {
buffer.write(bytes, 0, nRead);
}
buffer.flush();
return buffer.toByteArray();
public static byte[] toByteArray(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
}
}
/** Copies the data from input stream to output stream. */
public static long copy(InputStream from, OutputStream to) throws IOException {
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
checkNotNull(from);
checkNotNull(to);
byte[] buf = new byte[MAX_BUFFER_LENGTH];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
}
return total;
}
}

View File

@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import com.google.common.io.ByteStreams;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Drainable;
@ -251,7 +250,7 @@ public class MessageFramer implements Framer {
} else {
// This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we
// expect performance-critical code to support flushTo().
long written = ByteStreams.copy(message, outputStream);
long written = IoUtils.copy(message, outputStream);
checkArgument(written <= Integer.MAX_VALUE, "Message size overflow: %s", written);
return (int) written;
}

View File

@ -24,6 +24,7 @@ import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.Codec;
@ -37,7 +38,6 @@ import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.Status;
import java.io.InputStream;
import java.util.List;
import java.util.logging.Logger;
final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@ -90,9 +90,9 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
} else {
if (messageAcceptEncoding != null) {
// TODO(carl-mastrangelo): remove the string allocation.
List<String> acceptedEncodingsList = ACCEPT_ENCODING_SPLITTER.splitToList(
new String(messageAcceptEncoding, GrpcUtil.US_ASCII));
if (!acceptedEncodingsList.contains(compressor.getMessageEncoding())) {
if (!Iterables.contains(
ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)),
compressor.getMessageEncoding())) {
// resort to using no compression.
compressor = Codec.Identity.NONE;
}

View File

@ -22,7 +22,7 @@ import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.base.Supplier;
import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.Status;
@ -95,14 +95,19 @@ class NettyClientHandler extends AbstractNettyHandler {
private final Http2Connection.PropertyKey streamKey;
private final ClientTransportLifecycleManager lifecycleManager;
private final KeepAliveManager keepAliveManager;
private final Ticker ticker;
// Returns new unstarted stopwatches
private final Supplier<Stopwatch> stopwatchFactory;
private WriteQueue clientWriteQueue;
private Http2Ping ping;
private Attributes attributes = Attributes.EMPTY;
static NettyClientHandler newHandler(ClientTransportLifecycleManager lifecycleManager,
@Nullable KeepAliveManager keepAliveManager, int flowControlWindow, int maxHeaderListSize,
Ticker ticker, Runnable tooManyPingsRunnable) {
static NettyClientHandler newHandler(
ClientTransportLifecycleManager lifecycleManager,
@Nullable KeepAliveManager keepAliveManager,
int flowControlWindow,
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@ -114,21 +119,35 @@ class NettyClientHandler extends AbstractNettyHandler {
new DefaultHttp2RemoteFlowController(connection, dist);
connection.remote().flowController(controller);
return newHandler(connection, frameReader, frameWriter, lifecycleManager, keepAliveManager,
flowControlWindow, maxHeaderListSize, ticker, tooManyPingsRunnable);
return newHandler(
connection,
frameReader,
frameWriter,
lifecycleManager,
keepAliveManager,
flowControlWindow,
maxHeaderListSize,
stopwatchFactory,
tooManyPingsRunnable);
}
@VisibleForTesting
static NettyClientHandler newHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager, int flowControlWindow, int maxHeaderListSize,
Ticker ticker, Runnable tooManyPingsRunnable) {
static NettyClientHandler newHandler(
Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager,
int flowControlWindow,
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Preconditions.checkNotNull(ticker, "ticker");
Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
@ -151,17 +170,28 @@ class NettyClientHandler extends AbstractNettyHandler {
settings.maxConcurrentStreams(0);
settings.maxHeaderListSize(maxHeaderListSize);
return new NettyClientHandler(decoder, encoder, settings, lifecycleManager, keepAliveManager,
ticker, tooManyPingsRunnable);
return new NettyClientHandler(
decoder,
encoder,
settings,
lifecycleManager,
keepAliveManager,
stopwatchFactory,
tooManyPingsRunnable);
}
private NettyClientHandler(Http2ConnectionDecoder decoder, StreamBufferingEncoder encoder,
Http2Settings settings, ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager, Ticker ticker, final Runnable tooManyPingsRunnable) {
private NettyClientHandler(
Http2ConnectionDecoder decoder,
StreamBufferingEncoder encoder,
Http2Settings settings,
ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager,
Supplier<Stopwatch> stopwatchFactory,
final Runnable tooManyPingsRunnable) {
super(decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.ticker = ticker;
this.stopwatchFactory = stopwatchFactory;
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@ -509,7 +539,8 @@ class NettyClientHandler extends AbstractNettyHandler {
long data = USER_PING_PAYLOAD;
ByteBuf buffer = ctx.alloc().buffer(8);
buffer.writeLong(data);
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
Stopwatch stopwatch = stopwatchFactory.get();
stopwatch.start();
ping = new Http2Ping(data, stopwatch);
ping.addCallback(callback, executor);
// and then write the ping

View File

@ -21,7 +21,6 @@ import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Metadata;
@ -167,8 +166,13 @@ class NettyClientTransport implements ConnectionClientTransport {
keepAliveWithoutCalls);
}
handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
handler = NettyClientHandler.newHandler(
lifecycleManager,
keepAliveManager,
flowControlWindow,
maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER,
tooManyPingsRunnable);
NettyHandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);

View File

@ -42,6 +42,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
@ -633,15 +635,27 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
Http2Stream stream = connection.local().createStream(streamId - 2, true);
stream.close();
Ticker ticker = new Ticker() {
final Ticker ticker = new Ticker() {
@Override
public long read() {
return nanoTime;
}
};
return NettyClientHandler.newHandler(connection, frameReader(), frameWriter(),
lifecycleManager, mockKeepAliveManager, flowControlWindow, maxHeaderListSize, ticker,
Supplier<Stopwatch> stopwatchSupplier = new Supplier<Stopwatch>() {
@Override
public Stopwatch get() {
return Stopwatch.createUnstarted(ticker);
}
};
return NettyClientHandler.newHandler(
connection,
frameReader(),
frameWriter(),
lifecycleManager,
mockKeepAliveManager,
flowControlWindow,
maxHeaderListSize,
stopwatchSupplier,
tooManyPingsRunnable);
}

View File

@ -22,7 +22,7 @@ import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.SettableFuture;
import com.squareup.okhttp.Credentials;
import com.squareup.okhttp.HttpUrl;
@ -124,7 +124,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
private final String defaultAuthority;
private final String userAgent;
private final Random random = new Random();
private final Ticker ticker;
// Returns new unstarted stopwatches
private final Supplier<Stopwatch> stopwatchFactory;
private Listener listener;
private FrameReader testFrameReader;
private AsyncFrameWriter frameWriter;
@ -199,7 +200,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
this.sslSocketFactory = sslSocketFactory;
this.hostnameVerifier = hostnameVerifier;
this.connectionSpec = Preconditions.checkNotNull(connectionSpec, "connectionSpec");
this.ticker = Ticker.systemTicker();
this.stopwatchFactory = GrpcUtil.STOPWATCH_SUPPLIER;
this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
this.proxyAddress = proxyAddress;
this.proxyUsername = proxyUsername;
@ -212,10 +213,18 @@ class OkHttpClientTransport implements ConnectionClientTransport {
* Create a transport connected to a fake peer for test.
*/
@VisibleForTesting
OkHttpClientTransport(String userAgent, Executor executor, FrameReader frameReader,
FrameWriter testFrameWriter, int nextStreamId, Socket socket, Ticker ticker,
@Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture,
int maxMessageSize, Runnable tooManyPingsRunnable) {
OkHttpClientTransport(
String userAgent,
Executor executor,
FrameReader frameReader,
FrameWriter testFrameWriter,
int nextStreamId,
Socket socket,
Supplier<Stopwatch> stopwatchFactory,
@Nullable Runnable connectingCallback,
SettableFuture<Void> connectedFuture,
int maxMessageSize,
Runnable tooManyPingsRunnable) {
address = null;
this.maxMessageSize = maxMessageSize;
defaultAuthority = "notarealauthority:80";
@ -226,7 +235,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter, "testFrameWriter");
this.socket = Preconditions.checkNotNull(socket, "socket");
this.nextStreamId = nextStreamId;
this.ticker = ticker;
this.stopwatchFactory = stopwatchFactory;
this.connectionSpec = null;
this.connectingCallback = connectingCallback;
this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
@ -271,7 +280,9 @@ class OkHttpClientTransport implements ConnectionClientTransport {
} else {
// set outstanding operation and then write the ping after releasing lock
data = random.nextLong();
p = ping = new Http2Ping(data, Stopwatch.createStarted(ticker));
Stopwatch stopwatch = stopwatchFactory.get();
stopwatch.start();
p = ping = new Http2Ping(data, stopwatch);
writePing = true;
}
}

View File

@ -45,6 +45,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
@ -168,15 +170,30 @@ public class OkHttpClientTransportTest {
private void startTransport(int startId, @Nullable Runnable connectingCallback,
boolean waitingForConnected, int maxMessageSize, String userAgent) throws Exception {
connectedFuture = SettableFuture.create();
Ticker ticker = new Ticker() {
final Ticker ticker = new Ticker() {
@Override
public long read() {
return nanoTime;
}
};
clientTransport = new OkHttpClientTransport(userAgent, executor, frameReader,
frameWriter, startId, new MockSocket(frameReader), ticker, connectingCallback,
connectedFuture, maxMessageSize, tooManyPingsRunnable);
Supplier<Stopwatch> stopwatchSupplier = new Supplier<Stopwatch>() {
@Override
public Stopwatch get() {
return Stopwatch.createUnstarted(ticker);
}
};
clientTransport = new OkHttpClientTransport(
userAgent,
executor,
frameReader,
frameWriter,
startId,
new MockSocket(frameReader),
stopwatchSupplier,
connectingCallback,
connectedFuture,
maxMessageSize,
tooManyPingsRunnable);
clientTransport.start(transportListener);
if (waitingForConnected) {
connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);

View File

@ -16,7 +16,6 @@
package io.grpc.protobuf.lite;
import com.google.common.io.ByteStreams;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
@ -53,7 +52,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength {
message.writeTo(target);
message = null;
} else if (partial != null) {
written = (int) ByteStreams.copy(partial, target);
written = (int) ProtoLiteUtils.copy(partial, target);
partial = null;
} else {
written = 0;

View File

@ -32,6 +32,7 @@ import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
@ -44,6 +45,8 @@ public class ProtoLiteUtils {
private static volatile ExtensionRegistryLite globalRegistry =
ExtensionRegistryLite.getEmptyRegistry();
private static final int BUF_SIZE = 8192;
/**
* Sets the global registry for proto marshalling shared across all servers and clients.
*
@ -202,6 +205,24 @@ public class ProtoLiteUtils {
};
}
/** Copies the data from input stream to output stream. */
static long copy(InputStream from, OutputStream to) throws IOException {
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
checkNotNull(from);
checkNotNull(to);
byte[] buf = new byte[BUF_SIZE];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
}
return total;
}
private ProtoLiteUtils() {
}
}