From f1151f91b9cd901e44f5c6038cd4bf53c67c992e Mon Sep 17 00:00:00 2001 From: zpencer Date: Wed, 13 Dec 2017 12:56:25 -0800 Subject: [PATCH] core: InternalInstrumented for instrumented classes (#3817) Entities that report stats to channelz will implement this interface. --- core/src/main/java/io/grpc/Instrumented.java | 27 +++++++ .../java/io/grpc/InternalInstrumented.java | 24 +++++++ core/src/main/java/io/grpc/InternalLogId.java | 37 +++------- .../main/java/io/grpc/InternalWithLogId.java | 10 +-- core/src/main/java/io/grpc/LogId.java | 60 ++++++++++++++++ core/src/main/java/io/grpc/WithLogId.java | 31 ++++++++ .../io/grpc/inprocess/InProcessTransport.java | 4 +- .../io/grpc/internal/ClientTransport.java | 10 +-- .../grpc/internal/DelayedClientTransport.java | 4 +- .../grpc/internal/FailingClientTransport.java | 11 ++- .../ForwardingConnectionClientTransport.java | 9 +-- .../main/java/io/grpc/internal/GrpcUtil.java | 13 ++-- .../io/grpc/internal/ServerTransport.java | 11 +-- .../java/io/grpc/internal/ServerImplTest.java | 4 +- .../io/grpc/netty/NettyClientTransport.java | 18 ++--- .../io/grpc/netty/NettyServerTransport.java | 17 +++-- .../io/grpc/okhttp/OkHttpClientTransport.java | 4 +- .../okhttp/OkHttpClientTransportTest.java | 20 +++--- .../testing/AbstractTransportTest.java | 70 ++++++++----------- 19 files changed, 240 insertions(+), 144 deletions(-) create mode 100644 core/src/main/java/io/grpc/Instrumented.java create mode 100644 core/src/main/java/io/grpc/InternalInstrumented.java create mode 100644 core/src/main/java/io/grpc/LogId.java create mode 100644 core/src/main/java/io/grpc/WithLogId.java diff --git a/core/src/main/java/io/grpc/Instrumented.java b/core/src/main/java/io/grpc/Instrumented.java new file mode 100644 index 0000000000..529cfca433 --- /dev/null +++ b/core/src/main/java/io/grpc/Instrumented.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * An interface for types that may support instrumentation. If the actual type does not + * support instrumentation, then the future will return a {@code null}. + */ +interface Instrumented extends WithLogId { + ListenableFuture getStats(); +} diff --git a/core/src/main/java/io/grpc/InternalInstrumented.java b/core/src/main/java/io/grpc/InternalInstrumented.java new file mode 100644 index 0000000000..d1d24d2227 --- /dev/null +++ b/core/src/main/java/io/grpc/InternalInstrumented.java @@ -0,0 +1,24 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +/** + * This is an gRPC internal interface. Do not use this. + */ +@Internal +public interface InternalInstrumented extends Instrumented, InternalWithLogId { +} diff --git a/core/src/main/java/io/grpc/InternalLogId.java b/core/src/main/java/io/grpc/InternalLogId.java index 7196b2cd44..d80f5fb21e 100644 --- a/core/src/main/java/io/grpc/InternalLogId.java +++ b/core/src/main/java/io/grpc/InternalLogId.java @@ -16,43 +16,22 @@ package io.grpc; -import java.util.concurrent.atomic.AtomicLong; - /** - * Do not use this. - * - *

A loggable ID, unique for the duration of the program. + * Do not use this. This is an internal accessor class. */ @Internal -public final class InternalLogId { - private static final AtomicLong idAlloc = new AtomicLong(); +public final class InternalLogId extends LogId { + private InternalLogId(String tag, long id) { + super(tag, id); + } /** + * An accessor method for {@link LogId#allocate(String)}. + * * @param tag a loggable tag associated with this tag. The ID that is allocated is guaranteed * to be unique and increasing, irrespective of the tag. */ public static InternalLogId allocate(String tag) { - return new InternalLogId(tag, idAlloc.incrementAndGet()); - } - - private final String tag; - private final long id; - - private InternalLogId(String tag, long id) { - this.tag = tag; - this.id = id; - } - - public long getId() { - return id; - } - - public String getTag() { - return tag; - } - - @Override - public String toString() { - return tag + "-" + id; + return new InternalLogId(tag, LogId.getNextId()); } } diff --git a/core/src/main/java/io/grpc/InternalWithLogId.java b/core/src/main/java/io/grpc/InternalWithLogId.java index 486d37a881..3bdb2cf193 100644 --- a/core/src/main/java/io/grpc/InternalWithLogId.java +++ b/core/src/main/java/io/grpc/InternalWithLogId.java @@ -22,13 +22,7 @@ package io.grpc; *

An object that has an ID that is unique within the JVM, primarily for debug logging. */ @Internal -public interface InternalWithLogId { - /** - * Returns an ID that is primarily used in debug logs. It usually contains the class name and a - * numeric ID that is unique among the instances. - * - *

The subclasses of this interface usually want to include the log ID in their {@link - * #toString} results. - */ +public interface InternalWithLogId extends WithLogId { + @Override InternalLogId getLogId(); } diff --git a/core/src/main/java/io/grpc/LogId.java b/core/src/main/java/io/grpc/LogId.java new file mode 100644 index 0000000000..a7985f6b21 --- /dev/null +++ b/core/src/main/java/io/grpc/LogId.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A loggable ID, unique for the duration of the program. + */ +// not final so that InternalLogId can make this class visible outside of io.grpc +class LogId { + private static final AtomicLong idAlloc = new AtomicLong(); + + /** + * @param tag a loggable tag associated with this tag. The ID that is allocated is guaranteed + * to be unique and increasing, irrespective of the tag. + */ + public static LogId allocate(String tag) { + return new LogId(tag, getNextId()); + } + + static long getNextId() { + return idAlloc.incrementAndGet(); + } + + private final String tag; + private final long id; + + protected LogId(String tag, long id) { + this.tag = tag; + this.id = id; + } + + public long getId() { + return id; + } + + public String getTag() { + return tag; + } + + @Override + public String toString() { + return tag + "-" + id; + } +} diff --git a/core/src/main/java/io/grpc/WithLogId.java b/core/src/main/java/io/grpc/WithLogId.java new file mode 100644 index 0000000000..adf0cfedcf --- /dev/null +++ b/core/src/main/java/io/grpc/WithLogId.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +/** + * An object that has an ID that is unique within the JVM, primarily for debug logging. + */ +interface WithLogId { + /** + * Returns an ID that is primarily used in debug logs. It usually contains the class name and a + * numeric ID that is unique among the instances. + * + *

The subclasses of this interface usually want to include the log ID in their {@link + * #toString} results. + */ + LogId getLogId(); +} diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index cc9aca2737..d6aa2143ab 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -18,6 +18,7 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; @@ -51,7 +52,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -225,7 +225,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } @Override - public Future getTransportStats() { + public ListenableFuture getStats() { // TODO(zpencer): add transport tracing to in-process server SettableFuture ret = SettableFuture.create(); ret.set(null); diff --git a/core/src/main/java/io/grpc/internal/ClientTransport.java b/core/src/main/java/io/grpc/internal/ClientTransport.java index ccf9c7d253..17b01907c8 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ClientTransport.java @@ -17,11 +17,11 @@ package io.grpc.internal; import io.grpc.CallOptions; +import io.grpc.InternalInstrumented; import io.grpc.InternalTransportStats; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import javax.annotation.concurrent.ThreadSafe; /** @@ -31,7 +31,7 @@ import javax.annotation.concurrent.ThreadSafe; * are expected to execute quickly. */ @ThreadSafe -public interface ClientTransport { +public interface ClientTransport extends InternalInstrumented { /** * Creates a new stream for sending messages to a remote end-point. @@ -64,12 +64,6 @@ public interface ClientTransport { */ void ping(PingCallback callback, Executor executor); - /** - * Returns a Future representing the transport level stats. If this transport does not support - * stats, the return value will be a Future of a null value. - */ - Future getTransportStats(); - /** * A callback that is invoked when the acknowledgement to a {@link #ping} is received. Exactly one * of the two methods should be called per {@link #ping}. diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 819966b72a..c3d1aeee78 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -17,6 +17,7 @@ package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.CallOptions; import io.grpc.Context; @@ -34,7 +35,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -199,7 +199,7 @@ final class DelayedClientTransport implements ManagedClientTransport { } @Override - public Future getTransportStats() { + public ListenableFuture getStats() { SettableFuture ret = SettableFuture.create(); ret.set(null); return ret; diff --git a/core/src/main/java/io/grpc/internal/FailingClientTransport.java b/core/src/main/java/io/grpc/internal/FailingClientTransport.java index 984ad767eb..2441a6f985 100644 --- a/core/src/main/java/io/grpc/internal/FailingClientTransport.java +++ b/core/src/main/java/io/grpc/internal/FailingClientTransport.java @@ -18,20 +18,20 @@ package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.CallOptions; +import io.grpc.InternalLogId; import io.grpc.InternalTransportStats; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import java.util.concurrent.Executor; -import java.util.concurrent.Future; /** * A client transport that creates streams that will immediately fail when started. */ class FailingClientTransport implements ClientTransport { - @VisibleForTesting final Status error; @@ -56,9 +56,14 @@ class FailingClientTransport implements ClientTransport { } @Override - public Future getTransportStats() { + public ListenableFuture getStats() { SettableFuture ret = SettableFuture.create(); ret.set(null); return ret; } + + @Override + public InternalLogId getLogId() { + throw new UnsupportedOperationException("Not a real transport"); + } } diff --git a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java index 8a3bbb066d..1171b5717b 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java @@ -16,7 +16,7 @@ package io.grpc.internal; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ListenableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.InternalLogId; @@ -25,7 +25,6 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import java.util.concurrent.Executor; -import java.util.concurrent.Future; abstract class ForwardingConnectionClientTransport implements ConnectionClientTransport { @Override @@ -70,10 +69,8 @@ abstract class ForwardingConnectionClientTransport implements ConnectionClientTr } @Override - public Future getTransportStats() { - SettableFuture ret = SettableFuture.create(); - ret.set(null); - return ret; + public ListenableFuture getStats() { + return delegate().getStats(); } protected abstract ConnectionClientTransport delegate(); diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 406802f14c..266c749019 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -24,10 +24,12 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; +import io.grpc.InternalLogId; import io.grpc.InternalMetadata; import io.grpc.InternalMetadata.TrustedAsciiMarshaller; import io.grpc.InternalTransportStats; @@ -52,7 +54,6 @@ import java.util.Collection; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -681,10 +682,14 @@ public final class GrpcUtil { transport.ping(callback, executor); } - @Nullable @Override - public Future getTransportStats() { - return transport.getTransportStats(); + public InternalLogId getLogId() { + return transport.getLogId(); + } + + @Override + public ListenableFuture getStats() { + return transport.getStats(); } }; } diff --git a/core/src/main/java/io/grpc/internal/ServerTransport.java b/core/src/main/java/io/grpc/internal/ServerTransport.java index 1826228d0c..ff4e6bb91b 100644 --- a/core/src/main/java/io/grpc/internal/ServerTransport.java +++ b/core/src/main/java/io/grpc/internal/ServerTransport.java @@ -16,14 +16,13 @@ package io.grpc.internal; +import io.grpc.InternalInstrumented; import io.grpc.InternalTransportStats; -import io.grpc.InternalWithLogId; import io.grpc.Status; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; /** An inbound connection. */ -public interface ServerTransport extends InternalWithLogId { +public interface ServerTransport extends InternalInstrumented { /** * Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will * eventually begin failing. New streams "eventually" begin failing because shutdown may need to @@ -45,10 +44,4 @@ public interface ServerTransport extends InternalWithLogId { * outstanding tasks are cancelled when the transport terminates. */ ScheduledExecutorService getScheduledExecutorService(); - - /** - * Returns a Future representing the transport level stats. If this transport does not support - * stats, the return value will be a Future of a null value. - */ - Future getTransportStats(); } diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 30f3b411a1..d5429c70ee 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -42,6 +42,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.truth.Truth; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; @@ -77,7 +78,6 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -1275,7 +1275,7 @@ public class ServerImplTest { } @Override - public Future getTransportStats() { + public ListenableFuture getStats() { SettableFuture ret = SettableFuture.create(); ret.set(null); return ret; diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index a5ae55064a..d595de6972 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -21,6 +21,7 @@ import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; @@ -51,15 +52,15 @@ import io.netty.util.AsciiString; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.Future; +import java.util.logging.Logger; import javax.annotation.Nullable; /** * A Netty-based {@link ConnectionClientTransport} implementation. */ class NettyClientTransport implements ConnectionClientTransport { + private static final Logger log = Logger.getLogger(NettyServerTransport.class.getName()); private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); private final Map, ?> channelOptions; private final SocketAddress address; @@ -310,21 +311,22 @@ class NettyClientTransport implements ConnectionClientTransport { } @Override - public Future getTransportStats() { + public ListenableFuture getStats() { + final SettableFuture result = SettableFuture.create(); if (channel.eventLoop().inEventLoop()) { // This is necessary, otherwise we will block forever if we get the future from inside // the event loop. - SettableFuture result = SettableFuture.create(); result.set(transportTracer.getStats()); return result; } - return channel.eventLoop().submit( - new Callable() { + channel.eventLoop().submit( + new Runnable() { @Override - public InternalTransportStats call() throws Exception { - return transportTracer.getStats(); + public void run() { + result.set(transportTracer.getStats()); } }); + return result; } @VisibleForTesting diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 1ddc6bebc5..8a16fb93fb 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -19,6 +19,7 @@ package io.grpc.netty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.InternalLogId; import io.grpc.InternalTransportStats; @@ -33,8 +34,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import java.io.IOException; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -43,7 +42,6 @@ import java.util.logging.Logger; * The Netty-based server transport. */ class NettyServerTransport implements ServerTransport { - @SuppressWarnings("unused") // log is for general messages, but nothing currently uses it private static final Logger log = Logger.getLogger(NettyServerTransport.class.getName()); // connectionLog is for connection related messages only private static final Logger connectionLog = Logger.getLogger( @@ -177,21 +175,22 @@ class NettyServerTransport implements ServerTransport { } @Override - public Future getTransportStats() { + public ListenableFuture getStats() { + final SettableFuture result = SettableFuture.create(); if (channel.eventLoop().inEventLoop()) { // This is necessary, otherwise we will block forever if we get the future from inside // the event loop. - SettableFuture result = SettableFuture.create(); result.set(transportTracer.getStats()); return result; } - return channel.eventLoop().submit( - new Callable() { + channel.eventLoop().submit( + new Runnable() { @Override - public InternalTransportStats call() throws Exception { - return transportTracer.getStats(); + public void run() { + result.set(transportTracer.getStats()); } }); + return result; } /** diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 2e4e6ad504..717c06ac78 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.squareup.okhttp.Credentials; import com.squareup.okhttp.HttpUrl; @@ -70,7 +71,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -892,7 +892,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { } @Override - public Future getTransportStats() { + public ListenableFuture getStats() { synchronized (lock) { SettableFuture ret = SettableFuture.create(); ret.set(transportTracer.getStats()); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 2ecfc774eb..03708a5d18 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -547,7 +547,7 @@ public class OkHttpClientTransportTest { @Test public void transportTracer_windowSizeDefault() throws Exception { initTransport(); - InternalTransportStats stats = clientTransport.getTransportStats().get(); + InternalTransportStats stats = clientTransport.getStats().get(); assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); // okhttp does not track local window sizes assertEquals(-1, stats.localFlowControlWindow); @@ -556,13 +556,13 @@ public class OkHttpClientTransportTest { @Test public void transportTracer_windowSize_remote() throws Exception { initTransport(); - InternalTransportStats before = clientTransport.getTransportStats().get(); + InternalTransportStats before = clientTransport.getStats().get(); assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); // okhttp does not track local window sizes assertEquals(-1, before.localFlowControlWindow); frameHandler().windowUpdate(0, 1000); - InternalTransportStats after = clientTransport.getTransportStats().get(); + InternalTransportStats after = clientTransport.getStats().get(); assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow); // okhttp does not track local window sizes assertEquals(-1, after.localFlowControlWindow); @@ -1278,11 +1278,11 @@ public class OkHttpClientTransportTest { initTransport(); PingCallbackImpl callback1 = new PingCallbackImpl(); clientTransport.ping(callback1, MoreExecutors.directExecutor()); - assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); + assertEquals(1, clientTransport.getStats().get().keepAlivesSent); // add'l ping will be added as listener to outstanding operation PingCallbackImpl callback2 = new PingCallbackImpl(); clientTransport.ping(callback2, MoreExecutors.directExecutor()); - assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); + assertEquals(1, clientTransport.getStats().get().keepAlivesSent); ArgumentCaptor captor1 = ArgumentCaptor.forClass(int.class); ArgumentCaptor captor2 = ArgumentCaptor.forClass(int.class); @@ -1315,7 +1315,7 @@ public class OkHttpClientTransportTest { // now that previous ping is done, next request returns a different future callback1 = new PingCallbackImpl(); clientTransport.ping(callback1, MoreExecutors.directExecutor()); - assertEquals(2, clientTransport.getTransportStats().get().keepAlivesSent); + assertEquals(2, clientTransport.getStats().get().keepAlivesSent); assertEquals(0, callback1.invocationCount); shutdownAndVerify(); } @@ -1325,7 +1325,7 @@ public class OkHttpClientTransportTest { initTransport(); PingCallbackImpl callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); - assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); + assertEquals(1, clientTransport.getStats().get().keepAlivesSent); assertEquals(0, callback.invocationCount); clientTransport.shutdown(SHUTDOWN_REASON); @@ -1337,7 +1337,7 @@ public class OkHttpClientTransportTest { // now that handler is in terminal state, all future pings fail immediately callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); - assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); + assertEquals(1, clientTransport.getStats().get().keepAlivesSent); assertEquals(1, callback.invocationCount); assertTrue(callback.failureCause instanceof StatusException); assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); @@ -1349,7 +1349,7 @@ public class OkHttpClientTransportTest { initTransport(); PingCallbackImpl callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); - assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); + assertEquals(1, clientTransport.getStats().get().keepAlivesSent); assertEquals(0, callback.invocationCount); clientTransport.onException(new IOException()); @@ -1362,7 +1362,7 @@ public class OkHttpClientTransportTest { // now that handler is in terminal state, all future pings fail immediately callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); - assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent); + assertEquals(1, clientTransport.getStats().get().keepAlivesSent); assertEquals(1, callback.invocationCount); assertTrue(callback.failureCause instanceof StatusException); assertEquals(Status.Code.UNAVAILABLE, diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index ed32002c32..be636033ed 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -1406,11 +1406,10 @@ public abstract class AbstractTransportTest { long serverFirstTimestampNanos; long clientFirstTimestampNanos; { - InternalTransportStats serverBefore = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get(); assertEquals(0, serverBefore.streamsStarted); assertEquals(0, serverBefore.lastStreamCreatedTimeNanos); - InternalTransportStats clientBefore = client.getTransportStats().get(); + InternalTransportStats clientBefore = client.getStats().get(); assertEquals(0, clientBefore.streamsStarted); assertEquals(0, clientBefore.lastStreamCreatedTimeNanos); @@ -1420,15 +1419,14 @@ public abstract class AbstractTransportTest { StreamCreation serverStreamCreation = serverTransportListener .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); - InternalTransportStats serverAfter = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get(); assertEquals(1, serverAfter.streamsStarted); serverFirstTimestampNanos = serverAfter.lastStreamCreatedTimeNanos; assertEquals( currentTimeMillis(), TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos)); - InternalTransportStats clientAfter = client.getTransportStats().get(); + InternalTransportStats clientAfter = client.getStats().get(); assertEquals(1, clientAfter.streamsStarted); clientFirstTimestampNanos = clientAfter.lastStreamCreatedTimeNanos; assertEquals( @@ -1444,10 +1442,9 @@ public abstract class AbstractTransportTest { // start second stream { - InternalTransportStats serverBefore = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get(); assertEquals(1, serverBefore.streamsStarted); - InternalTransportStats clientBefore = client.getTransportStats().get(); + InternalTransportStats clientBefore = client.getStats().get(); assertEquals(1, clientBefore.streamsStarted); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); @@ -1456,8 +1453,7 @@ public abstract class AbstractTransportTest { StreamCreation serverStreamCreation = serverTransportListener .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); - InternalTransportStats serverAfter = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get(); assertEquals(2, serverAfter.streamsStarted); assertEquals( TimeUnit.MILLISECONDS.toNanos(elapsedMillis), @@ -1466,7 +1462,7 @@ public abstract class AbstractTransportTest { TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos); assertEquals(currentTimeMillis(), serverSecondTimestamp); - InternalTransportStats clientAfter = client.getTransportStats().get(); + InternalTransportStats clientAfter = client.getStats().get(); assertEquals(2, clientAfter.streamsStarted); assertEquals( TimeUnit.MILLISECONDS.toNanos(elapsedMillis), @@ -1497,11 +1493,10 @@ public abstract class AbstractTransportTest { return; } - InternalTransportStats serverBefore = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get(); assertEquals(0, serverBefore.streamsSucceeded); assertEquals(0, serverBefore.streamsFailed); - InternalTransportStats clientBefore = client.getTransportStats().get(); + InternalTransportStats clientBefore = client.getStats().get(); assertEquals(0, clientBefore.streamsSucceeded); assertEquals(0, clientBefore.streamsFailed); @@ -1512,11 +1507,10 @@ public abstract class AbstractTransportTest { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - InternalTransportStats serverAfter = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get(); assertEquals(1, serverAfter.streamsSucceeded); assertEquals(0, serverAfter.streamsFailed); - InternalTransportStats clientAfter = client.getTransportStats().get(); + InternalTransportStats clientAfter = client.getStats().get(); assertEquals(1, clientAfter.streamsSucceeded); assertEquals(0, clientAfter.streamsFailed); } @@ -1538,11 +1532,10 @@ public abstract class AbstractTransportTest { return; } - InternalTransportStats serverBefore = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get(); assertEquals(0, serverBefore.streamsFailed); assertEquals(0, serverBefore.streamsSucceeded); - InternalTransportStats clientBefore = client.getTransportStats().get(); + InternalTransportStats clientBefore = client.getStats().get(); assertEquals(0, clientBefore.streamsFailed); assertEquals(0, clientBefore.streamsSucceeded); @@ -1552,11 +1545,10 @@ public abstract class AbstractTransportTest { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - InternalTransportStats serverAfter = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get(); assertEquals(1, serverAfter.streamsFailed); assertEquals(0, serverAfter.streamsSucceeded); - InternalTransportStats clientAfter = client.getTransportStats().get(); + InternalTransportStats clientAfter = client.getStats().get(); assertEquals(1, clientAfter.streamsFailed); assertEquals(0, clientAfter.streamsSucceeded); @@ -1579,11 +1571,10 @@ public abstract class AbstractTransportTest { return; } - InternalTransportStats serverBefore = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get(); assertEquals(0, serverBefore.streamsFailed); assertEquals(0, serverBefore.streamsSucceeded); - InternalTransportStats clientBefore = client.getTransportStats().get(); + InternalTransportStats clientBefore = client.getStats().get(); assertEquals(0, clientBefore.streamsFailed); assertEquals(0, clientBefore.streamsSucceeded); @@ -1591,11 +1582,10 @@ public abstract class AbstractTransportTest { // do not validate stats until close() has been called on server assertNotNull(serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - InternalTransportStats serverAfter = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get(); assertEquals(1, serverAfter.streamsFailed); assertEquals(0, serverAfter.streamsSucceeded); - InternalTransportStats clientAfter = client.getTransportStats().get(); + InternalTransportStats clientAfter = client.getStats().get(); assertEquals(1, clientAfter.streamsFailed); assertEquals(0, clientAfter.streamsSucceeded); } @@ -1618,11 +1608,10 @@ public abstract class AbstractTransportTest { return; } - InternalTransportStats serverBefore = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get(); assertEquals(0, serverBefore.messagesReceived); assertEquals(0, serverBefore.lastMessageReceivedTimeNanos); - InternalTransportStats clientBefore = client.getTransportStats().get(); + InternalTransportStats clientBefore = client.getStats().get(); assertEquals(0, clientBefore.messagesSent); assertEquals(0, clientBefore.lastMessageSentTimeNanos); @@ -1632,13 +1621,12 @@ public abstract class AbstractTransportTest { clientStream.halfClose(); verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); - InternalTransportStats serverAfter = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get(); assertEquals(1, serverAfter.messagesReceived); long serverTimestamp = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageReceivedTimeNanos); assertEquals(currentTimeMillis(), serverTimestamp); - InternalTransportStats clientAfter = client.getTransportStats().get(); + InternalTransportStats clientAfter = client.getStats().get(); assertEquals(1, clientAfter.messagesSent); long clientTimestamp = TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageSentTimeNanos); @@ -1664,11 +1652,10 @@ public abstract class AbstractTransportTest { return; } - InternalTransportStats serverBefore = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverBefore = serverTransportListener.transport.getStats().get(); assertEquals(0, serverBefore.messagesSent); assertEquals(0, serverBefore.lastMessageSentTimeNanos); - InternalTransportStats clientBefore = client.getTransportStats().get(); + InternalTransportStats clientBefore = client.getStats().get(); assertEquals(0, clientBefore.messagesReceived); assertEquals(0, clientBefore.lastMessageReceivedTimeNanos); @@ -1678,12 +1665,11 @@ public abstract class AbstractTransportTest { serverStream.flush(); verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); - InternalTransportStats serverAfter = - serverTransportListener.transport.getTransportStats().get(); + InternalTransportStats serverAfter = serverTransportListener.transport.getStats().get(); assertEquals(1, serverAfter.messagesSent); long serverTimestmap = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageSentTimeNanos); assertEquals(currentTimeMillis(), serverTimestmap); - InternalTransportStats clientAfter = client.getTransportStats().get(); + InternalTransportStats clientAfter = client.getStats().get(); assertEquals(1, clientAfter.messagesReceived); long clientTimestmap = TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageReceivedTimeNanos);