cronet: pass TransportTracer instead of null. (#3869)

AbstractStream now requires TransportTracer to be non-null.
This commit is contained in:
Kun Zhang 2017-12-14 15:43:39 -08:00 committed by GitHub
parent 02f56b0218
commit aeaced1c5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 43 additions and 18 deletions

View File

@ -31,6 +31,7 @@ import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ProxyParameters; import io.grpc.internal.ProxyParameters;
import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TransportTracer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -130,7 +131,7 @@ public final class CronetChannelBuilder extends
@Override @Override
protected final ClientTransportFactory buildTransportFactory() { protected final ClientTransportFactory buildTransportFactory() {
return new CronetTransportFactory(streamFactory, MoreExecutors.directExecutor(), return new CronetTransportFactory(streamFactory, MoreExecutors.directExecutor(),
maxMessageSize, alwaysUsePut); maxMessageSize, alwaysUsePut, transportTracerFactory.create());
} }
@Override @Override
@ -147,16 +148,19 @@ public final class CronetChannelBuilder extends
private final int maxMessageSize; private final int maxMessageSize;
private final boolean alwaysUsePut; private final boolean alwaysUsePut;
private final StreamBuilderFactory streamFactory; private final StreamBuilderFactory streamFactory;
private final TransportTracer transportTracer;
private CronetTransportFactory( private CronetTransportFactory(
StreamBuilderFactory streamFactory, StreamBuilderFactory streamFactory,
Executor executor, Executor executor,
int maxMessageSize, int maxMessageSize,
boolean alwaysUsePut) { boolean alwaysUsePut,
TransportTracer transportTracer) {
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.alwaysUsePut = alwaysUsePut; this.alwaysUsePut = alwaysUsePut;
this.streamFactory = streamFactory; this.streamFactory = streamFactory;
this.executor = Preconditions.checkNotNull(executor, "executor"); this.executor = Preconditions.checkNotNull(executor, "executor");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
} }
@Override @Override
@ -164,7 +168,7 @@ public final class CronetChannelBuilder extends
@Nullable String userAgent, @Nullable ProxyParameters proxy) { @Nullable String userAgent, @Nullable ProxyParameters proxy) {
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr; InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
return new CronetClientTransport(streamFactory, inetSocketAddr, authority, userAgent, return new CronetClientTransport(streamFactory, inetSocketAddr, authority, userAgent,
executor, maxMessageSize, alwaysUsePut); executor, maxMessageSize, alwaysUsePut, transportTracer);
} }
@Override @Override

View File

@ -38,6 +38,7 @@ import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.ReadableBuffers; import io.grpc.internal.ReadableBuffers;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportFrameUtil; import io.grpc.internal.TransportFrameUtil;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBuffer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -90,8 +91,11 @@ class CronetClientStream extends AbstractClientStream {
boolean alwaysUsePut, boolean alwaysUsePut,
MethodDescriptor<?, ?> method, MethodDescriptor<?, ?> method,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
CallOptions callOptions) { CallOptions callOptions,
super(new CronetWritableBufferAllocator(), statsTraceCtx, null, headers, method.isSafe()); TransportTracer transportTracer) {
super(
new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers,
method.isSafe());
this.url = Preconditions.checkNotNull(url, "url"); this.url = Preconditions.checkNotNull(url, "url");
this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent"); this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent");
this.executor = Preconditions.checkNotNull(executor, "executor"); this.executor = Preconditions.checkNotNull(executor, "executor");
@ -103,7 +107,7 @@ class CronetClientStream extends AbstractClientStream {
this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY); this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY);
this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY); this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY);
this.annotations = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATIONS_KEY); this.annotations = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATIONS_KEY);
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock); this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer);
} }
@Override @Override
@ -218,8 +222,10 @@ class CronetClientStream extends AbstractClientStream {
@GuardedBy("lock") @GuardedBy("lock")
private boolean readClosed; private boolean readClosed;
public TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock) { public TransportState(
super(maxMessageSize, statsTraceCtx, null); int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock,
TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
this.lock = Preconditions.checkNotNull(lock, "lock"); this.lock = Preconditions.checkNotNull(lock, "lock");
} }

View File

@ -31,6 +31,7 @@ import io.grpc.InternalTransportStats;
import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@ -56,6 +57,7 @@ class CronetClientTransport implements ConnectionClientTransport, InternalWithLo
private final Executor executor; private final Executor executor;
private final int maxMessageSize; private final int maxMessageSize;
private final boolean alwaysUsePut; private final boolean alwaysUsePut;
private final TransportTracer transportTracer;
// Indicates the transport is in go-away state: no new streams will be processed, // Indicates the transport is in go-away state: no new streams will be processed,
// but existing streams may continue. // but existing streams may continue.
@GuardedBy("lock") @GuardedBy("lock")
@ -80,7 +82,8 @@ class CronetClientTransport implements ConnectionClientTransport, InternalWithLo
@Nullable String userAgent, @Nullable String userAgent,
Executor executor, Executor executor,
int maxMessageSize, int maxMessageSize,
boolean alwaysUsePut) { boolean alwaysUsePut,
TransportTracer transportTracer) {
this.address = Preconditions.checkNotNull(address, "address"); this.address = Preconditions.checkNotNull(address, "address");
this.authority = authority; this.authority = authority;
this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent); this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent);
@ -88,6 +91,7 @@ class CronetClientTransport implements ConnectionClientTransport, InternalWithLo
this.alwaysUsePut = alwaysUsePut; this.alwaysUsePut = alwaysUsePut;
this.executor = Preconditions.checkNotNull(executor, "executor"); this.executor = Preconditions.checkNotNull(executor, "executor");
this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory"); this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
} }
@Override @Override
@ -111,7 +115,7 @@ class CronetClientTransport implements ConnectionClientTransport, InternalWithLo
class StartCallback implements Runnable { class StartCallback implements Runnable {
final CronetClientStream clientStream = new CronetClientStream( final CronetClientStream clientStream = new CronetClientStream(
url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize, url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
alwaysUsePut, method, statsTraceCtx, callOptions); alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer);
@Override @Override
public void run() { public void run() {

View File

@ -38,6 +38,7 @@ import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener.MessageProducer; import io.grpc.internal.StreamListener.MessageProducer;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBuffer;
import io.grpc.testing.TestMethodDescriptors; import io.grpc.testing.TestMethodDescriptors;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -72,6 +73,7 @@ public final class CronetClientStreamTest {
@Mock private ClientStreamListener clientListener; @Mock private ClientStreamListener clientListener;
@Mock private ExperimentalBidirectionalStream.Builder builder; @Mock private ExperimentalBidirectionalStream.Builder builder;
private final Object lock = new Object(); private final Object lock = new Object();
private final TransportTracer transportTracer = TransportTracer.getDefaultFactory().create();
CronetClientStream clientStream; CronetClientStream clientStream;
private MethodDescriptor.Marshaller<Void> marshaller = TestMethodDescriptors.voidMarshaller(); private MethodDescriptor.Marshaller<Void> marshaller = TestMethodDescriptors.voidMarshaller();
@ -115,7 +117,8 @@ public final class CronetClientStreamTest {
false /* alwaysUsePut */, false /* alwaysUsePut */,
method, method,
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
CallOptions.DEFAULT); CallOptions.DEFAULT,
transportTracer);
callback.setStream(clientStream); callback.setStream(clientStream);
when(factory.newBidirectionalStreamBuilder( when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
@ -573,7 +576,8 @@ public final class CronetClientStreamTest {
false /* alwaysUsePut */, false /* alwaysUsePut */,
method, method,
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
CallOptions.DEFAULT.withOption(CronetCallOptions.CRONET_ANNOTATION_KEY, annotation)); CallOptions.DEFAULT.withOption(CronetCallOptions.CRONET_ANNOTATION_KEY, annotation),
transportTracer);
callback.setStream(stream); callback.setStream(stream);
when(factory.newBidirectionalStreamBuilder( when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
@ -605,7 +609,8 @@ public final class CronetClientStreamTest {
false /* alwaysUsePut */, false /* alwaysUsePut */,
method, method,
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
callOptions); callOptions,
transportTracer);
callback.setStream(stream); callback.setStream(stream);
when(factory.newBidirectionalStreamBuilder( when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
@ -642,7 +647,8 @@ public final class CronetClientStreamTest {
false /* alwaysUsePut */, false /* alwaysUsePut */,
getMethod, getMethod,
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
CallOptions.DEFAULT); CallOptions.DEFAULT,
transportTracer);
callback.setStream(stream); callback.setStream(stream);
ExperimentalBidirectionalStream.Builder getBuilder = ExperimentalBidirectionalStream.Builder getBuilder =
mock(ExperimentalBidirectionalStream.Builder.class); mock(ExperimentalBidirectionalStream.Builder.class);
@ -696,7 +702,8 @@ public final class CronetClientStreamTest {
false /* alwaysUsePut */, false /* alwaysUsePut */,
idempotentMethod, idempotentMethod,
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
CallOptions.DEFAULT); CallOptions.DEFAULT,
transportTracer);
callback.setStream(stream); callback.setStream(stream);
ExperimentalBidirectionalStream.Builder builder = ExperimentalBidirectionalStream.Builder builder =
mock(ExperimentalBidirectionalStream.Builder.class); mock(ExperimentalBidirectionalStream.Builder.class);
@ -725,7 +732,8 @@ public final class CronetClientStreamTest {
true /* alwaysUsePut */, true /* alwaysUsePut */,
method, method,
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
CallOptions.DEFAULT); CallOptions.DEFAULT,
transportTracer);
callback.setStream(stream); callback.setStream(stream);
ExperimentalBidirectionalStream.Builder builder = ExperimentalBidirectionalStream.Builder builder =
mock(ExperimentalBidirectionalStream.Builder.class); mock(ExperimentalBidirectionalStream.Builder.class);
@ -762,7 +770,8 @@ public final class CronetClientStreamTest {
false /* alwaysUsePut */, false /* alwaysUsePut */,
method, method,
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
CallOptions.DEFAULT); CallOptions.DEFAULT,
transportTracer);
callback.setStream(stream); callback.setStream(stream);
ExperimentalBidirectionalStream.Builder builder = ExperimentalBidirectionalStream.Builder builder =
mock(ExperimentalBidirectionalStream.Builder.class); mock(ExperimentalBidirectionalStream.Builder.class);

View File

@ -30,6 +30,7 @@ import io.grpc.Status;
import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory; import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.TransportTracer;
import io.grpc.testing.TestMethodDescriptors; import io.grpc.testing.TestMethodDescriptors;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -64,7 +65,8 @@ public final class CronetClientTransportTest {
null, null,
executor, executor,
5000, 5000,
false); false,
TransportTracer.getDefaultFactory().create());
Runnable callback = transport.start(clientTransportListener); Runnable callback = transport.start(clientTransportListener);
assertTrue(callback != null); assertTrue(callback != null);
callback.run(); callback.run();