core: InternalInstrumented<T> for instrumented classes (#3817)

Entities that report stats to channelz will implement this interface.
This commit is contained in:
zpencer 2017-12-13 12:56:25 -08:00 committed by GitHub
parent 5cb609f038
commit f1151f91b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 240 additions and 144 deletions

View File

@ -0,0 +1,27 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import com.google.common.util.concurrent.ListenableFuture;
/**
* An interface for types that <b>may</b> support instrumentation. If the actual type does not
* support instrumentation, then the future will return a {@code null}.
*/
interface Instrumented<T> extends WithLogId {
ListenableFuture<T> getStats();
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
/**
* This is an gRPC internal interface. Do not use this.
*/
@Internal
public interface InternalInstrumented<T> extends Instrumented<T>, InternalWithLogId {
}

View File

@ -16,43 +16,22 @@
package io.grpc;
import java.util.concurrent.atomic.AtomicLong;
/**
* Do not use this.
*
* <p>A loggable ID, unique for the duration of the program.
* Do not use this. This is an internal accessor class.
*/
@Internal
public final class InternalLogId {
private static final AtomicLong idAlloc = new AtomicLong();
public final class InternalLogId extends LogId {
private InternalLogId(String tag, long id) {
super(tag, id);
}
/**
* An accessor method for {@link LogId#allocate(String)}.
*
* @param tag a loggable tag associated with this tag. The ID that is allocated is guaranteed
* to be unique and increasing, irrespective of the tag.
*/
public static InternalLogId allocate(String tag) {
return new InternalLogId(tag, idAlloc.incrementAndGet());
}
private final String tag;
private final long id;
private InternalLogId(String tag, long id) {
this.tag = tag;
this.id = id;
}
public long getId() {
return id;
}
public String getTag() {
return tag;
}
@Override
public String toString() {
return tag + "-" + id;
return new InternalLogId(tag, LogId.getNextId());
}
}

View File

@ -22,13 +22,7 @@ package io.grpc;
* <p>An object that has an ID that is unique within the JVM, primarily for debug logging.
*/
@Internal
public interface InternalWithLogId {
/**
* Returns an ID that is primarily used in debug logs. It usually contains the class name and a
* numeric ID that is unique among the instances.
*
* <p>The subclasses of this interface usually want to include the log ID in their {@link
* #toString} results.
*/
public interface InternalWithLogId extends WithLogId {
@Override
InternalLogId getLogId();
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import java.util.concurrent.atomic.AtomicLong;
/**
* A loggable ID, unique for the duration of the program.
*/
// not final so that InternalLogId can make this class visible outside of io.grpc
class LogId {
private static final AtomicLong idAlloc = new AtomicLong();
/**
* @param tag a loggable tag associated with this tag. The ID that is allocated is guaranteed
* to be unique and increasing, irrespective of the tag.
*/
public static LogId allocate(String tag) {
return new LogId(tag, getNextId());
}
static long getNextId() {
return idAlloc.incrementAndGet();
}
private final String tag;
private final long id;
protected LogId(String tag, long id) {
this.tag = tag;
this.id = id;
}
public long getId() {
return id;
}
public String getTag() {
return tag;
}
@Override
public String toString() {
return tag + "-" + id;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
/**
* An object that has an ID that is unique within the JVM, primarily for debug logging.
*/
interface WithLogId {
/**
* Returns an ID that is primarily used in debug logs. It usually contains the class name and a
* numeric ID that is unique among the instances.
*
* <p>The subclasses of this interface usually want to include the log ID in their {@link
* #toString} results.
*/
LogId getLogId();
}

View File

@ -18,6 +18,7 @@ package io.grpc.inprocess;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
@ -51,7 +52,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -225,7 +225,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
@Override
public Future<InternalTransportStats> getTransportStats() {
public ListenableFuture<InternalTransportStats> getStats() {
// TODO(zpencer): add transport tracing to in-process server
SettableFuture<InternalTransportStats> ret = SettableFuture.create();
ret.set(null);

View File

@ -17,11 +17,11 @@
package io.grpc.internal;
import io.grpc.CallOptions;
import io.grpc.InternalInstrumented;
import io.grpc.InternalTransportStats;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import javax.annotation.concurrent.ThreadSafe;
/**
@ -31,7 +31,7 @@ import javax.annotation.concurrent.ThreadSafe;
* are expected to execute quickly.
*/
@ThreadSafe
public interface ClientTransport {
public interface ClientTransport extends InternalInstrumented<InternalTransportStats> {
/**
* Creates a new stream for sending messages to a remote end-point.
@ -64,12 +64,6 @@ public interface ClientTransport {
*/
void ping(PingCallback callback, Executor executor);
/**
* Returns a Future representing the transport level stats. If this transport does not support
* stats, the return value will be a Future of a null value.
*/
Future<InternalTransportStats> getTransportStats();
/**
* A callback that is invoked when the acknowledgement to a {@link #ping} is received. Exactly one
* of the two methods should be called per {@link #ping}.

View File

@ -17,6 +17,7 @@
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.Context;
@ -34,7 +35,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@ -199,7 +199,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
}
@Override
public Future<InternalTransportStats> getTransportStats() {
public ListenableFuture<InternalTransportStats> getStats() {
SettableFuture<InternalTransportStats> ret = SettableFuture.create();
ret.set(null);
return ret;

View File

@ -18,20 +18,20 @@ package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.InternalLogId;
import io.grpc.InternalTransportStats;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
/**
* A client transport that creates streams that will immediately fail when started.
*/
class FailingClientTransport implements ClientTransport {
@VisibleForTesting
final Status error;
@ -56,9 +56,14 @@ class FailingClientTransport implements ClientTransport {
}
@Override
public Future<InternalTransportStats> getTransportStats() {
public ListenableFuture<InternalTransportStats> getStats() {
SettableFuture<InternalTransportStats> ret = SettableFuture.create();
ret.set(null);
return ret;
}
@Override
public InternalLogId getLogId() {
throw new UnsupportedOperationException("Not a real transport");
}
}

View File

@ -16,7 +16,7 @@
package io.grpc.internal;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.InternalLogId;
@ -25,7 +25,6 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
abstract class ForwardingConnectionClientTransport implements ConnectionClientTransport {
@Override
@ -70,10 +69,8 @@ abstract class ForwardingConnectionClientTransport implements ConnectionClientTr
}
@Override
public Future<InternalTransportStats> getTransportStats() {
SettableFuture<InternalTransportStats> ret = SettableFuture.create();
ret.set(null);
return ret;
public ListenableFuture<InternalTransportStats> getStats() {
return delegate().getStats();
}
protected abstract ConnectionClientTransport delegate();

View File

@ -24,10 +24,12 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.InternalLogId;
import io.grpc.InternalMetadata;
import io.grpc.InternalMetadata.TrustedAsciiMarshaller;
import io.grpc.InternalTransportStats;
@ -52,7 +54,6 @@ import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -681,10 +682,14 @@ public final class GrpcUtil {
transport.ping(callback, executor);
}
@Nullable
@Override
public Future<InternalTransportStats> getTransportStats() {
return transport.getTransportStats();
public InternalLogId getLogId() {
return transport.getLogId();
}
@Override
public ListenableFuture<InternalTransportStats> getStats() {
return transport.getStats();
}
};
}

View File

@ -16,14 +16,13 @@
package io.grpc.internal;
import io.grpc.InternalInstrumented;
import io.grpc.InternalTransportStats;
import io.grpc.InternalWithLogId;
import io.grpc.Status;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
/** An inbound connection. */
public interface ServerTransport extends InternalWithLogId {
public interface ServerTransport extends InternalInstrumented<InternalTransportStats> {
/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
* eventually begin failing. New streams "eventually" begin failing because shutdown may need to
@ -45,10 +44,4 @@ public interface ServerTransport extends InternalWithLogId {
* outstanding tasks are cancelled when the transport terminates.
*/
ScheduledExecutorService getScheduledExecutorService();
/**
* Returns a Future representing the transport level stats. If this transport does not support
* stats, the return value will be a Future of a null value.
*/
Future<InternalTransportStats> getTransportStats();
}

View File

@ -42,6 +42,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
@ -77,7 +78,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -1275,7 +1275,7 @@ public class ServerImplTest {
}
@Override
public Future<InternalTransportStats> getTransportStats() {
public ListenableFuture<InternalTransportStats> getStats() {
SettableFuture<InternalTransportStats> ret = SettableFuture.create();
ret.set(null);
return ret;

View File

@ -21,6 +21,7 @@ import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
@ -51,15 +52,15 @@ import io.netty.util.AsciiString;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* A Netty-based {@link ConnectionClientTransport} implementation.
*/
class NettyClientTransport implements ConnectionClientTransport {
private static final Logger log = Logger.getLogger(NettyServerTransport.class.getName());
private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
private final Map<ChannelOption<?>, ?> channelOptions;
private final SocketAddress address;
@ -310,21 +311,22 @@ class NettyClientTransport implements ConnectionClientTransport {
}
@Override
public Future<InternalTransportStats> getTransportStats() {
public ListenableFuture<InternalTransportStats> getStats() {
final SettableFuture<InternalTransportStats> result = SettableFuture.create();
if (channel.eventLoop().inEventLoop()) {
// This is necessary, otherwise we will block forever if we get the future from inside
// the event loop.
SettableFuture<InternalTransportStats> result = SettableFuture.create();
result.set(transportTracer.getStats());
return result;
}
return channel.eventLoop().submit(
new Callable<InternalTransportStats>() {
channel.eventLoop().submit(
new Runnable() {
@Override
public InternalTransportStats call() throws Exception {
return transportTracer.getStats();
public void run() {
result.set(transportTracer.getStats());
}
});
return result;
}
@VisibleForTesting

View File

@ -19,6 +19,7 @@ package io.grpc.netty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.InternalLogId;
import io.grpc.InternalTransportStats;
@ -33,8 +34,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -43,7 +42,6 @@ import java.util.logging.Logger;
* The Netty-based server transport.
*/
class NettyServerTransport implements ServerTransport {
@SuppressWarnings("unused") // log is for general messages, but nothing currently uses it
private static final Logger log = Logger.getLogger(NettyServerTransport.class.getName());
// connectionLog is for connection related messages only
private static final Logger connectionLog = Logger.getLogger(
@ -177,21 +175,22 @@ class NettyServerTransport implements ServerTransport {
}
@Override
public Future<InternalTransportStats> getTransportStats() {
public ListenableFuture<InternalTransportStats> getStats() {
final SettableFuture<InternalTransportStats> result = SettableFuture.create();
if (channel.eventLoop().inEventLoop()) {
// This is necessary, otherwise we will block forever if we get the future from inside
// the event loop.
SettableFuture<InternalTransportStats> result = SettableFuture.create();
result.set(transportTracer.getStats());
return result;
}
return channel.eventLoop().submit(
new Callable<InternalTransportStats>() {
channel.eventLoop().submit(
new Runnable() {
@Override
public InternalTransportStats call() throws Exception {
return transportTracer.getStats();
public void run() {
result.set(transportTracer.getStats());
}
});
return result;
}
/**

View File

@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.squareup.okhttp.Credentials;
import com.squareup.okhttp.HttpUrl;
@ -70,7 +71,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -892,7 +892,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
}
@Override
public Future<InternalTransportStats> getTransportStats() {
public ListenableFuture<InternalTransportStats> getStats() {
synchronized (lock) {
SettableFuture<InternalTransportStats> ret = SettableFuture.create();
ret.set(transportTracer.getStats());

View File

@ -547,7 +547,7 @@ public class OkHttpClientTransportTest {
@Test
public void transportTracer_windowSizeDefault() throws Exception {
initTransport();
InternalTransportStats stats = clientTransport.getTransportStats().get();
InternalTransportStats stats = clientTransport.getStats().get();
assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
// okhttp does not track local window sizes
assertEquals(-1, stats.localFlowControlWindow);
@ -556,13 +556,13 @@ public class OkHttpClientTransportTest {
@Test
public void transportTracer_windowSize_remote() throws Exception {
initTransport();
InternalTransportStats before = clientTransport.getTransportStats().get();
InternalTransportStats before = clientTransport.getStats().get();
assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
// okhttp does not track local window sizes
assertEquals(-1, before.localFlowControlWindow);
frameHandler().windowUpdate(0, 1000);
InternalTransportStats after = clientTransport.getTransportStats().get();
InternalTransportStats after = clientTransport.getStats().get();
assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow);
// okhttp does not track local window sizes
assertEquals(-1, after.localFlowControlWindow);
@ -1278,11 +1278,11 @@ public class OkHttpClientTransportTest {
initTransport();
PingCallbackImpl callback1 = new PingCallbackImpl();
clientTransport.ping(callback1, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, clientTransport.getStats().get().keepAlivesSent);
// add'l ping will be added as listener to outstanding operation
PingCallbackImpl callback2 = new PingCallbackImpl();
clientTransport.ping(callback2, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, clientTransport.getStats().get().keepAlivesSent);
ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class);
ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class);
@ -1315,7 +1315,7 @@ public class OkHttpClientTransportTest {
// now that previous ping is done, next request returns a different future
callback1 = new PingCallbackImpl();
clientTransport.ping(callback1, MoreExecutors.directExecutor());
assertEquals(2, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(2, clientTransport.getStats().get().keepAlivesSent);
assertEquals(0, callback1.invocationCount);
shutdownAndVerify();
}
@ -1325,7 +1325,7 @@ public class OkHttpClientTransportTest {
initTransport();
PingCallbackImpl callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, clientTransport.getStats().get().keepAlivesSent);
assertEquals(0, callback.invocationCount);
clientTransport.shutdown(SHUTDOWN_REASON);
@ -1337,7 +1337,7 @@ public class OkHttpClientTransportTest {
// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, clientTransport.getStats().get().keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
@ -1349,7 +1349,7 @@ public class OkHttpClientTransportTest {
initTransport();
PingCallbackImpl callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, clientTransport.getStats().get().keepAlivesSent);
assertEquals(0, callback.invocationCount);
clientTransport.onException(new IOException());
@ -1362,7 +1362,7 @@ public class OkHttpClientTransportTest {
// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, clientTransport.getStats().get().keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,

View File

@ -1406,11 +1406,10 @@ public abstract class AbstractTransportTest {
long serverFirstTimestampNanos;
long clientFirstTimestampNanos;
{
InternalTransportStats serverBefore =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get();
assertEquals(0, serverBefore.streamsStarted);
assertEquals(0, serverBefore.lastStreamCreatedTimeNanos);
InternalTransportStats clientBefore = client.getTransportStats().get();
InternalTransportStats clientBefore = client.getStats().get();
assertEquals(0, clientBefore.streamsStarted);
assertEquals(0, clientBefore.lastStreamCreatedTimeNanos);
@ -1420,15 +1419,14 @@ public abstract class AbstractTransportTest {
StreamCreation serverStreamCreation = serverTransportListener
.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
InternalTransportStats serverAfter =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get();
assertEquals(1, serverAfter.streamsStarted);
serverFirstTimestampNanos = serverAfter.lastStreamCreatedTimeNanos;
assertEquals(
currentTimeMillis(),
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos));
InternalTransportStats clientAfter = client.getTransportStats().get();
InternalTransportStats clientAfter = client.getStats().get();
assertEquals(1, clientAfter.streamsStarted);
clientFirstTimestampNanos = clientAfter.lastStreamCreatedTimeNanos;
assertEquals(
@ -1444,10 +1442,9 @@ public abstract class AbstractTransportTest {
// start second stream
{
InternalTransportStats serverBefore =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get();
assertEquals(1, serverBefore.streamsStarted);
InternalTransportStats clientBefore = client.getTransportStats().get();
InternalTransportStats clientBefore = client.getStats().get();
assertEquals(1, clientBefore.streamsStarted);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
@ -1456,8 +1453,7 @@ public abstract class AbstractTransportTest {
StreamCreation serverStreamCreation = serverTransportListener
.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
InternalTransportStats serverAfter =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get();
assertEquals(2, serverAfter.streamsStarted);
assertEquals(
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
@ -1466,7 +1462,7 @@ public abstract class AbstractTransportTest {
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos);
assertEquals(currentTimeMillis(), serverSecondTimestamp);
InternalTransportStats clientAfter = client.getTransportStats().get();
InternalTransportStats clientAfter = client.getStats().get();
assertEquals(2, clientAfter.streamsStarted);
assertEquals(
TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
@ -1497,11 +1493,10 @@ public abstract class AbstractTransportTest {
return;
}
InternalTransportStats serverBefore =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get();
assertEquals(0, serverBefore.streamsSucceeded);
assertEquals(0, serverBefore.streamsFailed);
InternalTransportStats clientBefore = client.getTransportStats().get();
InternalTransportStats clientBefore = client.getStats().get();
assertEquals(0, clientBefore.streamsSucceeded);
assertEquals(0, clientBefore.streamsFailed);
@ -1512,11 +1507,10 @@ public abstract class AbstractTransportTest {
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
InternalTransportStats serverAfter =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get();
assertEquals(1, serverAfter.streamsSucceeded);
assertEquals(0, serverAfter.streamsFailed);
InternalTransportStats clientAfter = client.getTransportStats().get();
InternalTransportStats clientAfter = client.getStats().get();
assertEquals(1, clientAfter.streamsSucceeded);
assertEquals(0, clientAfter.streamsFailed);
}
@ -1538,11 +1532,10 @@ public abstract class AbstractTransportTest {
return;
}
InternalTransportStats serverBefore =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get();
assertEquals(0, serverBefore.streamsFailed);
assertEquals(0, serverBefore.streamsSucceeded);
InternalTransportStats clientBefore = client.getTransportStats().get();
InternalTransportStats clientBefore = client.getStats().get();
assertEquals(0, clientBefore.streamsFailed);
assertEquals(0, clientBefore.streamsSucceeded);
@ -1552,11 +1545,10 @@ public abstract class AbstractTransportTest {
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
InternalTransportStats serverAfter =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get();
assertEquals(1, serverAfter.streamsFailed);
assertEquals(0, serverAfter.streamsSucceeded);
InternalTransportStats clientAfter = client.getTransportStats().get();
InternalTransportStats clientAfter = client.getStats().get();
assertEquals(1, clientAfter.streamsFailed);
assertEquals(0, clientAfter.streamsSucceeded);
@ -1579,11 +1571,10 @@ public abstract class AbstractTransportTest {
return;
}
InternalTransportStats serverBefore =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get();
assertEquals(0, serverBefore.streamsFailed);
assertEquals(0, serverBefore.streamsSucceeded);
InternalTransportStats clientBefore = client.getTransportStats().get();
InternalTransportStats clientBefore = client.getStats().get();
assertEquals(0, clientBefore.streamsFailed);
assertEquals(0, clientBefore.streamsSucceeded);
@ -1591,11 +1582,10 @@ public abstract class AbstractTransportTest {
// do not validate stats until close() has been called on server
assertNotNull(serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
InternalTransportStats serverAfter =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get();
assertEquals(1, serverAfter.streamsFailed);
assertEquals(0, serverAfter.streamsSucceeded);
InternalTransportStats clientAfter = client.getTransportStats().get();
InternalTransportStats clientAfter = client.getStats().get();
assertEquals(1, clientAfter.streamsFailed);
assertEquals(0, clientAfter.streamsSucceeded);
}
@ -1618,11 +1608,10 @@ public abstract class AbstractTransportTest {
return;
}
InternalTransportStats serverBefore =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get();
assertEquals(0, serverBefore.messagesReceived);
assertEquals(0, serverBefore.lastMessageReceivedTimeNanos);
InternalTransportStats clientBefore = client.getTransportStats().get();
InternalTransportStats clientBefore = client.getStats().get();
assertEquals(0, clientBefore.messagesSent);
assertEquals(0, clientBefore.lastMessageSentTimeNanos);
@ -1632,13 +1621,12 @@ public abstract class AbstractTransportTest {
clientStream.halfClose();
verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
InternalTransportStats serverAfter =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get();
assertEquals(1, serverAfter.messagesReceived);
long serverTimestamp =
TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageReceivedTimeNanos);
assertEquals(currentTimeMillis(), serverTimestamp);
InternalTransportStats clientAfter = client.getTransportStats().get();
InternalTransportStats clientAfter = client.getStats().get();
assertEquals(1, clientAfter.messagesSent);
long clientTimestamp =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageSentTimeNanos);
@ -1664,11 +1652,10 @@ public abstract class AbstractTransportTest {
return;
}
InternalTransportStats serverBefore =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get();
assertEquals(0, serverBefore.messagesSent);
assertEquals(0, serverBefore.lastMessageSentTimeNanos);
InternalTransportStats clientBefore = client.getTransportStats().get();
InternalTransportStats clientBefore = client.getStats().get();
assertEquals(0, clientBefore.messagesReceived);
assertEquals(0, clientBefore.lastMessageReceivedTimeNanos);
@ -1678,12 +1665,11 @@ public abstract class AbstractTransportTest {
serverStream.flush();
verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
InternalTransportStats serverAfter =
serverTransportListener.transport.getTransportStats().get();
InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get();
assertEquals(1, serverAfter.messagesSent);
long serverTimestmap = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageSentTimeNanos);
assertEquals(currentTimeMillis(), serverTimestmap);
InternalTransportStats clientAfter = client.getTransportStats().get();
InternalTransportStats clientAfter = client.getStats().get();
assertEquals(1, clientAfter.messagesReceived);
long clientTimestmap =
TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageReceivedTimeNanos);