core,netty: plumb fake clock into transport tracer tests (#3724)

This commit is contained in:
zpencer 2017-11-13 15:58:29 -08:00 committed by GitHub
parent 51bbc1a4bd
commit 47bee4feee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 159 additions and 32 deletions

View File

@ -124,6 +124,8 @@ public abstract class AbstractManagedChannelImplBuilder
long idleTimeoutMillis = IDLE_MODE_DEFAULT_TIMEOUT_MILLIS; long idleTimeoutMillis = IDLE_MODE_DEFAULT_TIMEOUT_MILLIS;
protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
/** /**

View File

@ -107,6 +107,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
private boolean recordFinishedRpcs = true; private boolean recordFinishedRpcs = true;
private boolean tracingEnabled = true; private boolean tracingEnabled = true;
protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
@Override @Override
public final T directExecutor() { public final T directExecutor() {
return executor(MoreExecutors.directExecutor()); return executor(MoreExecutors.directExecutor());

View File

@ -16,6 +16,7 @@
package io.grpc.internal; package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -24,6 +25,15 @@ import java.util.concurrent.TimeUnit;
* Can only be called from the transport thread unless otherwise noted. * Can only be called from the transport thread unless otherwise noted.
*/ */
public final class TransportTracer { public final class TransportTracer {
private static final TimeProvider SYSTEM_TIME_PROVIDER = new TimeProvider() {
@Override
public long currentTimeMillis() {
return System.currentTimeMillis();
}
};
private static final Factory DEFAULT_FACTORY = new Factory(SYSTEM_TIME_PROVIDER);
private final TimeProvider timeProvider;
private long streamsStarted; private long streamsStarted;
private long lastStreamCreatedTimeNanos; private long lastStreamCreatedTimeNanos;
private long streamsSucceeded; private long streamsSucceeded;
@ -37,6 +47,14 @@ public final class TransportTracer {
private final LongCounter messagesReceived = LongCounterFactory.create(); private final LongCounter messagesReceived = LongCounterFactory.create();
private volatile long lastMessageReceivedTimeNanos; private volatile long lastMessageReceivedTimeNanos;
public TransportTracer() {
this.timeProvider = SYSTEM_TIME_PROVIDER;
}
private TransportTracer(TimeProvider timeProvider) {
this.timeProvider = timeProvider;
}
/** /**
* Returns a read only set of current stats. * Returns a read only set of current stats.
*/ */
@ -128,8 +146,8 @@ public final class TransportTracer {
FlowControlWindows read(); FlowControlWindows read();
} }
private static long currentTimeNanos() { private long currentTimeNanos() {
return TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); return TimeUnit.MILLISECONDS.toNanos(timeProvider.currentTimeMillis());
} }
/** /**
@ -178,4 +196,31 @@ public final class TransportTracer {
} }
} }
} }
/**
* Time source representing the current system time in millis. Used to inject a fake clock
* into unit tests.
*/
@VisibleForTesting
public interface TimeProvider {
/** Returns the current milli time. */
long currentTimeMillis();
}
public static final class Factory {
private TimeProvider timeProvider;
@VisibleForTesting
public Factory(TimeProvider timeProvider) {
this.timeProvider = timeProvider;
}
public TransportTracer create() {
return new TransportTracer(timeProvider);
}
}
public static Factory getDefaultFactory() {
return DEFAULT_FACTORY;
}
} }

View File

@ -38,6 +38,7 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ProxyParameters; import io.grpc.internal.ProxyParameters;
import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -320,7 +321,8 @@ public final class NettyChannelBuilder
protected ClientTransportFactory buildTransportFactory() { protected ClientTransportFactory buildTransportFactory() {
return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions, return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(), negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls); maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory.create());
} }
@Override @Override
@ -408,6 +410,12 @@ public final class NettyChannelBuilder
super.setStatsEnabled(value); super.setStatsEnabled(value);
} }
@VisibleForTesting
NettyChannelBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) {
this.transportTracerFactory = transportTracerFactory;
return this;
}
interface TransportCreationParamsFilterFactory { interface TransportCreationParamsFilterFactory {
@CheckReturnValue @CheckReturnValue
TransportCreationParamsFilter create( TransportCreationParamsFilter create(
@ -445,6 +453,7 @@ public final class NettyChannelBuilder
private final AtomicBackoff keepAliveTimeNanos; private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos; private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls; private final boolean keepAliveWithoutCalls;
private final TransportTracer transportTracer;
private boolean closed; private boolean closed;
@ -452,10 +461,12 @@ public final class NettyChannelBuilder
Class<? extends Channel> channelType, Map<ChannelOption<?>, ?> channelOptions, Class<? extends Channel> channelType, Map<ChannelOption<?>, ?> channelOptions,
NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group, NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) { long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer transportTracer) {
this.channelType = channelType; this.channelType = channelType;
this.negotiationType = negotiationType; this.negotiationType = negotiationType;
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions); this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.transportTracer = transportTracer;
if (transportCreationParamsFilterFactory == null) { if (transportCreationParamsFilterFactory == null) {
transportCreationParamsFilterFactory = transportCreationParamsFilterFactory =
@ -499,7 +510,7 @@ public final class NettyChannelBuilder
dparams.getProtocolNegotiator(), flowControlWindow, dparams.getProtocolNegotiator(), flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(), keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
tooManyPingsRunnable); tooManyPingsRunnable, transportTracer);
return transport; return transport;
} }

View File

@ -86,7 +86,7 @@ class NettyClientTransport implements ConnectionClientTransport {
/** Since not thread-safe, may only be used from event loop. */ /** Since not thread-safe, may only be used from event loop. */
private ClientTransportLifecycleManager lifecycleManager; private ClientTransportLifecycleManager lifecycleManager;
/** Since not thread-safe, may only be used from event loop. */ /** Since not thread-safe, may only be used from event loop. */
private final TransportTracer transportTracer = new TransportTracer(); private final TransportTracer transportTracer;
NettyClientTransport( NettyClientTransport(
SocketAddress address, Class<? extends Channel> channelType, SocketAddress address, Class<? extends Channel> channelType,
@ -94,7 +94,7 @@ class NettyClientTransport implements ConnectionClientTransport {
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable) { Runnable tooManyPingsRunnable, TransportTracer transportTracer) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.address = Preconditions.checkNotNull(address, "address"); this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group"); this.group = Preconditions.checkNotNull(group, "group");
@ -110,6 +110,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent)); this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
this.tooManyPingsRunnable = this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
} }
@Override @Override

View File

@ -26,6 +26,7 @@ import io.grpc.internal.InternalServer;
import io.grpc.internal.ServerListener; import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -72,11 +73,13 @@ class NettyServer implements InternalServer {
private final long permitKeepAliveTimeInNanos; private final long permitKeepAliveTimeInNanos;
private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter(); private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter();
private final List<ServerStreamTracer.Factory> streamTracerFactories; private final List<ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
NettyServer( NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType, SocketAddress address, Class<? extends ServerChannel> channelType,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup, @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories, ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos, long maxConnectionIdleInNanos,
@ -90,6 +93,7 @@ class NettyServer implements InternalServer {
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.usingSharedBossGroup = bossGroup == null; this.usingSharedBossGroup = bossGroup == null;
this.usingSharedWorkerGroup = workerGroup == null; this.usingSharedWorkerGroup = workerGroup == null;
this.transportTracerFactory = transportTracerFactory;
this.maxStreamsPerConnection = maxStreamsPerConnection; this.maxStreamsPerConnection = maxStreamsPerConnection;
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
@ -142,7 +146,8 @@ class NettyServer implements InternalServer {
NettyServerTransport transport = NettyServerTransport transport =
new NettyServerTransport( new NettyServerTransport(
ch, protocolNegotiator, streamTracerFactories, maxStreamsPerConnection, ch, protocolNegotiator, streamTracerFactories, transportTracerFactory.create(),
maxStreamsPerConnection,
flowControlWindow, maxMessageSize, maxHeaderListSize, flowControlWindow, maxMessageSize, maxHeaderListSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionIdleInNanos,

View File

@ -22,6 +22,7 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED; import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
@ -30,6 +31,7 @@ import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -204,6 +206,12 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
super.setStatsEnabled(value); super.setStatsEnabled(value);
} }
@VisibleForTesting
NettyServerBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) {
this.transportTracerFactory = transportTracerFactory;
return this;
}
/** /**
* The maximum number of concurrent calls permitted for each incoming connection. Defaults to no * The maximum number of concurrent calls permitted for each incoming connection. Defaults to no
* limit. * limit.
@ -391,7 +399,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
return new NettyServer( return new NettyServer(
address, channelType, bossEventLoopGroup, workerEventLoopGroup, address, channelType, bossEventLoopGroup, workerEventLoopGroup,
negotiator, streamTracerFactories, maxConcurrentCallsPerConnection, flowControlWindow, negotiator, streamTracerFactories, transportTracerFactory,
maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,

View File

@ -73,7 +73,8 @@ class NettyServerTransport implements ServerTransport {
NettyServerTransport( NettyServerTransport(
Channel channel, ProtocolNegotiator protocolNegotiator, Channel channel, ProtocolNegotiator protocolNegotiator,
List<ServerStreamTracer.Factory> streamTracerFactories, int maxStreams, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer, int maxStreams,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos, long maxConnectionIdleInNanos,
@ -83,6 +84,7 @@ class NettyServerTransport implements ServerTransport {
this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = this.streamTracerFactories =
Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
this.maxStreams = maxStreams; this.maxStreams = maxStreams;
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
@ -94,7 +96,6 @@ class NettyServerTransport implements ServerTransport {
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
this.transportTracer = new TransportTracer();
} }
public void start(ServerTransportListener listener) { public void start(ServerTransportListener listener) {

View File

@ -58,6 +58,7 @@ import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestUtils; import io.grpc.internal.testing.TestUtils;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
@ -170,7 +171,7 @@ public class NettyClientTransportTest {
address, NioSocketChannel.class, channelOptions, group, newNegotiator(), address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
tooManyPingsRunnable); tooManyPingsRunnable, new TransportTracer());
transports.add(transport); transports.add(transport);
callMeMaybe(transport.start(clientTransportListener)); callMeMaybe(transport.start(clientTransportListener));
@ -374,7 +375,7 @@ public class NettyClientTransportTest {
address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group, address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable); null, tooManyPingsRunnable, new TransportTracer());
transports.add(transport); transports.add(transport);
// Should not throw // Should not throw
@ -542,7 +543,8 @@ public class NettyClientTransportTest {
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator, address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano, keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable); false, authority, userAgent, tooManyPingsRunnable,
new TransportTracer());
transports.add(transport); transports.add(transport);
return transport; return transport;
} }
@ -555,7 +557,9 @@ public class NettyClientTransportTest {
server = new NettyServer( server = new NettyServer(
TestUtils.testServerAddress(0), TestUtils.testServerAddress(0),
NioServerSocketChannel.class, group, group, negotiator, NioServerSocketChannel.class, group, group, negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(), maxStreamsPerConnection, Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
MAX_CONNECTION_IDLE_NANOS_DISABLED, MAX_CONNECTION_IDLE_NANOS_DISABLED,

View File

@ -22,6 +22,7 @@ import io.grpc.ServerStreamTracer;
import io.grpc.internal.ServerListener; import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
@ -42,6 +43,7 @@ public class NettyServerTest {
null, // no event group null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(), new ProtocolNegotiators.PlaintextNegotiator(),
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
@ -77,6 +79,7 @@ public class NettyServerTest {
null, // no event group null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(), new ProtocolNegotiators.PlaintextNegotiator(),
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore

View File

@ -18,11 +18,14 @@ package io.grpc.netty;
import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer;
import io.grpc.internal.ClientTransportFactory; import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.InternalServer; import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.AbstractTransportTest; import io.grpc.internal.testing.AbstractTransportTest;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After; import org.junit.After;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -32,14 +35,23 @@ import org.junit.runners.JUnit4;
/** Unit tests for Netty transport. */ /** Unit tests for Netty transport. */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class NettyTransportTest extends AbstractTransportTest { public class NettyTransportTest extends AbstractTransportTest {
private final FakeClock fakeClock = new FakeClock();
private final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
new TransportTracer.TimeProvider() {
@Override
public long currentTimeMillis() {
return fakeClock.currentTimeMillis();
}
});
// Avoid LocalChannel for testing because LocalChannel can fail with // Avoid LocalChannel for testing because LocalChannel can fail with
// io.netty.channel.ChannelException instead of java.net.ConnectException which breaks // io.netty.channel.ChannelException instead of java.net.ConnectException which breaks
// serverNotListening test. // serverNotListening test.
private ClientTransportFactory clientFactory = NettyChannelBuilder private final ClientTransportFactory clientFactory = NettyChannelBuilder
// Although specified here, address is ignored because we never call build. // Although specified here, address is ignored because we never call build.
.forAddress("localhost", 0) .forAddress("localhost", 0)
.flowControlWindow(65 * 1024) .flowControlWindow(65 * 1024)
.negotiationType(NegotiationType.PLAINTEXT) .negotiationType(NegotiationType.PLAINTEXT)
.setTransportTracerFactory(fakeClockTransportTracer)
.buildTransportFactory(); .buildTransportFactory();
@Override @Override
@ -57,6 +69,7 @@ public class NettyTransportTest extends AbstractTransportTest {
return NettyServerBuilder return NettyServerBuilder
.forPort(0) .forPort(0)
.flowControlWindow(65 * 1024) .flowControlWindow(65 * 1024)
.setTransportTracerFactory(fakeClockTransportTracer)
.buildTransportServer(streamTracerFactories); .buildTransportServer(streamTracerFactories);
} }
@ -67,6 +80,7 @@ public class NettyTransportTest extends AbstractTransportTest {
return NettyServerBuilder return NettyServerBuilder
.forPort(port) .forPort(port)
.flowControlWindow(65 * 1024) .flowControlWindow(65 * 1024)
.setTransportTracerFactory(fakeClockTransportTracer)
.buildTransportServer(streamTracerFactories); .buildTransportServer(streamTracerFactories);
} }
@ -75,6 +89,16 @@ public class NettyTransportTest extends AbstractTransportTest {
return "localhost:" + server.getPort(); return "localhost:" + server.getPort();
} }
@Override
protected void advanceClock(long offset, TimeUnit unit) {
fakeClock.forwardNanos(unit.toNanos(offset));
}
@Override
protected long currentTimeMillis() {
return fakeClock.currentTimeMillis();
}
@Override @Override
protected ManagedClientTransport newClientTransport(InternalServer server) { protected ManagedClientTransport newClientTransport(InternalServer server) {
int port = server.getPort(); int port = server.getPort();

View File

@ -190,6 +190,21 @@ public abstract class AbstractTransportTest {
} }
} }
/**
* Moves the clock forward, for tests that require moving the clock forward. It is the transport
* subclass's responsibility to implement this method.
*/
protected void advanceClock(long offset, TimeUnit unit) {
throw new UnsupportedOperationException();
}
/**
* Returns the current time, for tests that rely on the clock.
*/
protected long currentTimeMillis() {
throw new UnsupportedOperationException();
}
// TODO(ejona): // TODO(ejona):
// multiple streams on same transport // multiple streams on same transport
// multiple client transports to same server // multiple client transports to same server
@ -1409,23 +1424,23 @@ public abstract class AbstractTransportTest {
serverTransportListener.transport.getTransportStats().get(); serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.streamsStarted); assertEquals(1, serverAfter.streamsStarted);
serverFirstTimestampNanos = serverAfter.lastStreamCreatedTimeNanos; serverFirstTimestampNanos = serverAfter.lastStreamCreatedTimeNanos;
assertThat(System.currentTimeMillis() assertEquals(
- TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos)).isAtMost(50L); currentTimeMillis(),
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos));
TransportTracer.Stats clientAfter = client.getTransportStats().get(); TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.streamsStarted); assertEquals(1, clientAfter.streamsStarted);
clientFirstTimestampNanos = clientAfter.lastStreamCreatedTimeNanos; clientFirstTimestampNanos = clientAfter.lastStreamCreatedTimeNanos;
assertThat(System.currentTimeMillis() assertEquals(
- TimeUnit.NANOSECONDS.toMillis(clientFirstTimestampNanos)).isAtMost(50L); currentTimeMillis(),
TimeUnit.NANOSECONDS.toMillis(clientFirstTimestampNanos));
ServerStream serverStream = serverStreamCreation.stream; ServerStream serverStream = serverStreamCreation.stream;
serverStream.close(Status.OK, new Metadata()); serverStream.close(Status.OK, new Metadata());
} }
// lastStreamCreatedTimeNanos is converted from the system milli clock. Sleep a bit to ensure final long elapsedMillis = 100;
// it has moved forward in time. advanceClock(100, TimeUnit.MILLISECONDS);
// TODO(zpencer): plumb in a fake clock instead
Thread.sleep(5);
// start second stream // start second stream
{ {
@ -1444,17 +1459,21 @@ public abstract class AbstractTransportTest {
TransportTracer.Stats serverAfter = TransportTracer.Stats serverAfter =
serverTransportListener.transport.getTransportStats().get(); serverTransportListener.transport.getTransportStats().get();
assertEquals(2, serverAfter.streamsStarted); assertEquals(2, serverAfter.streamsStarted);
assertTrue(serverAfter.lastStreamCreatedTimeNanos > serverFirstTimestampNanos); assertEquals(
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
serverAfter.lastStreamCreatedTimeNanos - serverFirstTimestampNanos);
long serverSecondTimestamp = long serverSecondTimestamp =
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos); TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos);
assertThat(System.currentTimeMillis() - serverSecondTimestamp).isAtMost(50L); assertEquals(currentTimeMillis(), serverSecondTimestamp);
TransportTracer.Stats clientAfter = client.getTransportStats().get(); TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(2, clientAfter.streamsStarted); assertEquals(2, clientAfter.streamsStarted);
assertTrue(clientAfter.lastStreamCreatedTimeNanos > clientFirstTimestampNanos); assertEquals(
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
clientAfter.lastStreamCreatedTimeNanos - clientFirstTimestampNanos);
long clientSecondTimestamp = long clientSecondTimestamp =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastStreamCreatedTimeNanos); TimeUnit.NANOSECONDS.toMillis(clientAfter.lastStreamCreatedTimeNanos);
assertThat(System.currentTimeMillis() - clientSecondTimestamp).isAtMost(50L); assertEquals(currentTimeMillis(), clientSecondTimestamp);
ServerStream serverStream = serverStreamCreation.stream; ServerStream serverStream = serverStreamCreation.stream;
serverStream.close(Status.OK, new Metadata()); serverStream.close(Status.OK, new Metadata());
@ -1618,12 +1637,12 @@ public abstract class AbstractTransportTest {
assertEquals(1, serverAfter.messagesReceived); assertEquals(1, serverAfter.messagesReceived);
long serverTimestamp = long serverTimestamp =
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageReceivedTimeNanos); TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageReceivedTimeNanos);
assertThat(System.currentTimeMillis() - serverTimestamp).isAtMost(50L); assertEquals(currentTimeMillis(), serverTimestamp);
TransportTracer.Stats clientAfter = client.getTransportStats().get(); TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.messagesSent); assertEquals(1, clientAfter.messagesSent);
long clientTimestamp = long clientTimestamp =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageSentTimeNanos); TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageSentTimeNanos);
assertThat(System.currentTimeMillis() - clientTimestamp).isAtMost(50L); assertEquals(currentTimeMillis(), clientTimestamp);
serverStream.close(Status.OK, new Metadata()); serverStream.close(Status.OK, new Metadata());
} }
@ -1663,12 +1682,13 @@ public abstract class AbstractTransportTest {
serverTransportListener.transport.getTransportStats().get(); serverTransportListener.transport.getTransportStats().get();
assertEquals(1, serverAfter.messagesSent); assertEquals(1, serverAfter.messagesSent);
long serverTimestmap = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageSentTimeNanos); long serverTimestmap = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageSentTimeNanos);
assertThat(System.currentTimeMillis() - serverTimestmap).isAtMost(50L); assertEquals(currentTimeMillis(), serverTimestmap);
TransportTracer.Stats clientAfter = client.getTransportStats().get(); TransportTracer.Stats clientAfter = client.getTransportStats().get();
assertEquals(1, clientAfter.messagesReceived); assertEquals(1, clientAfter.messagesReceived);
long clientTimestmap = long clientTimestmap =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageReceivedTimeNanos); TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageReceivedTimeNanos);
assertThat(System.currentTimeMillis() - clientTimestmap).isAtMost(50L); assertEquals(currentTimeMillis(), clientTimestmap);
serverStream.close(Status.OK, new Metadata()); serverStream.close(Status.OK, new Metadata());
} }