core,netty: wire TransportTracer to netty client (#3705)

This commit is contained in:
zpencer 2017-11-10 15:55:21 -08:00 committed by GitHub
parent 30fb844790
commit d0a84ae5b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 373 additions and 162 deletions

View File

@ -86,6 +86,8 @@ public abstract class AbstractClientStream extends AbstractStream
void cancel(Status status);
}
@Nullable // okhttp does not support transportTracer yet
private final TransportTracer transportTracer;
private final Framer framer;
private boolean useGet;
private Metadata headers;
@ -104,6 +106,7 @@ public abstract class AbstractClientStream extends AbstractStream
Metadata headers,
boolean useGet) {
Preconditions.checkNotNull(headers, "headers");
this.transportTracer = transportTracer;
this.useGet = useGet;
if (!useGet) {
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
@ -189,6 +192,10 @@ public abstract class AbstractClientStream extends AbstractStream
return super.isReady() && !cancelled;
}
protected TransportTracer getTransportTracer() {
return transportTracer;
}
/** This should only called from the transport thread. */
protected abstract static class TransportState extends AbstractStream.TransportState {
/** Whether listener.closed() has been called. */
@ -383,6 +390,9 @@ public abstract class AbstractClientStream extends AbstractStream
listenerClosed = true;
statsTraceCtx.streamClosed(status);
listener().closed(status, trailers);
if (getTransportTracer() != null) {
getTransportTracer().reportStreamClosed(status.isOk());
}
}
}
}

View File

@ -191,7 +191,6 @@ public abstract class AbstractServerStream extends AbstractStream
private boolean listenerClosed;
private ServerStreamListener listener;
private final StatsTraceContext statsTraceCtx;
private final TransportTracer transportTracer;
private boolean endOfStream = false;
private boolean deframerClosed = false;
@ -210,7 +209,6 @@ public abstract class AbstractServerStream extends AbstractStream
statsTraceCtx,
Preconditions.checkNotNull(transportTracer, "transportTracer"));
this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = transportTracer;
}
/**
@ -225,7 +223,6 @@ public abstract class AbstractServerStream extends AbstractStream
@Override
public final void onStreamAllocated() {
super.onStreamAllocated();
transportTracer.reportStreamStarted();
}
@Override
@ -336,9 +333,9 @@ public abstract class AbstractServerStream extends AbstractStream
if (!listenerClosed) {
if (!newStatus.isOk()) {
statsTraceCtx.streamClosed(newStatus);
transportTracer.reportStreamClosed(false);
getTransportTracer().reportStreamClosed(false);
} else {
transportTracer.reportStreamClosed(closedStatus.isOk());
getTransportTracer().reportStreamClosed(closedStatus.isOk());
}
listenerClosed = true;
onStreamDeallocated();

View File

@ -108,6 +108,8 @@ public abstract class AbstractStream implements Stream {
private Deframer deframer;
private final Object onReadyLock = new Object();
private final StatsTraceContext statsTraceCtx;
@Nullable // okhttp transports don't trace yet
private final TransportTracer transportTracer;
/**
* The number of bytes currently queued, waiting to be sent. When this falls below
@ -131,8 +133,9 @@ public abstract class AbstractStream implements Stream {
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
@Nullable TransportTracer transportTracer) { // nullable: client streams don't trace yet
@Nullable TransportTracer transportTracer) { // nullable: okhttp transports don't trace yet
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = transportTracer;
deframer = new MessageDeframer(
this,
Codec.Identity.NONE,
@ -231,6 +234,9 @@ public abstract class AbstractStream implements Stream {
allocated = true;
}
notifyIfReady();
if (transportTracer != null) {
transportTracer.reportStreamStarted();
}
}
/**
@ -281,6 +287,10 @@ public abstract class AbstractStream implements Stream {
}
}
protected TransportTracer getTransportTracer() {
return transportTracer;
}
private void notifyIfReady() {
boolean doNotify;
synchronized (onReadyLock) {

View File

@ -25,7 +25,6 @@ import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
@ -681,11 +680,10 @@ public final class GrpcUtil {
transport.ping(callback, executor);
}
@Nullable
@Override
public Future<TransportTracer.Stats> getTransportStats() {
SettableFuture<TransportTracer.Stats> ret = SettableFuture.create();
ret.set(null);
return ret;
return transport.getTransportStats();
}
};
}

View File

@ -31,6 +31,7 @@ import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
@ -52,6 +53,7 @@ import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FlowController;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
@ -97,6 +99,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private final KeepAliveManager keepAliveManager;
// Returns new unstarted stopwatches
private final Supplier<Stopwatch> stopwatchFactory;
private final TransportTracer transportTracer;
private WriteQueue clientWriteQueue;
private Http2Ping ping;
private Attributes attributes = Attributes.EMPTY;
@ -107,7 +110,8 @@ class NettyClientHandler extends AbstractNettyHandler {
int flowControlWindow,
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable) {
Runnable tooManyPingsRunnable,
TransportTracer transportTracer) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@ -128,12 +132,13 @@ class NettyClientHandler extends AbstractNettyHandler {
flowControlWindow,
maxHeaderListSize,
stopwatchFactory,
tooManyPingsRunnable);
tooManyPingsRunnable,
transportTracer);
}
@VisibleForTesting
static NettyClientHandler newHandler(
Http2Connection connection,
final Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
ClientTransportLifecycleManager lifecycleManager,
@ -141,7 +146,8 @@ class NettyClientHandler extends AbstractNettyHandler {
int flowControlWindow,
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable) {
Runnable tooManyPingsRunnable,
TransportTracer transportTracer) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
@ -164,6 +170,18 @@ class NettyClientHandler extends AbstractNettyHandler {
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
frameReader);
transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
final Http2FlowController local = connection.local().flowController();
final Http2FlowController remote = connection.remote().flowController();
@Override
public TransportTracer.FlowControlWindows read() {
return new TransportTracer.FlowControlWindows(
local.windowSize(connection.connectionStream()),
remote.windowSize(connection.connectionStream()));
}
});
Http2Settings settings = new Http2Settings();
settings.pushEnabled(false);
settings.initialWindowSize(flowControlWindow);
@ -177,7 +195,8 @@ class NettyClientHandler extends AbstractNettyHandler {
lifecycleManager,
keepAliveManager,
stopwatchFactory,
tooManyPingsRunnable);
tooManyPingsRunnable,
transportTracer);
}
private NettyClientHandler(
@ -187,11 +206,13 @@ class NettyClientHandler extends AbstractNettyHandler {
ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager,
Supplier<Stopwatch> stopwatchFactory,
final Runnable tooManyPingsRunnable) {
final Runnable tooManyPingsRunnable,
TransportTracer transportTracer) {
super(decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
this.transportTracer = Preconditions.checkNotNull(transportTracer);
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@ -550,7 +571,9 @@ class NettyClientHandler extends AbstractNettyHandler {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
if (future.isSuccess()) {
transportTracer.reportKeepAliveSent();
} else {
Throwable cause = future.cause();
if (cause instanceof ClosedChannelException) {
cause = lifecycleManager.getShutdownThrowable();

View File

@ -32,6 +32,7 @@ import io.grpc.Status;
import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
@ -61,12 +62,19 @@ class NettyClientStream extends AbstractClientStream {
private final AsciiString userAgent;
NettyClientStream(
TransportState state, MethodDescriptor<?, ?> method, Metadata headers,
Channel channel, AsciiString authority, AsciiString scheme, AsciiString userAgent,
StatsTraceContext statsTraceCtx) {
super(new NettyWritableBufferAllocator(channel.alloc()),
TransportState state,
MethodDescriptor<?, ?> method,
Metadata headers,
Channel channel,
AsciiString authority,
AsciiString scheme,
AsciiString userAgent,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super(
new NettyWritableBufferAllocator(channel.alloc()),
statsTraceCtx,
null,
transportTracer,
headers,
useGet(method));
this.state = checkNotNull(state, "transportState");
@ -149,7 +157,7 @@ class NettyClientStream extends AbstractClientStream {
@Override
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
@ -167,6 +175,7 @@ class NettyClientStream extends AbstractClientStream {
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
transportState().onSentBytes(numBytes);
NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
}
}
}), flush);
@ -205,9 +214,13 @@ class NettyClientStream extends AbstractClientStream {
private int id;
private Http2Stream http2Stream;
public TransportState(NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize,
StatsTraceContext statsTraceCtx) {
super(maxMessageSize, statsTraceCtx, /*transportTracer=*/null);
public TransportState(
NettyClientHandler handler,
EventLoop eventLoop,
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
this.handler = checkNotNull(handler, "handler");
this.eventLoop = checkNotNull(eventLoop, "eventLoop");
}

View File

@ -50,6 +50,7 @@ import io.netty.util.AsciiString;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
@ -84,6 +85,8 @@ class NettyClientTransport implements ConnectionClientTransport {
private Status statusExplainingWhyTheChannelIsNull;
/** Since not thread-safe, may only be used from event loop. */
private ClientTransportLifecycleManager lifecycleManager;
/** Since not thread-safe, may only be used from event loop. */
private final TransportTracer transportTracer = new TransportTracer();
NettyClientTransport(
SocketAddress address, Class<? extends Channel> channelType,
@ -146,15 +149,25 @@ class NettyClientTransport implements ConnectionClientTransport {
}
StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
return new NettyClientStream(
new NettyClientStream.TransportState(handler, channel.eventLoop(), maxMessageSize,
statsTraceCtx) {
new NettyClientStream.TransportState(
handler,
channel.eventLoop(),
maxMessageSize,
statsTraceCtx,
transportTracer) {
@Override
protected Status statusFromFailedFuture(ChannelFuture f) {
return NettyClientTransport.this.statusFromFailedFuture(f);
}
},
method, headers, channel, authority, negotiationHandler.scheme(), userAgent,
statsTraceCtx);
method,
headers,
channel,
authority,
negotiationHandler.scheme(),
userAgent,
statsTraceCtx,
transportTracer);
}
@SuppressWarnings("unchecked")
@ -175,7 +188,8 @@ class NettyClientTransport implements ConnectionClientTransport {
flowControlWindow,
maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER,
tooManyPingsRunnable);
tooManyPingsRunnable,
transportTracer);
NettyHandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);
@ -295,9 +309,20 @@ class NettyClientTransport implements ConnectionClientTransport {
@Override
public Future<TransportTracer.Stats> getTransportStats() {
SettableFuture<TransportTracer.Stats> ret = SettableFuture.create();
ret.set(null);
return ret;
if (channel.eventLoop().inEventLoop()) {
// This is necessary, otherwise we will block forever if we get the future from inside
// the event loop.
SettableFuture<TransportTracer.Stats> result = SettableFuture.create();
result.set(transportTracer.getStats());
return result;
}
return channel.eventLoop().submit(
new Callable<TransportTracer.Stats>() {
@Override
public TransportTracer.Stats call() throws Exception {
return transportTracer.getStats();
}
});
}
@VisibleForTesting

View File

@ -26,7 +26,6 @@ import static io.grpc.netty.Utils.STATUS_OK;
import static io.grpc.netty.Utils.TE_HEADER;
import static io.grpc.netty.Utils.TE_TRAILERS;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -59,6 +58,7 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
@ -104,7 +104,6 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
private NettyClientStream.TransportState streamTransportState;
private Http2Headers grpcHeaders;
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
private int flowControlWindow = DEFAULT_WINDOW_SIZE;
private int maxHeaderListSize = Integer.MAX_VALUE;
private int streamId = 3;
private ClientTransportLifecycleManager lifecycleManager;
@ -125,6 +124,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
private final Queue<InputStream> streamListenerMessageQueue = new LinkedList<InputStream>();
@Override
protected void manualSetUp() throws Exception {
setUp();
}
/**
* Set up for test.
*/
@ -156,8 +160,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
}
initChannel(new GrpcHttp2ClientHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE);
streamTransportState = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState.setListener(streamListener);
grpcHeaders = new DefaultHttp2Headers()
@ -462,14 +469,20 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(3, streamTransportState.id());
streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE);
streamTransportState = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState.setListener(streamListener);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(5, streamTransportState.id());
streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE);
streamTransportState = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState.setListener(streamListener);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(7, streamTransportState.id());
@ -501,10 +514,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void ping() throws Exception {
PingCallbackImpl callback1 = new PingCallbackImpl();
assertEquals(0, transportTracer.getStats().keepAlivesSent);
sendPing(callback1);
assertEquals(1, transportTracer.getStats().keepAlivesSent);
// add'l ping will be added as listener to outstanding operation
PingCallbackImpl callback2 = new PingCallbackImpl();
sendPing(callback2);
assertEquals(1, transportTracer.getStats().keepAlivesSent);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(),
@ -534,7 +550,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
// now that previous ping is done, next request starts a new operation
callback1 = new PingCallbackImpl();
assertEquals(1, transportTracer.getStats().keepAlivesSent);
sendPing(callback1);
assertEquals(2, transportTracer.getStats().keepAlivesSent);
assertEquals(0, callback1.invocationCount);
}
@ -550,6 +568,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
// A failed ping is still counted
assertEquals(1, transportTracer.getStats().keepAlivesSent);
}
@Test
@ -558,7 +578,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
handler().setAutoTuneFlowControl(true);
PingCallbackImpl callback = new PingCallbackImpl();
assertEquals(0, transportTracer.getStats().keepAlivesSent);
sendPing(callback);
assertEquals(1, transportTracer.getStats().keepAlivesSent);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(), any(ChannelPromise.class));
ByteBuf payload = captor.getValue();
@ -575,6 +597,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
assertEquals(1, handler().flowControlPing().getPingReturn());
assertEquals(1, callback.invocationCount);
assertEquals(1, transportTracer.getStats().keepAlivesSent);
}
@Override
@ -658,7 +681,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
flowControlWindow,
maxHeaderListSize,
stopwatchSupplier,
tooManyPingsRunnable);
tooManyPingsRunnable,
transportTracer);
}
@Override
@ -690,8 +714,12 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
}
private static class TransportStateImpl extends NettyClientStream.TransportState {
public TransportStateImpl(NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize) {
super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP);
public TransportStateImpl(
NettyClientHandler handler,
EventLoop eventLoop,
int maxMessageSize,
TransportTracer transportTracer) {
super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP, transportTracer);
}
@Override

View File

@ -51,6 +51,7 @@ import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.WriteQueue.QueuedCommand;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -98,6 +99,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
.setResponseMarshaller(marshaller)
.build();
private final TransportTracer transportTracer = new TransportTracer();
/** Set up for test. */
@Before
@ -388,8 +390,14 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
listener = mock(ClientStreamListener.class);
stream = new NettyClientStream(new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
methodDescriptor, new Metadata(), channel, AsciiString.of("localhost"),
AsciiString.of("http"), AsciiString.of("agent"), StatsTraceContext.NOOP);
methodDescriptor,
new Metadata(),
channel,
AsciiString.of("localhost"),
AsciiString.of("http"),
AsciiString.of("agent"),
StatsTraceContext.NOOP,
transportTracer);
stream.start(listener);
stream().transportState().setId(STREAM_ID);
verify(listener, never()).onReady();
@ -407,9 +415,16 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
Mockito.reset(writeQueue);
when(writeQueue.enqueue(any(QueuedCommand.class), any(boolean.class))).thenReturn(future);
stream = new NettyClientStream(new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
methodDescriptor, new Metadata(), channel, AsciiString.of("localhost"),
AsciiString.of("http"), AsciiString.of("good agent"), StatsTraceContext.NOOP);
stream = new NettyClientStream(
new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
methodDescriptor,
new Metadata(),
channel,
AsciiString.of("localhost"),
AsciiString.of("http"),
AsciiString.of("good agent"),
StatsTraceContext.NOOP,
transportTracer);
stream.start(listener);
ArgumentCaptor<CreateStreamCommand> cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class);
@ -430,9 +445,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
.setSafe(true)
.build();
NettyClientStream stream = new NettyClientStream(
new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), descriptor, new Metadata(),
channel, AsciiString.of("localhost"), AsciiString.of("http"), AsciiString.of("agent"),
StatsTraceContext.NOOP);
new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
descriptor,
new Metadata(),
channel,
AsciiString.of("localhost"),
AsciiString.of("http"),
AsciiString.of("agent"),
StatsTraceContext.NOOP,
transportTracer);
stream.start(listener);
stream.transportState().setId(STREAM_ID);
stream.transportState().setHttp2Stream(http2Stream);
@ -468,9 +489,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
}).when(writeQueue).enqueue(any(QueuedCommand.class), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(future);
NettyClientStream stream = new NettyClientStream(
new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), methodDescriptor, new Metadata(),
channel, AsciiString.of("localhost"), AsciiString.of("http"), AsciiString.of("agent"),
StatsTraceContext.NOOP);
new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
methodDescriptor,
new Metadata(),
channel,
AsciiString.of("localhost"),
AsciiString.of("http"),
AsciiString.of("agent"),
StatsTraceContext.NOOP,
transportTracer);
stream.start(listener);
stream.transportState().setId(STREAM_ID);
stream.transportState().setHttp2Stream(http2Stream);
@ -508,7 +535,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
private class TransportStateImpl extends NettyClientStream.TransportState {
public TransportStateImpl(NettyClientHandler handler, int maxMessageSize) {
super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP);
super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP, transportTracer);
}
@Override

View File

@ -17,6 +17,7 @@
package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.any;
@ -48,6 +49,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
@ -97,6 +99,9 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
*/
protected void manualSetUp() throws Exception {}
protected final TransportTracer transportTracer = new TransportTracer();
protected int flowControlWindow = DEFAULT_WINDOW_SIZE;
private final FakeClock fakeClock = new FakeClock();
FakeClock fakeClock() {
@ -451,4 +456,53 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
assertEquals(maxWindow, localFlowController.initialWindowSize(connectionStream));
}
@Test
public void transportTracer_windowSizeDefault() throws Exception {
manualSetUp();
TransportTracer.Stats stats = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
assertEquals(flowControlWindow, stats.localFlowControlWindow);
}
@Test
public void transportTracer_windowSize() throws Exception {
flowControlWindow = 1024 * 1024;
manualSetUp();
TransportTracer.Stats stats = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
assertEquals(flowControlWindow, stats.localFlowControlWindow);
}
@Test
public void transportTracer_windowUpdate_remote() throws Exception {
manualSetUp();
TransportTracer.Stats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.localFlowControlWindow);
ByteBuf serializedSettings = windowUpdate(0, 1000);
channelRead(serializedSettings);
TransportTracer.Stats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000,
after.remoteFlowControlWindow);
assertEquals(flowControlWindow, after.localFlowControlWindow);
}
@Test
public void transportTracer_windowUpdate_local() throws Exception {
manualSetUp();
TransportTracer.Stats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(flowControlWindow, before.localFlowControlWindow);
// If the window size is below a certain threshold, netty will wait to apply the update.
// Use a large increment to be sure that it exceeds the threshold.
connection().local().flowController().incrementWindowSize(
connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE);
TransportTracer.Stats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow);
assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE,
connection().local().flowController().windowSize(connection().connectionStream()));
}
}

View File

@ -29,7 +29,6 @@ import static io.grpc.netty.Utils.HTTP_METHOD;
import static io.grpc.netty.Utils.TE_HEADER;
import static io.grpc.netty.Utils.TE_TRAILERS;
import static io.netty.buffer.Unpooled.directBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -66,7 +65,6 @@ import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
@ -126,7 +124,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
final Queue<InputStream> streamListenerMessageQueue = new LinkedList<InputStream>();
private int flowControlWindow = DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = Integer.MAX_VALUE;
private int maxHeaderListSize = Integer.MAX_VALUE;
private boolean permitKeepAliveWithoutCalls = true;
@ -136,7 +133,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
private TransportTracer transportTracer = new TransportTracer();
private class ServerTransportListenerImpl implements ServerTransportListener {
@ -766,48 +762,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
assertTrue(!channel().isOpen());
}
@Test
public void transportTracer_windowSizeDefault() throws Exception {
manualSetUp();
TransportTracer.Stats stats = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
assertEquals(flowControlWindow, stats.localFlowControlWindow);
}
@Test
public void transportTracer_windowUpdate_remote() throws Exception {
flowControlWindow = 1048576; // 1MiB
manualSetUp();
TransportTracer.Stats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(flowControlWindow, before.localFlowControlWindow);
ByteBuf serializedSettings = windowUpdate(0, 1000);
channelRead(serializedSettings);
TransportTracer.Stats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000,
after.remoteFlowControlWindow);
assertEquals(flowControlWindow, after.localFlowControlWindow);
}
@Test
public void transportTracer_windowUpdate_local() throws Exception {
manualSetUp();
TransportTracer.Stats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(flowControlWindow, before.localFlowControlWindow);
// If the window size is below a certain threshold, netty will wait to apply the update.
// Use a large increment to be sure that it exceeds the threshold.
connection().local().flowController().incrementWindowSize(
connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE);
TransportTracer.Stats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow);
assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE,
after.localFlowControlWindow);
}
private void createStream() throws Exception {
Http2Headers headers = new DefaultHttp2Headers()
.method(HTTP_METHOD)

View File

@ -1380,7 +1380,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_streamStarted() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!haveTransportTracer()) {
@ -1388,11 +1388,16 @@ public abstract class AbstractTransportTest {
}
// start first stream
long firstTimestamp;
long serverFirstTimestampNanos;
long clientFirstTimestampNanos;
{
TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
assertEquals(0, before.streamsStarted);
assertEquals(0, before.lastStreamCreatedTimeNanos);
TransportTracer.Stats serverBefore =
serverTransportListener.transport.getTransportStats().get();
assertEquals(0, serverBefore.streamsStarted);
assertEquals(0, serverBefore.lastStreamCreatedTimeNanos);
TransportTracer.Stats clientBefore = client.getTransportStats().get();
assertEquals(0, clientBefore.streamsStarted);
assertEquals(0, clientBefore.lastStreamCreatedTimeNanos);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
@ -1400,19 +1405,35 @@ public abstract class AbstractTransportTest {
StreamCreation serverStreamCreation = serverTransportListener
.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
assertEquals(1, after.streamsStarted);
firstTimestamp = TimeUnit.NANOSECONDS.toMillis(after.lastStreamCreatedTimeNanos);
assertThat(System.currentTimeMillis() - firstTimestamp).isAtMost(50L);
TransportTracer.Stats serverAfter =
serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.streamsStarted);
serverFirstTimestampNanos = serverAfter.lastStreamCreatedTimeNanos;
assertThat(System.currentTimeMillis()
- TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos)).isAtMost(50L);
TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.streamsStarted);
clientFirstTimestampNanos = clientAfter.lastStreamCreatedTimeNanos;
assertThat(System.currentTimeMillis()
- TimeUnit.NANOSECONDS.toMillis(clientFirstTimestampNanos)).isAtMost(50L);
ServerStream serverStream = serverStreamCreation.stream;
serverStream.close(Status.OK, new Metadata());
}
// lastStreamCreatedTimeNanos is converted from the system milli clock. Sleep a bit to ensure
// it has moved forward in time.
// TODO(zpencer): plumb in a fake clock instead
Thread.sleep(5);
// start second stream
{
TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
assertEquals(1, before.streamsStarted);
TransportTracer.Stats serverBefore =
serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverBefore.streamsStarted);
TransportTracer.Stats clientBefore = client.getTransportStats().get();
assertEquals(1, clientBefore.streamsStarted);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
@ -1420,11 +1441,20 @@ public abstract class AbstractTransportTest {
StreamCreation serverStreamCreation = serverTransportListener
.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
assertEquals(2, after.streamsStarted);
assertTrue(after.lastStreamCreatedTimeNanos > firstTimestamp);
long secondTimestamp = TimeUnit.NANOSECONDS.toMillis(after.lastStreamCreatedTimeNanos);
assertThat(System.currentTimeMillis() - secondTimestamp).isAtMost(50L);
TransportTracer.Stats serverAfter =
serverTransportListener.transport.getTransportStats().get();
assertEquals(2, serverAfter.streamsStarted);
assertTrue(serverAfter.lastStreamCreatedTimeNanos > serverFirstTimestampNanos);
long serverSecondTimestamp =
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos);
assertThat(System.currentTimeMillis() - serverSecondTimestamp).isAtMost(50L);
TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(2, clientAfter.streamsStarted);
assertTrue(clientAfter.lastStreamCreatedTimeNanos > clientFirstTimestampNanos);
long clientSecondTimestamp =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastStreamCreatedTimeNanos);
assertThat(System.currentTimeMillis() - clientSecondTimestamp).isAtMost(50L);
ServerStream serverStream = serverStreamCreation.stream;
serverStream.close(Status.OK, new Metadata());
@ -1435,7 +1465,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_streamEnded_ok() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
runIfNotNull(client.start(mockClientTransportListener));
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1448,24 +1478,35 @@ public abstract class AbstractTransportTest {
return;
}
TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
assertEquals(0, before.streamsSucceeded);
TransportTracer.Stats serverBefore =
serverTransportListener.transport.getTransportStats().get();
assertEquals(0, serverBefore.streamsSucceeded);
assertEquals(0, serverBefore.streamsFailed);
TransportTracer.Stats clientBefore = client.getTransportStats().get();
assertEquals(0, clientBefore.streamsSucceeded);
assertEquals(0, clientBefore.streamsFailed);
clientStream.halfClose();
serverStream.close(Status.OK, new Metadata());
// Block until the close actually happened before verifying stats
serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
// do not validate stats until close() has been called on client
assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
assertEquals(1, after.streamsSucceeded);
assertEquals(0, after.streamsFailed);
TransportTracer.Stats serverAfter =
serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.streamsSucceeded);
assertEquals(0, serverAfter.streamsFailed);
TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.streamsSucceeded);
assertEquals(0, clientAfter.streamsFailed);
}
@Test
public void transportTracer_server_streamEnded_nonOk() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
runIfNotNull(client.start(mockClientTransportListener));
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1478,16 +1519,29 @@ public abstract class AbstractTransportTest {
return;
}
TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
assertEquals(0, before.streamsFailed);
TransportTracer.Stats serverBefore =
serverTransportListener.transport.getTransportStats().get();
assertEquals(0, serverBefore.streamsFailed);
assertEquals(0, serverBefore.streamsSucceeded);
TransportTracer.Stats clientBefore = client.getTransportStats().get();
assertEquals(0, clientBefore.streamsFailed);
assertEquals(0, clientBefore.streamsSucceeded);
serverStream.close(Status.UNKNOWN, new Metadata());
// Block until the close actually happened before verifying stats
serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
// do not validate stats until close() has been called on client
assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
assertEquals(1, after.streamsFailed);
assertEquals(0, after.streamsSucceeded);
TransportTracer.Stats serverAfter =
serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.streamsFailed);
assertEquals(0, serverAfter.streamsSucceeded);
TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.streamsFailed);
assertEquals(0, clientAfter.streamsSucceeded);
client.shutdown(Status.UNAVAILABLE);
}
@Test
@ -1510,10 +1564,9 @@ public abstract class AbstractTransportTest {
serverTransportListener.transport.getTransportStats().get();
assertEquals(0, serverBefore.streamsFailed);
assertEquals(0, serverBefore.streamsSucceeded);
// TODO(zpencer): uncomment when integrated with client transport
// TransportTracer.Stats clientBefore = client.getTransportStats().get();
// assertEquals(0, clientBefore.streamsFailed);
// assertEquals(0, clientBefore.streamsSucceeded);
TransportTracer.Stats clientBefore = client.getTransportStats().get();
assertEquals(0, clientBefore.streamsFailed);
assertEquals(0, clientBefore.streamsSucceeded);
clientStream.cancel(Status.UNKNOWN);
// do not validate stats until close() has been called on server
@ -1523,17 +1576,16 @@ public abstract class AbstractTransportTest {
serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.streamsFailed);
assertEquals(0, serverAfter.streamsSucceeded);
// TODO(zpencer): uncomment when integrated with client transport
// TransportTracer.Stats clientAfter = client.getTransportStats().get();
// assertEquals(1, clientAfter.streamsFailed);
// assertEquals(0, clientAfter.streamsSucceeded);
TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.streamsFailed);
assertEquals(0, clientAfter.streamsSucceeded);
}
@Test
public void transportTracer_receive_msg() throws Exception {
public void transportTracer_server_receive_msg() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
runIfNotNull(client.start(mockClientTransportListener));
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1547,9 +1599,13 @@ public abstract class AbstractTransportTest {
return;
}
TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
assertEquals(0, before.messagesReceived);
assertEquals(0, before.lastMessageReceivedTimeNanos);
TransportTracer.Stats serverBefore =
serverTransportListener.transport.getTransportStats().get();
assertEquals(0, serverBefore.messagesReceived);
assertEquals(0, serverBefore.lastMessageReceivedTimeNanos);
TransportTracer.Stats clientBefore = client.getTransportStats().get();
assertEquals(0, clientBefore.messagesSent);
assertEquals(0, clientBefore.lastMessageSentTimeNanos);
serverStream.request(1);
clientStream.writeMessage(methodDescriptor.streamRequest("request"));
@ -1557,19 +1613,26 @@ public abstract class AbstractTransportTest {
clientStream.halfClose();
verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
assertEquals(1, after.messagesReceived);
long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageReceivedTimeNanos);
assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L);
TransportTracer.Stats serverAfter =
serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.messagesReceived);
long serverTimestamp =
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageReceivedTimeNanos);
assertThat(System.currentTimeMillis() - serverTimestamp).isAtMost(50L);
TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.messagesSent);
long clientTimestamp =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageSentTimeNanos);
assertThat(System.currentTimeMillis() - clientTimestamp).isAtMost(50L);
serverStream.close(Status.OK, new Metadata());
}
@Test
public void transportTracer_send_msg() throws Exception {
public void transportTracer_server_send_msg() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
runIfNotNull(client.start(mockClientTransportListener));
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1582,9 +1645,13 @@ public abstract class AbstractTransportTest {
return;
}
TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
assertEquals(0, before.messagesSent);
assertEquals(0, before.lastMessageSentTimeNanos);
TransportTracer.Stats serverBefore =
serverTransportListener.transport.getTransportStats().get();
assertEquals(0, serverBefore.messagesSent);
assertEquals(0, serverBefore.lastMessageSentTimeNanos);
TransportTracer.Stats clientBefore = client.getTransportStats().get();
assertEquals(0, clientBefore.messagesReceived);
assertEquals(0, clientBefore.lastMessageReceivedTimeNanos);
clientStream.request(1);
serverStream.writeHeaders(new Metadata());
@ -1592,11 +1659,16 @@ public abstract class AbstractTransportTest {
serverStream.flush();
verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
assertEquals(1, after.messagesSent);
long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageSentTimeNanos);
assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L);
TransportTracer.Stats serverAfter =
serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.messagesSent);
long serverTimestmap = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageSentTimeNanos);
assertThat(System.currentTimeMillis() - serverTimestmap).isAtMost(50L);
TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.messagesReceived);
long clientTimestmap =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageReceivedTimeNanos);
assertThat(System.currentTimeMillis() - clientTimestmap).isAtMost(50L);
serverStream.close(Status.OK, new Metadata());
}