mirror of https://github.com/grpc/grpc-java.git
all: TimeProvider to use nanos rather than millis
This is the same practice as #2833
This commit is contained in:
parent
30478d88c7
commit
f5f560ad36
|
|
@ -16,7 +16,8 @@
|
||||||
|
|
||||||
package io.grpc.internal;
|
package io.grpc.internal;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import static io.grpc.internal.TimeProvider.SYSTEM_TIME_PROVIDER;
|
||||||
|
|
||||||
import io.grpc.internal.Channelz.ChannelStats;
|
import io.grpc.internal.Channelz.ChannelStats;
|
||||||
import io.grpc.internal.Channelz.ServerStats;
|
import io.grpc.internal.Channelz.ServerStats;
|
||||||
|
|
||||||
|
|
@ -28,7 +29,7 @@ final class CallTracer {
|
||||||
private final LongCounter callsStarted = LongCounterFactory.create();
|
private final LongCounter callsStarted = LongCounterFactory.create();
|
||||||
private final LongCounter callsSucceeded = LongCounterFactory.create();
|
private final LongCounter callsSucceeded = LongCounterFactory.create();
|
||||||
private final LongCounter callsFailed = LongCounterFactory.create();
|
private final LongCounter callsFailed = LongCounterFactory.create();
|
||||||
private volatile long lastCallStartedMillis;
|
private volatile long lastCallStartedNanos;
|
||||||
|
|
||||||
CallTracer(TimeProvider timeProvider) {
|
CallTracer(TimeProvider timeProvider) {
|
||||||
this.timeProvider = timeProvider;
|
this.timeProvider = timeProvider;
|
||||||
|
|
@ -36,7 +37,7 @@ final class CallTracer {
|
||||||
|
|
||||||
public void reportCallStarted() {
|
public void reportCallStarted() {
|
||||||
callsStarted.add(1);
|
callsStarted.add(1);
|
||||||
lastCallStartedMillis = timeProvider.currentTimeMillis();
|
lastCallStartedNanos = timeProvider.currentTimeNanos();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void reportCallEnded(boolean success) {
|
public void reportCallEnded(boolean success) {
|
||||||
|
|
@ -52,7 +53,7 @@ final class CallTracer {
|
||||||
.setCallsStarted(callsStarted.value())
|
.setCallsStarted(callsStarted.value())
|
||||||
.setCallsSucceeded(callsSucceeded.value())
|
.setCallsSucceeded(callsSucceeded.value())
|
||||||
.setCallsFailed(callsFailed.value())
|
.setCallsFailed(callsFailed.value())
|
||||||
.setLastCallStartedMillis(lastCallStartedMillis);
|
.setLastCallStartedNanos(lastCallStartedNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateBuilder(ServerStats.Builder builder) {
|
void updateBuilder(ServerStats.Builder builder) {
|
||||||
|
|
@ -60,26 +61,13 @@ final class CallTracer {
|
||||||
.setCallsStarted(callsStarted.value())
|
.setCallsStarted(callsStarted.value())
|
||||||
.setCallsSucceeded(callsSucceeded.value())
|
.setCallsSucceeded(callsSucceeded.value())
|
||||||
.setCallsFailed(callsFailed.value())
|
.setCallsFailed(callsFailed.value())
|
||||||
.setLastCallStartedMillis(lastCallStartedMillis);
|
.setLastCallStartedNanos(lastCallStartedNanos);
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
interface TimeProvider {
|
|
||||||
/** Returns the current milli time. */
|
|
||||||
long currentTimeMillis();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface Factory {
|
public interface Factory {
|
||||||
CallTracer create();
|
CallTracer create();
|
||||||
}
|
}
|
||||||
|
|
||||||
static final TimeProvider SYSTEM_TIME_PROVIDER = new TimeProvider() {
|
|
||||||
@Override
|
|
||||||
public long currentTimeMillis() {
|
|
||||||
return System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
static final Factory DEFAULT_FACTORY = new Factory() {
|
static final Factory DEFAULT_FACTORY = new Factory() {
|
||||||
@Override
|
@Override
|
||||||
public CallTracer create() {
|
public CallTracer create() {
|
||||||
|
|
|
||||||
|
|
@ -279,7 +279,7 @@ public final class Channelz {
|
||||||
public final long callsStarted;
|
public final long callsStarted;
|
||||||
public final long callsSucceeded;
|
public final long callsSucceeded;
|
||||||
public final long callsFailed;
|
public final long callsFailed;
|
||||||
public final long lastCallStartedMillis;
|
public final long lastCallStartedNanos;
|
||||||
public final List<Instrumented<SocketStats>> listenSockets;
|
public final List<Instrumented<SocketStats>> listenSockets;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -289,12 +289,12 @@ public final class Channelz {
|
||||||
long callsStarted,
|
long callsStarted,
|
||||||
long callsSucceeded,
|
long callsSucceeded,
|
||||||
long callsFailed,
|
long callsFailed,
|
||||||
long lastCallStartedMillis,
|
long lastCallStartedNanos,
|
||||||
List<Instrumented<SocketStats>> listenSockets) {
|
List<Instrumented<SocketStats>> listenSockets) {
|
||||||
this.callsStarted = callsStarted;
|
this.callsStarted = callsStarted;
|
||||||
this.callsSucceeded = callsSucceeded;
|
this.callsSucceeded = callsSucceeded;
|
||||||
this.callsFailed = callsFailed;
|
this.callsFailed = callsFailed;
|
||||||
this.lastCallStartedMillis = lastCallStartedMillis;
|
this.lastCallStartedNanos = lastCallStartedNanos;
|
||||||
this.listenSockets = checkNotNull(listenSockets);
|
this.listenSockets = checkNotNull(listenSockets);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -302,7 +302,7 @@ public final class Channelz {
|
||||||
private long callsStarted;
|
private long callsStarted;
|
||||||
private long callsSucceeded;
|
private long callsSucceeded;
|
||||||
private long callsFailed;
|
private long callsFailed;
|
||||||
private long lastCallStartedMillis;
|
private long lastCallStartedNanos;
|
||||||
public List<Instrumented<SocketStats>> listenSockets = Collections.emptyList();
|
public List<Instrumented<SocketStats>> listenSockets = Collections.emptyList();
|
||||||
|
|
||||||
public Builder setCallsStarted(long callsStarted) {
|
public Builder setCallsStarted(long callsStarted) {
|
||||||
|
|
@ -320,8 +320,8 @@ public final class Channelz {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setLastCallStartedMillis(long lastCallStartedMillis) {
|
public Builder setLastCallStartedNanos(long lastCallStartedNanos) {
|
||||||
this.lastCallStartedMillis = lastCallStartedMillis;
|
this.lastCallStartedNanos = lastCallStartedNanos;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -341,7 +341,7 @@ public final class Channelz {
|
||||||
callsStarted,
|
callsStarted,
|
||||||
callsSucceeded,
|
callsSucceeded,
|
||||||
callsFailed,
|
callsFailed,
|
||||||
lastCallStartedMillis,
|
lastCallStartedNanos,
|
||||||
listenSockets);
|
listenSockets);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -358,7 +358,7 @@ public final class Channelz {
|
||||||
public final long callsStarted;
|
public final long callsStarted;
|
||||||
public final long callsSucceeded;
|
public final long callsSucceeded;
|
||||||
public final long callsFailed;
|
public final long callsFailed;
|
||||||
public final long lastCallStartedMillis;
|
public final long lastCallStartedNanos;
|
||||||
public final List<WithLogId> subchannels;
|
public final List<WithLogId> subchannels;
|
||||||
public final List<WithLogId> sockets;
|
public final List<WithLogId> sockets;
|
||||||
|
|
||||||
|
|
@ -372,7 +372,7 @@ public final class Channelz {
|
||||||
long callsStarted,
|
long callsStarted,
|
||||||
long callsSucceeded,
|
long callsSucceeded,
|
||||||
long callsFailed,
|
long callsFailed,
|
||||||
long lastCallStartedMillis,
|
long lastCallStartedNanos,
|
||||||
List<WithLogId> subchannels,
|
List<WithLogId> subchannels,
|
||||||
List<WithLogId> sockets) {
|
List<WithLogId> sockets) {
|
||||||
Preconditions.checkState(
|
Preconditions.checkState(
|
||||||
|
|
@ -385,7 +385,7 @@ public final class Channelz {
|
||||||
this.callsStarted = callsStarted;
|
this.callsStarted = callsStarted;
|
||||||
this.callsSucceeded = callsSucceeded;
|
this.callsSucceeded = callsSucceeded;
|
||||||
this.callsFailed = callsFailed;
|
this.callsFailed = callsFailed;
|
||||||
this.lastCallStartedMillis = lastCallStartedMillis;
|
this.lastCallStartedNanos = lastCallStartedNanos;
|
||||||
this.subchannels = checkNotNull(subchannels);
|
this.subchannels = checkNotNull(subchannels);
|
||||||
this.sockets = checkNotNull(sockets);
|
this.sockets = checkNotNull(sockets);
|
||||||
}
|
}
|
||||||
|
|
@ -397,7 +397,7 @@ public final class Channelz {
|
||||||
private long callsStarted;
|
private long callsStarted;
|
||||||
private long callsSucceeded;
|
private long callsSucceeded;
|
||||||
private long callsFailed;
|
private long callsFailed;
|
||||||
private long lastCallStartedMillis;
|
private long lastCallStartedNanos;
|
||||||
private List<WithLogId> subchannels = Collections.emptyList();
|
private List<WithLogId> subchannels = Collections.emptyList();
|
||||||
private List<WithLogId> sockets = Collections.emptyList();
|
private List<WithLogId> sockets = Collections.emptyList();
|
||||||
|
|
||||||
|
|
@ -431,8 +431,8 @@ public final class Channelz {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setLastCallStartedMillis(long lastCallStartedMillis) {
|
public Builder setLastCallStartedNanos(long lastCallStartedNanos) {
|
||||||
this.lastCallStartedMillis = lastCallStartedMillis;
|
this.lastCallStartedNanos = lastCallStartedNanos;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -461,7 +461,7 @@ public final class Channelz {
|
||||||
callsStarted,
|
callsStarted,
|
||||||
callsSucceeded,
|
callsSucceeded,
|
||||||
callsFailed,
|
callsFailed,
|
||||||
lastCallStartedMillis,
|
lastCallStartedNanos,
|
||||||
subchannels,
|
subchannels,
|
||||||
sockets);
|
sockets);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,11 +14,25 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package io.grpc.grpclb;
|
package io.grpc.internal;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allow time manipulation in tests.
|
* Time source representing the current system time in nanos. Used to inject a fake clock
|
||||||
|
* into unit tests.
|
||||||
*/
|
*/
|
||||||
interface TimeProvider {
|
public interface TimeProvider {
|
||||||
long currentTimeMillis();
|
/** Returns the current nano time. */
|
||||||
|
long currentTimeNanos();
|
||||||
|
|
||||||
|
TimeProvider SYSTEM_TIME_PROVIDER = new TimeProvider() {
|
||||||
|
final long offsetNanos =
|
||||||
|
TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()) - System.nanoTime();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long currentTimeNanos() {
|
||||||
|
return System.nanoTime() + offsetNanos;
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
@ -16,22 +16,17 @@
|
||||||
|
|
||||||
package io.grpc.internal;
|
package io.grpc.internal;
|
||||||
|
|
||||||
|
import static io.grpc.internal.TimeProvider.SYSTEM_TIME_PROVIDER;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import io.grpc.internal.Channelz.TransportStats;
|
import io.grpc.internal.Channelz.TransportStats;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class for gathering statistics about a transport. This is an experimental feature.
|
* A class for gathering statistics about a transport. This is an experimental feature.
|
||||||
* Can only be called from the transport thread unless otherwise noted.
|
* Can only be called from the transport thread unless otherwise noted.
|
||||||
*/
|
*/
|
||||||
public final class TransportTracer {
|
public final class TransportTracer {
|
||||||
private static final TimeProvider SYSTEM_TIME_PROVIDER = new TimeProvider() {
|
|
||||||
@Override
|
|
||||||
public long currentTimeMillis() {
|
|
||||||
return System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
private static final Factory DEFAULT_FACTORY = new Factory(SYSTEM_TIME_PROVIDER);
|
private static final Factory DEFAULT_FACTORY = new Factory(SYSTEM_TIME_PROVIDER);
|
||||||
|
|
||||||
private final TimeProvider timeProvider;
|
private final TimeProvider timeProvider;
|
||||||
|
|
@ -85,7 +80,7 @@ public final class TransportTracer {
|
||||||
*/
|
*/
|
||||||
public void reportLocalStreamStarted() {
|
public void reportLocalStreamStarted() {
|
||||||
streamsStarted++;
|
streamsStarted++;
|
||||||
lastLocalStreamCreatedTimeNanos = currentTimeNanos();
|
lastLocalStreamCreatedTimeNanos = timeProvider.currentTimeNanos();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -93,7 +88,7 @@ public final class TransportTracer {
|
||||||
*/
|
*/
|
||||||
public void reportRemoteStreamStarted() {
|
public void reportRemoteStreamStarted() {
|
||||||
streamsStarted++;
|
streamsStarted++;
|
||||||
lastRemoteStreamCreatedTimeNanos = currentTimeNanos();
|
lastRemoteStreamCreatedTimeNanos = timeProvider.currentTimeNanos();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -115,7 +110,7 @@ public final class TransportTracer {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
messagesSent += numMessages;
|
messagesSent += numMessages;
|
||||||
lastMessageSentTimeNanos = currentTimeNanos();
|
lastMessageSentTimeNanos = timeProvider.currentTimeNanos();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -123,7 +118,7 @@ public final class TransportTracer {
|
||||||
*/
|
*/
|
||||||
public void reportMessageReceived() {
|
public void reportMessageReceived() {
|
||||||
messagesReceived.add(1);
|
messagesReceived.add(1);
|
||||||
lastMessageReceivedTimeNanos = currentTimeNanos();
|
lastMessageReceivedTimeNanos = timeProvider.currentTimeNanos();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -161,20 +156,6 @@ public final class TransportTracer {
|
||||||
FlowControlWindows read();
|
FlowControlWindows read();
|
||||||
}
|
}
|
||||||
|
|
||||||
private long currentTimeNanos() {
|
|
||||||
return TimeUnit.MILLISECONDS.toNanos(timeProvider.currentTimeMillis());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Time source representing the current system time in millis. Used to inject a fake clock
|
|
||||||
* into unit tests.
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
public interface TimeProvider {
|
|
||||||
/** Returns the current milli time. */
|
|
||||||
long currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final class Factory {
|
public static final class Factory {
|
||||||
private TimeProvider timeProvider;
|
private TimeProvider timeProvider;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -149,10 +149,10 @@ public class ManagedChannelImplTest {
|
||||||
private final CallTracer.Factory channelStatsFactory = new CallTracer.Factory() {
|
private final CallTracer.Factory channelStatsFactory = new CallTracer.Factory() {
|
||||||
@Override
|
@Override
|
||||||
public CallTracer create() {
|
public CallTracer create() {
|
||||||
return new CallTracer(new CallTracer.TimeProvider() {
|
return new CallTracer(new TimeProvider() {
|
||||||
@Override
|
@Override
|
||||||
public long currentTimeMillis() {
|
public long currentTimeNanos() {
|
||||||
return executor.currentTimeMillis();
|
return executor.getTicker().read();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -2104,7 +2104,7 @@ public class ManagedChannelImplTest {
|
||||||
assertEquals(0, getStats(channel).callsStarted);
|
assertEquals(0, getStats(channel).callsStarted);
|
||||||
call.start(mockCallListener, new Metadata());
|
call.start(mockCallListener, new Metadata());
|
||||||
assertEquals(1, getStats(channel).callsStarted);
|
assertEquals(1, getStats(channel).callsStarted);
|
||||||
assertEquals(executor.currentTimeMillis(), getStats(channel).lastCallStartedMillis);
|
assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -110,7 +110,7 @@ public class ServerCallImplTest {
|
||||||
tracer.updateBuilder(beforeBuilder);
|
tracer.updateBuilder(beforeBuilder);
|
||||||
ServerStats before = beforeBuilder.build();
|
ServerStats before = beforeBuilder.build();
|
||||||
assertEquals(0, before.callsStarted);
|
assertEquals(0, before.callsStarted);
|
||||||
assertEquals(0, before.lastCallStartedMillis);
|
assertEquals(0, before.lastCallStartedNanos);
|
||||||
|
|
||||||
call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
|
call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
|
||||||
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
|
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import io.grpc.CallOptions;
|
||||||
import io.grpc.ClientStreamTracer;
|
import io.grpc.ClientStreamTracer;
|
||||||
import io.grpc.Metadata;
|
import io.grpc.Metadata;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.grpc.internal.TimeProvider;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
@ -99,7 +100,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
|
||||||
ClientStats generateLoadReport() {
|
ClientStats generateLoadReport() {
|
||||||
ClientStats.Builder statsBuilder =
|
ClientStats.Builder statsBuilder =
|
||||||
ClientStats.newBuilder()
|
ClientStats.newBuilder()
|
||||||
.setTimestamp(Timestamps.fromMillis(time.currentTimeMillis()))
|
.setTimestamp(Timestamps.fromNanos(time.currentTimeNanos()))
|
||||||
.setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0))
|
.setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0))
|
||||||
.setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0))
|
.setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0))
|
||||||
.setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0))
|
.setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0))
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import io.grpc.grpclb.GrpclbConstants.LbPolicy;
|
||||||
import io.grpc.internal.GrpcAttributes;
|
import io.grpc.internal.GrpcAttributes;
|
||||||
import io.grpc.internal.LogId;
|
import io.grpc.internal.LogId;
|
||||||
import io.grpc.internal.ObjectPool;
|
import io.grpc.internal.ObjectPool;
|
||||||
|
import io.grpc.internal.TimeProvider;
|
||||||
import io.grpc.internal.WithLogId;
|
import io.grpc.internal.WithLogId;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import io.grpc.LoadBalancer;
|
||||||
import io.grpc.PickFirstBalancerFactory;
|
import io.grpc.PickFirstBalancerFactory;
|
||||||
import io.grpc.internal.GrpcUtil;
|
import io.grpc.internal.GrpcUtil;
|
||||||
import io.grpc.internal.SharedResourcePool;
|
import io.grpc.internal.SharedResourcePool;
|
||||||
|
import io.grpc.internal.TimeProvider;
|
||||||
import io.grpc.util.RoundRobinLoadBalancerFactory;
|
import io.grpc.util.RoundRobinLoadBalancerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -33,12 +34,6 @@ import io.grpc.util.RoundRobinLoadBalancerFactory;
|
||||||
public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory {
|
public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory {
|
||||||
|
|
||||||
private static final GrpclbLoadBalancerFactory INSTANCE = new GrpclbLoadBalancerFactory();
|
private static final GrpclbLoadBalancerFactory INSTANCE = new GrpclbLoadBalancerFactory();
|
||||||
private static final TimeProvider TIME_PROVIDER = new TimeProvider() {
|
|
||||||
@Override
|
|
||||||
public long currentTimeMillis() {
|
|
||||||
return System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private GrpclbLoadBalancerFactory() {
|
private GrpclbLoadBalancerFactory() {
|
||||||
}
|
}
|
||||||
|
|
@ -57,6 +52,6 @@ public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory {
|
||||||
// load should not be on the shared scheduled executor, we should use a combination of the
|
// load should not be on the shared scheduled executor, we should use a combination of the
|
||||||
// scheduled executor and the default app executor.
|
// scheduled executor and the default app executor.
|
||||||
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
|
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
|
||||||
TIME_PROVIDER);
|
TimeProvider.SYSTEM_TIME_PROVIDER);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ import io.grpc.Metadata;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.grpclb.LoadBalanceResponse.LoadBalanceResponseTypeCase;
|
import io.grpc.grpclb.LoadBalanceResponse.LoadBalanceResponseTypeCase;
|
||||||
import io.grpc.internal.LogId;
|
import io.grpc.internal.LogId;
|
||||||
|
import io.grpc.internal.TimeProvider;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,7 @@ import io.grpc.internal.FakeClock;
|
||||||
import io.grpc.internal.GrpcAttributes;
|
import io.grpc.internal.GrpcAttributes;
|
||||||
import io.grpc.internal.ObjectPool;
|
import io.grpc.internal.ObjectPool;
|
||||||
import io.grpc.internal.SerializingExecutor;
|
import io.grpc.internal.SerializingExecutor;
|
||||||
|
import io.grpc.internal.TimeProvider;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
|
@ -137,8 +138,8 @@ public class GrpclbLoadBalancerTest {
|
||||||
private final ArrayList<String> failingLbAuthorities = new ArrayList<String>();
|
private final ArrayList<String> failingLbAuthorities = new ArrayList<String>();
|
||||||
private final TimeProvider timeProvider = new TimeProvider() {
|
private final TimeProvider timeProvider = new TimeProvider() {
|
||||||
@Override
|
@Override
|
||||||
public long currentTimeMillis() {
|
public long currentTimeNanos() {
|
||||||
return fakeClock.currentTimeMillis();
|
return fakeClock.getTicker().read();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
private io.grpc.Server fakeLbServer;
|
private io.grpc.Server fakeLbServer;
|
||||||
|
|
@ -685,7 +686,7 @@ public class GrpclbLoadBalancerTest {
|
||||||
eq(LoadBalanceRequest.newBuilder()
|
eq(LoadBalanceRequest.newBuilder()
|
||||||
.setClientStats(
|
.setClientStats(
|
||||||
ClientStats.newBuilder(expectedReport)
|
ClientStats.newBuilder(expectedReport)
|
||||||
.setTimestamp(Timestamps.fromMillis(fakeClock.currentTimeMillis()))
|
.setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read()))
|
||||||
.build())
|
.build())
|
||||||
.build()));
|
.build()));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import io.grpc.internal.ClientTransportFactory;
|
||||||
import io.grpc.internal.FakeClock;
|
import io.grpc.internal.FakeClock;
|
||||||
import io.grpc.internal.InternalServer;
|
import io.grpc.internal.InternalServer;
|
||||||
import io.grpc.internal.ManagedClientTransport;
|
import io.grpc.internal.ManagedClientTransport;
|
||||||
import io.grpc.internal.TransportTracer;
|
|
||||||
import io.grpc.internal.testing.AbstractTransportTest;
|
import io.grpc.internal.testing.AbstractTransportTest;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -36,13 +35,6 @@ import org.junit.runners.JUnit4;
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
public class NettyTransportTest extends AbstractTransportTest {
|
public class NettyTransportTest extends AbstractTransportTest {
|
||||||
private final FakeClock fakeClock = new FakeClock();
|
private final FakeClock fakeClock = new FakeClock();
|
||||||
private final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
|
|
||||||
new TransportTracer.TimeProvider() {
|
|
||||||
@Override
|
|
||||||
public long currentTimeMillis() {
|
|
||||||
return fakeClock.currentTimeMillis();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// Avoid LocalChannel for testing because LocalChannel can fail with
|
// Avoid LocalChannel for testing because LocalChannel can fail with
|
||||||
// io.netty.channel.ChannelException instead of java.net.ConnectException which breaks
|
// io.netty.channel.ChannelException instead of java.net.ConnectException which breaks
|
||||||
// serverNotListening test.
|
// serverNotListening test.
|
||||||
|
|
@ -95,8 +87,8 @@ public class NettyTransportTest extends AbstractTransportTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long currentTimeMillis() {
|
protected long fakeCurrentTimeNanos() {
|
||||||
return fakeClock.currentTimeMillis();
|
return fakeClock.getTicker().read();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import io.grpc.internal.ClientTransportFactory;
|
||||||
import io.grpc.internal.FakeClock;
|
import io.grpc.internal.FakeClock;
|
||||||
import io.grpc.internal.InternalServer;
|
import io.grpc.internal.InternalServer;
|
||||||
import io.grpc.internal.ManagedClientTransport;
|
import io.grpc.internal.ManagedClientTransport;
|
||||||
import io.grpc.internal.TransportTracer;
|
|
||||||
import io.grpc.internal.testing.AbstractTransportTest;
|
import io.grpc.internal.testing.AbstractTransportTest;
|
||||||
import io.grpc.netty.NettyServerBuilder;
|
import io.grpc.netty.NettyServerBuilder;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
@ -38,13 +37,6 @@ import org.junit.runners.JUnit4;
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
public class OkHttpTransportTest extends AbstractTransportTest {
|
public class OkHttpTransportTest extends AbstractTransportTest {
|
||||||
private final FakeClock fakeClock = new FakeClock();
|
private final FakeClock fakeClock = new FakeClock();
|
||||||
private final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
|
|
||||||
new TransportTracer.TimeProvider() {
|
|
||||||
@Override
|
|
||||||
public long currentTimeMillis() {
|
|
||||||
return fakeClock.currentTimeMillis();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
private ClientTransportFactory clientFactory = OkHttpChannelBuilder
|
private ClientTransportFactory clientFactory = OkHttpChannelBuilder
|
||||||
// Although specified here, address is ignored because we never call build.
|
// Although specified here, address is ignored because we never call build.
|
||||||
.forAddress("localhost", 0)
|
.forAddress("localhost", 0)
|
||||||
|
|
@ -100,8 +92,8 @@ public class OkHttpTransportTest extends AbstractTransportTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long currentTimeMillis() {
|
protected long fakeCurrentTimeNanos() {
|
||||||
return fakeClock.currentTimeMillis();
|
return fakeClock.getTicker().read();
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ejona): Flaky/Broken
|
// TODO(ejona): Flaky/Broken
|
||||||
|
|
|
||||||
|
|
@ -138,7 +138,7 @@ final class ChannelzProtoUtil {
|
||||||
.setCallsStarted(stats.callsStarted)
|
.setCallsStarted(stats.callsStarted)
|
||||||
.setCallsSucceeded(stats.callsSucceeded)
|
.setCallsSucceeded(stats.callsSucceeded)
|
||||||
.setCallsFailed(stats.callsFailed)
|
.setCallsFailed(stats.callsFailed)
|
||||||
.setLastCallStartedTimestamp(Timestamps.fromMillis(stats.lastCallStartedMillis))
|
.setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -359,7 +359,7 @@ final class ChannelzProtoUtil {
|
||||||
.setCallsStarted(stats.callsStarted)
|
.setCallsStarted(stats.callsStarted)
|
||||||
.setCallsSucceeded(stats.callsSucceeded)
|
.setCallsSucceeded(stats.callsSucceeded)
|
||||||
.setCallsFailed(stats.callsFailed)
|
.setCallsFailed(stats.callsFailed)
|
||||||
.setLastCallStartedTimestamp(Timestamps.fromMillis(stats.lastCallStartedMillis));
|
.setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos));
|
||||||
if (stats.channelTrace != null) {
|
if (stats.channelTrace != null) {
|
||||||
builder.setTrace(toChannelTrace(stats.channelTrace));
|
builder.setTrace(toChannelTrace(stats.channelTrace));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ public final class ChannelzProtoUtilTest {
|
||||||
.setCallsStarted(1)
|
.setCallsStarted(1)
|
||||||
.setCallsSucceeded(2)
|
.setCallsSucceeded(2)
|
||||||
.setCallsFailed(3)
|
.setCallsFailed(3)
|
||||||
.setLastCallStartedTimestamp(Timestamps.fromMillis(4))
|
.setLastCallStartedTimestamp(Timestamps.fromNanos(4))
|
||||||
.build();
|
.build();
|
||||||
private final Channel channelProto = Channel
|
private final Channel channelProto = Channel
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
|
|
@ -127,7 +127,7 @@ public final class ChannelzProtoUtilTest {
|
||||||
.setCallsStarted(1)
|
.setCallsStarted(1)
|
||||||
.setCallsSucceeded(2)
|
.setCallsSucceeded(2)
|
||||||
.setCallsFailed(3)
|
.setCallsFailed(3)
|
||||||
.setLastCallStartedTimestamp(Timestamps.fromMillis(4))
|
.setLastCallStartedTimestamp(Timestamps.fromNanos(4))
|
||||||
.build();
|
.build();
|
||||||
private final Subchannel subchannelProto = Subchannel
|
private final Subchannel subchannelProto = Subchannel
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
|
|
@ -146,7 +146,7 @@ public final class ChannelzProtoUtilTest {
|
||||||
.setCallsStarted(1)
|
.setCallsStarted(1)
|
||||||
.setCallsSucceeded(2)
|
.setCallsSucceeded(2)
|
||||||
.setCallsFailed(3)
|
.setCallsFailed(3)
|
||||||
.setLastCallStartedTimestamp(Timestamps.fromMillis(4))
|
.setLastCallStartedTimestamp(Timestamps.fromNanos(4))
|
||||||
.build();
|
.build();
|
||||||
private final Server serverProto = Server
|
private final Server serverProto = Server
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
|
|
@ -912,7 +912,7 @@ public final class ChannelzProtoUtilTest {
|
||||||
.setCallsStarted(stats.callsStarted)
|
.setCallsStarted(stats.callsStarted)
|
||||||
.setCallsSucceeded(stats.callsSucceeded)
|
.setCallsSucceeded(stats.callsSucceeded)
|
||||||
.setCallsFailed(stats.callsFailed)
|
.setCallsFailed(stats.callsFailed)
|
||||||
.setLastCallStartedMillis(stats.lastCallStartedMillis);
|
.setLastCallStartedNanos(stats.lastCallStartedNanos);
|
||||||
if (!stats.subchannels.isEmpty()) {
|
if (!stats.subchannels.isEmpty()) {
|
||||||
builder.setSubchannels(stats.subchannels);
|
builder.setSubchannels(stats.subchannels);
|
||||||
}
|
}
|
||||||
|
|
@ -938,7 +938,7 @@ public final class ChannelzProtoUtilTest {
|
||||||
.setCallsStarted(stats.callsStarted)
|
.setCallsStarted(stats.callsStarted)
|
||||||
.setCallsSucceeded(stats.callsSucceeded)
|
.setCallsSucceeded(stats.callsSucceeded)
|
||||||
.setCallsFailed(stats.callsFailed)
|
.setCallsFailed(stats.callsFailed)
|
||||||
.setLastCallStartedMillis(stats.lastCallStartedMillis)
|
.setLastCallStartedNanos(stats.lastCallStartedNanos)
|
||||||
.setListenSockets(stats.listenSockets);
|
.setListenSockets(stats.listenSockets);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ final class ChannelzTestHelper {
|
||||||
/*callsStarted=*/ 1,
|
/*callsStarted=*/ 1,
|
||||||
/*callsSucceeded=*/ 2,
|
/*callsSucceeded=*/ 2,
|
||||||
/*callsFailed=*/ 3,
|
/*callsFailed=*/ 3,
|
||||||
/*lastCallStartedMillis=*/ 4,
|
/*lastCallStartedNanos=*/ 4,
|
||||||
Collections.<Instrumented<SocketStats>>emptyList());
|
Collections.<Instrumented<SocketStats>>emptyList());
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -154,7 +154,7 @@ final class ChannelzTestHelper {
|
||||||
.setCallsStarted(1)
|
.setCallsStarted(1)
|
||||||
.setCallsSucceeded(2)
|
.setCallsSucceeded(2)
|
||||||
.setCallsFailed(3)
|
.setCallsFailed(3)
|
||||||
.setLastCallStartedMillis(4)
|
.setLastCallStartedNanos(4)
|
||||||
.setSubchannels(Collections.<WithLogId>emptyList())
|
.setSubchannels(Collections.<WithLogId>emptyList())
|
||||||
.setSockets(Collections.<WithLogId>emptyList())
|
.setSockets(Collections.<WithLogId>emptyList())
|
||||||
.build();
|
.build();
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,8 @@ import io.grpc.internal.ServerStream;
|
||||||
import io.grpc.internal.ServerStreamListener;
|
import io.grpc.internal.ServerStreamListener;
|
||||||
import io.grpc.internal.ServerTransport;
|
import io.grpc.internal.ServerTransport;
|
||||||
import io.grpc.internal.ServerTransportListener;
|
import io.grpc.internal.ServerTransportListener;
|
||||||
|
import io.grpc.internal.TimeProvider;
|
||||||
|
import io.grpc.internal.TransportTracer;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
|
@ -97,6 +99,14 @@ public abstract class AbstractTransportTest {
|
||||||
private static final Attributes.Key<String> ADDITIONAL_TRANSPORT_ATTR_KEY =
|
private static final Attributes.Key<String> ADDITIONAL_TRANSPORT_ATTR_KEY =
|
||||||
Attributes.Key.create("additional-attr");
|
Attributes.Key.create("additional-attr");
|
||||||
|
|
||||||
|
protected final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
|
||||||
|
new TimeProvider() {
|
||||||
|
@Override
|
||||||
|
public long currentTimeNanos() {
|
||||||
|
return fakeCurrentTimeNanos();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a new server that when started will be able to be connected to from the client. Each
|
* Returns a new server that when started will be able to be connected to from the client. Each
|
||||||
* returned instance should be new and yet be accessible by new client transports.
|
* returned instance should be new and yet be accessible by new client transports.
|
||||||
|
|
@ -204,7 +214,7 @@ public abstract class AbstractTransportTest {
|
||||||
/**
|
/**
|
||||||
* Returns the current time, for tests that rely on the clock.
|
* Returns the current time, for tests that rely on the clock.
|
||||||
*/
|
*/
|
||||||
protected long currentTimeMillis() {
|
protected long fakeCurrentTimeNanos() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1435,16 +1445,12 @@ public abstract class AbstractTransportTest {
|
||||||
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
|
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
|
||||||
assertEquals(1, serverAfter.streamsStarted);
|
assertEquals(1, serverAfter.streamsStarted);
|
||||||
serverFirstTimestampNanos = serverAfter.lastRemoteStreamCreatedTimeNanos;
|
serverFirstTimestampNanos = serverAfter.lastRemoteStreamCreatedTimeNanos;
|
||||||
assertEquals(
|
assertEquals(fakeCurrentTimeNanos(), serverAfter.lastRemoteStreamCreatedTimeNanos);
|
||||||
currentTimeMillis(),
|
|
||||||
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastRemoteStreamCreatedTimeNanos));
|
|
||||||
|
|
||||||
TransportStats clientAfter = getTransportStats(client);
|
TransportStats clientAfter = getTransportStats(client);
|
||||||
assertEquals(1, clientAfter.streamsStarted);
|
assertEquals(1, clientAfter.streamsStarted);
|
||||||
clientFirstTimestampNanos = clientAfter.lastLocalStreamCreatedTimeNanos;
|
clientFirstTimestampNanos = clientAfter.lastLocalStreamCreatedTimeNanos;
|
||||||
assertEquals(
|
assertEquals(fakeCurrentTimeNanos(), clientFirstTimestampNanos);
|
||||||
currentTimeMillis(),
|
|
||||||
TimeUnit.NANOSECONDS.toMillis(clientFirstTimestampNanos));
|
|
||||||
|
|
||||||
ServerStream serverStream = serverStreamCreation.stream;
|
ServerStream serverStream = serverStreamCreation.stream;
|
||||||
serverStream.close(Status.OK, new Metadata());
|
serverStream.close(Status.OK, new Metadata());
|
||||||
|
|
@ -1471,18 +1477,14 @@ public abstract class AbstractTransportTest {
|
||||||
assertEquals(
|
assertEquals(
|
||||||
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
|
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
|
||||||
serverAfter.lastRemoteStreamCreatedTimeNanos - serverFirstTimestampNanos);
|
serverAfter.lastRemoteStreamCreatedTimeNanos - serverFirstTimestampNanos);
|
||||||
long serverSecondTimestamp =
|
assertEquals(fakeCurrentTimeNanos(), serverAfter.lastRemoteStreamCreatedTimeNanos);
|
||||||
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastRemoteStreamCreatedTimeNanos);
|
|
||||||
assertEquals(currentTimeMillis(), serverSecondTimestamp);
|
|
||||||
|
|
||||||
TransportStats clientAfter = getTransportStats(client);
|
TransportStats clientAfter = getTransportStats(client);
|
||||||
assertEquals(2, clientAfter.streamsStarted);
|
assertEquals(2, clientAfter.streamsStarted);
|
||||||
assertEquals(
|
assertEquals(
|
||||||
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
|
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
|
||||||
clientAfter.lastLocalStreamCreatedTimeNanos - clientFirstTimestampNanos);
|
clientAfter.lastLocalStreamCreatedTimeNanos - clientFirstTimestampNanos);
|
||||||
long clientSecondTimestamp =
|
assertEquals(fakeCurrentTimeNanos(), clientAfter.lastLocalStreamCreatedTimeNanos);
|
||||||
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastLocalStreamCreatedTimeNanos);
|
|
||||||
assertEquals(currentTimeMillis(), clientSecondTimestamp);
|
|
||||||
|
|
||||||
ServerStream serverStream = serverStreamCreation.stream;
|
ServerStream serverStream = serverStreamCreation.stream;
|
||||||
serverStream.close(Status.OK, new Metadata());
|
serverStream.close(Status.OK, new Metadata());
|
||||||
|
|
@ -1636,14 +1638,10 @@ public abstract class AbstractTransportTest {
|
||||||
|
|
||||||
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
|
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
|
||||||
assertEquals(1, serverAfter.messagesReceived);
|
assertEquals(1, serverAfter.messagesReceived);
|
||||||
long serverTimestamp =
|
assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageReceivedTimeNanos);
|
||||||
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageReceivedTimeNanos);
|
|
||||||
assertEquals(currentTimeMillis(), serverTimestamp);
|
|
||||||
TransportStats clientAfter = getTransportStats(client);
|
TransportStats clientAfter = getTransportStats(client);
|
||||||
assertEquals(1, clientAfter.messagesSent);
|
assertEquals(1, clientAfter.messagesSent);
|
||||||
long clientTimestamp =
|
assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageSentTimeNanos);
|
||||||
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageSentTimeNanos);
|
|
||||||
assertEquals(currentTimeMillis(), clientTimestamp);
|
|
||||||
|
|
||||||
serverStream.close(Status.OK, new Metadata());
|
serverStream.close(Status.OK, new Metadata());
|
||||||
}
|
}
|
||||||
|
|
@ -1680,14 +1678,10 @@ public abstract class AbstractTransportTest {
|
||||||
|
|
||||||
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
|
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
|
||||||
assertEquals(1, serverAfter.messagesSent);
|
assertEquals(1, serverAfter.messagesSent);
|
||||||
long serverTimestmap = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageSentTimeNanos);
|
assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageSentTimeNanos);
|
||||||
assertEquals(currentTimeMillis(), serverTimestmap);
|
|
||||||
TransportStats clientAfter = getTransportStats(client);
|
TransportStats clientAfter = getTransportStats(client);
|
||||||
assertEquals(1, clientAfter.messagesReceived);
|
assertEquals(1, clientAfter.messagesReceived);
|
||||||
long clientTimestmap =
|
assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageReceivedTimeNanos);
|
||||||
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageReceivedTimeNanos);
|
|
||||||
assertEquals(currentTimeMillis(), clientTimestmap);
|
|
||||||
|
|
||||||
|
|
||||||
serverStream.close(Status.OK, new Metadata());
|
serverStream.close(Status.OK, new Metadata());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue