From 7c37f1d32b9dc11027ff1ba946303f5a11dbfe00 Mon Sep 17 00:00:00 2001 From: zpencer Date: Fri, 30 Mar 2018 08:48:37 -0700 Subject: [PATCH] core,netty,okhttp,services: expose socket options to channelz (#4228) For okhttp, expose the standard options from the Socket object. For netty, expose all the `io.netty.channel.ChannelOption`s of the `channel.config()`. --- .../main/java/io/grpc/internal/Channelz.java | 61 ++++++++ .../inprocess/InProcessTransportTest.java | 2 +- .../io/grpc/netty/NettyClientTransport.java | 2 + .../io/grpc/netty/NettyServerTransport.java | 2 + netty/src/main/java/io/grpc/netty/Utils.java | 37 +++++ .../test/java/io/grpc/netty/UtilsTest.java | 49 +++++++ .../io/grpc/okhttp/OkHttpClientTransport.java | 1 + .../src/main/java/io/grpc/okhttp/Utils.java | 79 +++++++++++ .../test/java/io/grpc/okhttp/UtilsTest.java | 27 ++++ .../io/grpc/services/ChannelzProtoUtil.java | 70 ++++++++- .../grpc/services/ChannelzProtoUtilTest.java | 133 +++++++++++++++++- .../io/grpc/services/ChannelzTestHelper.java | 11 +- .../testing/AbstractTransportTest.java | 10 +- 13 files changed, 471 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/Channelz.java b/core/src/main/java/io/grpc/internal/Channelz.java index 3c3499df8f..9b4a15a629 100644 --- a/core/src/main/java/io/grpc/internal/Channelz.java +++ b/core/src/main/java/io/grpc/internal/Channelz.java @@ -22,6 +22,7 @@ import io.grpc.ConnectivityState; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -437,20 +438,80 @@ public final class Channelz { public final SocketAddress local; public final SocketAddress remote; public final Security security; + public final SocketOptions socketOptions; /** Creates an instance. */ public SocketStats( TransportStats data, SocketAddress local, SocketAddress remote, + SocketOptions socketOptions, Security security) { this.data = data; this.local = local; this.remote = remote; + this.socketOptions = socketOptions; this.security = security; } } + public static final class SocketOptions { + public final Map others; + // In netty, the value of a channel option may be null. + @Nullable public final Integer soTimeoutMillis; + @Nullable public final Integer lingerSeconds; + + /** Creates an instance. */ + public SocketOptions( + Integer timeoutMillis, + Integer lingerSeconds, + Map others) { + Preconditions.checkNotNull(others); + this.soTimeoutMillis = timeoutMillis; + this.lingerSeconds = lingerSeconds; + this.others = Collections.unmodifiableMap(new HashMap(others)); + } + + public static final class Builder { + private final Map others = new HashMap(); + private Integer timeoutMillis; + private Integer lingerSeconds; + + /** The value of {@link java.net.Socket#getSoTimeout()}. */ + public Builder setSocketOptionTimeoutMillis(Integer timeoutMillis) { + this.timeoutMillis = timeoutMillis; + return this; + } + + /** The value of {@link java.net.Socket#getSoLinger()}. + * Note: SO_LINGER is typically expressed in seconds. + */ + public Builder setSocketOptionLingerSeconds(Integer lingerSeconds) { + this.lingerSeconds = lingerSeconds; + return this; + } + + public Builder addOption(String name, String value) { + others.put(name, Preconditions.checkNotNull(value)); + return this; + } + + public Builder addOption(String name, int value) { + others.put(name, Integer.toString(value)); + return this; + } + + public Builder addOption(String name, boolean value) { + others.put(name, Boolean.toString(value)); + return this; + } + + public SocketOptions build() { + return new SocketOptions(timeoutMillis, lingerSeconds, others); + } + } + } + /** * A data class to represent transport stats. */ diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 6a8c07112b..1972b83af0 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -68,7 +68,7 @@ public class InProcessTransportTest extends AbstractTransportTest { @Test @Ignore @Override - public void socketStats_addresses() throws Exception { + public void socketStats() throws Exception { // test does not apply to in-process } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 87b11ddf9c..0b3c48b415 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -323,6 +323,7 @@ class NettyClientTransport implements ConnectionClientTransport { transportTracer.getStats(), channel.localAddress(), channel.remoteAddress(), + Utils.getSocketOptions(channel), new Security())); return result; } @@ -335,6 +336,7 @@ class NettyClientTransport implements ConnectionClientTransport { transportTracer.getStats(), channel.localAddress(), channel.remoteAddress(), + Utils.getSocketOptions(channel), new Security())); } }); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 73c748b1b9..fe905bb2dd 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -206,6 +206,7 @@ class NettyServerTransport implements ServerTransport { transportTracer.getStats(), channel.localAddress(), channel.remoteAddress(), + Utils.getSocketOptions(channel), /*security=*/ null)); return result; } @@ -218,6 +219,7 @@ class NettyServerTransport implements ServerTransport { transportTracer.getStats(), channel.localAddress(), channel.remoteAddress(), + Utils.getSocketOptions(channel), /*security=*/ null)); } }); diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index 5dda2ad809..473a4c8efd 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -19,6 +19,8 @@ package io.grpc.netty; import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY; import static io.grpc.internal.TransportFrameUtil.toHttp2Headers; import static io.grpc.internal.TransportFrameUtil.toRawSerializedHeaders; +import static io.netty.channel.ChannelOption.SO_LINGER; +import static io.netty.channel.ChannelOption.SO_TIMEOUT; import static io.netty.util.CharsetUtil.UTF_8; import com.google.common.annotations.VisibleForTesting; @@ -26,9 +28,13 @@ import com.google.common.base.Preconditions; import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.internal.Channelz; import io.grpc.internal.GrpcUtil; import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.http2.Http2Exception; @@ -38,6 +44,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -201,6 +208,36 @@ class Utils { } } + static Channelz.SocketOptions getSocketOptions(Channel channel) { + ChannelConfig config = channel.config(); + Channelz.SocketOptions.Builder b = new Channelz.SocketOptions.Builder(); + + // The API allows returning null but not sure if it can happen in practice. + // Let's be paranoid and do null checking just in case. + Integer lingerSeconds = config.getOption(SO_LINGER); + if (lingerSeconds != null) { + b.setSocketOptionLingerSeconds(lingerSeconds); + } + + Integer timeoutMillis = config.getOption(SO_TIMEOUT); + if (timeoutMillis != null) { + // in java, SO_TIMEOUT only applies to receiving + b.setSocketOptionTimeoutMillis(timeoutMillis); + } + + for (Entry, Object> opt : config.getOptions().entrySet()) { + ChannelOption key = opt.getKey(); + // Constants are pooled, so there should only be one instance of each constant + if (key.equals(SO_LINGER) || key.equals(SO_TIMEOUT)) { + continue; + } + Object value = opt.getValue(); + // zpencer: Can a netty option be null? + b.addOption(key.name(), String.valueOf(value)); + } + return b.build(); + } + private Utils() { // Prevents instantiation } diff --git a/netty/src/test/java/io/grpc/netty/UtilsTest.java b/netty/src/test/java/io/grpc/netty/UtilsTest.java index ead175667f..372c451c6d 100644 --- a/netty/src/test/java/io/grpc/netty/UtilsTest.java +++ b/netty/src/test/java/io/grpc/netty/UtilsTest.java @@ -17,14 +17,23 @@ package io.grpc.netty; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import com.google.common.base.MoreObjects; import com.google.common.truth.Truth; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.internal.Channelz; +import io.grpc.internal.Channelz.SocketOptions; import io.grpc.internal.GrpcUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.oio.OioSocketChannel; import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; @@ -118,6 +127,46 @@ public class UtilsTest { assertEquals(Utils.CONTENT_TYPE_GRPC, headers.get(GrpcUtil.CONTENT_TYPE_KEY.name())); } + @Test + public void channelOptionsTest_noLinger() { + Channel channel = new EmbeddedChannel(); + assertNull(channel.config().getOption(ChannelOption.SO_LINGER)); + Channelz.SocketOptions socketOptions = Utils.getSocketOptions(channel); + assertNull(socketOptions.lingerSeconds); + } + + @Test + public void channelOptionsTest_oio() { + Channel channel = new OioSocketChannel(); + SocketOptions socketOptions = setAndValidateGeneric(channel); + assertEquals(250, (int) socketOptions.soTimeoutMillis); + } + + @Test + public void channelOptionsTest_nio() { + Channel channel = new NioSocketChannel(); + SocketOptions socketOptions = setAndValidateGeneric(channel); + assertNull(socketOptions.soTimeoutMillis); + } + + private static Channelz.SocketOptions setAndValidateGeneric(Channel channel) { + channel.config().setOption(ChannelOption.SO_LINGER, 3); + // only applicable for OIO channels: + channel.config().setOption(ChannelOption.SO_TIMEOUT, 250); + // Test some arbitrarily chosen options with a non numeric values + channel.config().setOption(ChannelOption.SO_KEEPALIVE, true); + WriteBufferWaterMark writeBufWaterMark = new WriteBufferWaterMark(10, 20); + channel.config().setOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufWaterMark); + + Channelz.SocketOptions socketOptions = Utils.getSocketOptions(channel); + assertEquals(3, (int) socketOptions.lingerSeconds); + assertEquals("true", socketOptions.others.get("SO_KEEPALIVE")); + assertEquals( + writeBufWaterMark.toString(), + socketOptions.others.get(ChannelOption.WRITE_BUFFER_WATER_MARK.toString())); + return socketOptions; + } + private static void assertStatusEquals(Status expected, Status actual) { assertEquals(expected.getCode(), actual.getCode()); Truth.assertThat(MoreObjects.firstNonNull(actual.getDescription(), "")) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index f710399c86..feab8311f5 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -911,6 +911,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { transportTracer.getStats(), socket.getLocalSocketAddress(), socket.getRemoteSocketAddress(), + Utils.getSocketOptions(socket), new Security())); return ret; } diff --git a/okhttp/src/main/java/io/grpc/okhttp/Utils.java b/okhttp/src/main/java/io/grpc/okhttp/Utils.java index 6ab5d9a016..89c326f9fd 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/Utils.java +++ b/okhttp/src/main/java/io/grpc/okhttp/Utils.java @@ -19,16 +19,23 @@ package io.grpc.okhttp; import com.google.common.base.Preconditions; import io.grpc.InternalMetadata; import io.grpc.Metadata; +import io.grpc.internal.Channelz; import io.grpc.internal.TransportFrameUtil; import io.grpc.okhttp.internal.CipherSuite; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.framed.Header; +import java.net.Socket; +import java.net.SocketException; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Common utility methods for OkHttp transport. */ class Utils { + private static final Logger log = Logger.getLogger(Utils.class.getName()); + static final int DEFAULT_WINDOW_SIZE = 65535; static final int CONNECTION_STREAM_ID = 0; @@ -79,6 +86,78 @@ class Utils { .build(); } + /** + * Attempts to capture all known socket options and return the results as a + * {@link Channelz.SocketOptions}. If getting a socket option threw an exception, + * log the error to the logger and report the value as an error in the response. + */ + static Channelz.SocketOptions getSocketOptions(Socket socket) { + Channelz.SocketOptions.Builder builder = new Channelz.SocketOptions.Builder(); + try { + builder.setSocketOptionLingerSeconds(socket.getSoLinger()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("SO_LINGER", "channelz_internal_error"); + } + + try { + builder.setSocketOptionTimeoutMillis(socket.getSoTimeout()); + } catch (Exception e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("SO_TIMEOUT", "channelz_internal_error"); + } + + try { + builder.addOption("TCP_NODELAY", socket.getTcpNoDelay()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("TCP_NODELAY", "channelz_internal_error"); + } + + try { + builder.addOption("SO_REUSEADDR", socket.getReuseAddress()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("SO_REUSEADDR", "channelz_internal_error"); + } + + try { + builder.addOption("SO_SNDBUF", socket.getSendBufferSize()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("SO_SNDBUF", "channelz_internal_error"); + } + + try { + builder.addOption("SO_RECVBUF", socket.getReceiveBufferSize()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("SO_RECVBUF", "channelz_internal_error"); + } + + try { + builder.addOption("SO_KEEPALIVE", socket.getKeepAlive()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("SO_KEEPALIVE", "channelz_internal_error"); + } + + try { + builder.addOption("SO_OOBINLINE", socket.getOOBInline()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("SO_OOBINLINE", "channelz_internal_error"); + } + + try { + builder.addOption("IP_TOS", socket.getTrafficClass()); + } catch (SocketException e) { + log.log(Level.SEVERE, "Exception caught while reading socket option", e); + builder.addOption("IP_TOS", "channelz_internal_error"); + } + return builder.build(); + } + private Utils() { // Prevents instantiation } diff --git a/okhttp/src/test/java/io/grpc/okhttp/UtilsTest.java b/okhttp/src/test/java/io/grpc/okhttp/UtilsTest.java index 35ca1ae689..157911f252 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/UtilsTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/UtilsTest.java @@ -19,9 +19,11 @@ package io.grpc.okhttp; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import io.grpc.internal.Channelz.SocketOptions; import io.grpc.okhttp.internal.CipherSuite; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.TlsVersion; +import java.net.Socket; import java.util.List; import org.junit.Rule; import org.junit.Test; @@ -69,4 +71,29 @@ public class UtilsTest { assertEquals(CipherSuite.forJavaName(squareCipherSuites.get(i).name()), cipherSuites.get(i)); } } + + @Test + public void getSocketOptions() throws Exception { + Socket socket = new Socket(); + socket.setSoLinger(true, 2); + socket.setSoTimeout(3); + socket.setTcpNoDelay(true); + socket.setReuseAddress(true); + socket.setReceiveBufferSize(4000); + socket.setSendBufferSize(5000); + socket.setKeepAlive(true); + socket.setOOBInline(true); + socket.setTrafficClass(8); // note: see javadoc for valid input values + + SocketOptions socketOptions = Utils.getSocketOptions(socket); + assertEquals(2, (int) socketOptions.lingerSeconds); + assertEquals(3, (int) socketOptions.soTimeoutMillis); + assertEquals("true", socketOptions.others.get("TCP_NODELAY")); + assertEquals("true", socketOptions.others.get("SO_REUSEADDR")); + assertEquals("4000", socketOptions.others.get("SO_RECVBUF")); + assertEquals("5000", socketOptions.others.get("SO_SNDBUF")); + assertEquals("true", socketOptions.others.get("SO_KEEPALIVE")); + assertEquals("true", socketOptions.others.get("SO_OOBINLINE")); + assertEquals("8", socketOptions.others.get("IP_TOS")); + } } diff --git a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java index c9a11ba768..ca8f582284 100644 --- a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java +++ b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java @@ -18,8 +18,10 @@ package io.grpc.services; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Int64Value; +import com.google.protobuf.util.Durations; import com.google.protobuf.util.Timestamps; import io.grpc.ConnectivityState; import io.grpc.Status; @@ -39,6 +41,9 @@ import io.grpc.channelz.v1.ServerData; import io.grpc.channelz.v1.ServerRef; import io.grpc.channelz.v1.Socket; import io.grpc.channelz.v1.SocketData; +import io.grpc.channelz.v1.SocketOption; +import io.grpc.channelz.v1.SocketOptionLinger; +import io.grpc.channelz.v1.SocketOptionTimeout; import io.grpc.channelz.v1.SocketRef; import io.grpc.channelz.v1.Subchannel; import io.grpc.channelz.v1.SubchannelRef; @@ -54,6 +59,9 @@ import io.grpc.internal.Instrumented; import io.grpc.internal.WithLogId; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; import java.util.concurrent.ExecutionException; /** @@ -121,7 +129,7 @@ final class ChannelzProtoUtil { .setRef(toSocketRef(obj)) .setRemote(toAddress(socketStats.remote)) .setLocal(toAddress(socketStats.local)) - .setData(toSocketData(socketStats.data)) + .setData(extractSocketData(socketStats)) .build(); } @@ -147,7 +155,8 @@ final class ChannelzProtoUtil { return builder.build(); } - static SocketData toSocketData(TransportStats s) { + static SocketData extractSocketData(SocketStats socketStats) { + TransportStats s = socketStats.data; return SocketData .newBuilder() .setStreamsStarted(s.streamsStarted) @@ -168,9 +177,66 @@ final class ChannelzProtoUtil { Int64Value.newBuilder().setValue(s.localFlowControlWindow).build()) .setRemoteFlowControlWindow( Int64Value.newBuilder().setValue(s.remoteFlowControlWindow).build()) + .addAllOption(toSocketOptionsList(socketStats.socketOptions)) .build(); } + public static final String SO_LINGER = "SO_LINGER"; + public static final String SO_TIMEOUT = "SO_TIMEOUT"; + + static SocketOption toSocketOptionLinger(int lingerSeconds) { + final SocketOptionLinger lingerOpt; + if (lingerSeconds >= 0) { + lingerOpt = SocketOptionLinger + .newBuilder() + .setActive(true) + .setDuration(Durations.fromSeconds(lingerSeconds)) + .build(); + } else { + lingerOpt = SocketOptionLinger.getDefaultInstance(); + } + return SocketOption + .newBuilder() + .setName(SO_LINGER) + .setAdditional(Any.pack(lingerOpt)) + .build(); + } + + static SocketOption toSocketOptionTimeout(String name, int timeoutMillis) { + Preconditions.checkNotNull(name); + return SocketOption + .newBuilder() + .setName(name) + .setAdditional( + Any.pack( + SocketOptionTimeout + .newBuilder() + .setDuration(Durations.fromMillis(timeoutMillis)) + .build())) + .build(); + } + + static SocketOption toSocketOptionAdditional(String name, String value) { + Preconditions.checkNotNull(name); + Preconditions.checkNotNull(value); + return SocketOption.newBuilder().setName(name).setValue(value).build(); + } + + static List toSocketOptionsList(Channelz.SocketOptions options) { + Preconditions.checkNotNull(options); + List ret = new ArrayList(); + if (options.lingerSeconds != null) { + ret.add(toSocketOptionLinger(options.lingerSeconds)); + } + if (options.soTimeoutMillis != null) { + ret.add(toSocketOptionTimeout(SO_TIMEOUT, options.soTimeoutMillis)); + } + for (Entry entry : options.others.entrySet()) { + ret.add(toSocketOptionAdditional(entry.getKey(), entry.getValue())); + } + return ret; + } + static Channel toChannel(Instrumented channel) { ChannelStats stats = getFuture(channel.getStats()); Channel.Builder channelBuilder = Channel diff --git a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java index 1ce474b8ca..83e3c6dadc 100644 --- a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java +++ b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java @@ -16,11 +16,14 @@ package io.grpc.services; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Int64Value; +import com.google.protobuf.util.Durations; import com.google.protobuf.util.Timestamps; import io.grpc.ConnectivityState; import io.grpc.channelz.v1.Address; @@ -39,14 +42,19 @@ import io.grpc.channelz.v1.ServerData; import io.grpc.channelz.v1.ServerRef; import io.grpc.channelz.v1.Socket; import io.grpc.channelz.v1.SocketData; +import io.grpc.channelz.v1.SocketOption; +import io.grpc.channelz.v1.SocketOptionLinger; +import io.grpc.channelz.v1.SocketOptionTimeout; import io.grpc.channelz.v1.SocketRef; import io.grpc.channelz.v1.Subchannel; import io.grpc.channelz.v1.SubchannelRef; +import io.grpc.internal.Channelz; import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.RootChannelList; import io.grpc.internal.Channelz.ServerList; import io.grpc.internal.Channelz.ServerSocketsList; import io.grpc.internal.Channelz.ServerStats; +import io.grpc.internal.Channelz.SocketOptions; import io.grpc.internal.Instrumented; import io.grpc.internal.WithLogId; import io.grpc.services.ChannelzTestHelper.TestChannel; @@ -57,6 +65,7 @@ import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; +import java.util.Map.Entry; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -125,13 +134,47 @@ public final class ChannelzProtoUtilTest { .setData(serverData) .build(); + private final SocketOption sockOptLingerDisabled = SocketOption + .newBuilder() + .setName("SO_LINGER") + .setAdditional( + Any.pack(SocketOptionLinger.getDefaultInstance())) + .build(); + + private final SocketOption sockOptlinger10s = SocketOption + .newBuilder() + .setName("SO_LINGER") + .setAdditional( + Any.pack(SocketOptionLinger + .newBuilder() + .setActive(true) + .setDuration(Durations.fromSeconds(10)) + .build())) + .build(); + + private final SocketOption sockOptTimeout200ms = SocketOption + .newBuilder() + .setName("SO_TIMEOUT") + .setAdditional( + Any.pack(SocketOptionTimeout + .newBuilder() + .setDuration(Durations.fromMillis(200)) + .build()) + ).build(); + + private final SocketOption sockOptAdditional = SocketOption + .newBuilder() + .setName("SO_MADE_UP_OPTION") + .setValue("some-made-up-value") + .build(); + private final TestSocket socket = new TestSocket(); private final SocketRef socketRef = SocketRef .newBuilder() .setName(socket.toString()) .setSocketId(socket.getLogId().getId()) .build(); - private final SocketData socketData = SocketData + private final SocketData socketDataNoSockOpts = SocketData .newBuilder() .setStreamsStarted(1) .setLastLocalStreamCreatedTimestamp(Timestamps.fromNanos(2)) @@ -203,16 +246,28 @@ public final class ChannelzProtoUtilTest { .setRef(socketRef) .setLocal(localAddress) .setRemote(remoteAddress) - .setData(socketData) + .setData(socketDataNoSockOpts) .build(), ChannelzProtoUtil.toSocket(socket)); } @Test - public void toSocketData() { + public void extractSocketData() throws Exception { + // no options assertEquals( - socketData, - ChannelzProtoUtil.toSocketData(socket.transportStats)); + socketDataNoSockOpts, + ChannelzProtoUtil.extractSocketData(socket.getStats().get())); + + // with options + socket.socketOptions = toBuilder(socket.socketOptions) + .setSocketOptionLingerSeconds(10) + .build(); + assertEquals( + socketDataNoSockOpts + .toBuilder() + .addOption(sockOptlinger10s) + .build(), + ChannelzProtoUtil.extractSocketData(socket.getStats().get())); } @Test @@ -482,6 +537,64 @@ public final class ChannelzProtoUtilTest { new ServerSocketsList(ImmutableList.of(socket, socket2), true))); } + @Test + public void toSocketOptionLinger() { + assertEquals(sockOptLingerDisabled, ChannelzProtoUtil.toSocketOptionLinger(-1)); + assertEquals(sockOptlinger10s, ChannelzProtoUtil.toSocketOptionLinger(10)); + } + + @Test + public void toSocketOptionTimeout() { + assertEquals( + sockOptTimeout200ms, ChannelzProtoUtil.toSocketOptionTimeout("SO_TIMEOUT", 200)); + } + + @Test + public void toSocketOptionAdditional() { + assertEquals( + sockOptAdditional, + ChannelzProtoUtil.toSocketOptionAdditional("SO_MADE_UP_OPTION", "some-made-up-value")); + } + + @Test + public void toSocketOptionsList() { + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new Channelz.SocketOptions.Builder().build())) + .isEmpty(); + + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new Channelz.SocketOptions.Builder().setSocketOptionLingerSeconds(10).build())) + .containsExactly(sockOptlinger10s); + + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new Channelz.SocketOptions.Builder().setSocketOptionTimeoutMillis(200).build())) + .containsExactly(sockOptTimeout200ms); + + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new Channelz.SocketOptions + .Builder() + .addOption("SO_MADE_UP_OPTION", "some-made-up-value") + .build())) + .containsExactly(sockOptAdditional); + + SocketOption otherOption = SocketOption + .newBuilder() + .setName("SO_MADE_UP_OPTION2") + .setValue("some-made-up-value2") + .build(); + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new Channelz.SocketOptions.Builder() + .addOption("SO_MADE_UP_OPTION", "some-made-up-value") + .addOption("SO_MADE_UP_OPTION2", "some-made-up-value2") + .build())) + .containsExactly(sockOptAdditional, otherOption); + } + private static ChannelStats.Builder toBuilder(ChannelStats stats) { ChannelStats.Builder builder = new ChannelStats.Builder() .setTarget(stats.target) @@ -498,4 +611,14 @@ public final class ChannelzProtoUtilTest { } return builder; } + + private static SocketOptions.Builder toBuilder(SocketOptions options) { + SocketOptions.Builder builder = new SocketOptions.Builder() + .setSocketOptionTimeoutMillis(options.soTimeoutMillis) + .setSocketOptionLingerSeconds(options.lingerSeconds); + for (Entry entry : options.others.entrySet()) { + builder.addOption(entry.getKey(), entry.getValue()); + } + return builder; + } } diff --git a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java index ee56975921..baadc10f34 100644 --- a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java +++ b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java @@ -19,6 +19,7 @@ package io.grpc.services; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.ConnectivityState; +import io.grpc.internal.Channelz; import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.Security; import io.grpc.internal.Channelz.ServerStats; @@ -54,12 +55,18 @@ final class ChannelzTestHelper { /*remoteFlowControlWindow=*/ 12); SocketAddress local = new InetSocketAddress("10.0.0.1", 1000); SocketAddress remote = new InetSocketAddress("10.0.0.2", 1000); - + Channelz.SocketOptions socketOptions = new Channelz.SocketOptions.Builder().build(); @Override public ListenableFuture getStats() { SettableFuture ret = SettableFuture.create(); - ret.set(new SocketStats(transportStats, local, remote, new Security())); + ret.set( + new SocketStats( + transportStats, + local, + remote, + socketOptions, + new Security())); return ret; } diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 778475ac21..d69d2d527a 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -80,7 +80,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -1693,9 +1692,8 @@ public abstract class AbstractTransportTest { serverStream.close(Status.OK, new Metadata()); } - @Ignore("hardcoding 127.0.0.1 fails in ipv6") @Test - public void socketStats_addresses() throws Exception { + public void socketStats() throws Exception { server.start(serverListener); ManagedClientTransport client = newClientTransport(server); startTransport(client, mockClientTransportListener); @@ -1715,10 +1713,16 @@ public abstract class AbstractTransportTest { SocketStats clientSocketStats = client.getStats().get(); assertEquals(clientAddress, clientSocketStats.local); assertEquals(serverAddress, clientSocketStats.remote); + // very basic sanity check that socket options are populated + assertNotNull(clientSocketStats.socketOptions.lingerSeconds); + assertTrue(clientSocketStats.socketOptions.others.containsKey("SO_SNDBUF")); SocketStats serverSocketStats = serverTransportListener.transport.getStats().get(); assertEquals(serverAddress, serverSocketStats.local); assertEquals(clientAddress, serverSocketStats.remote); + // very basic sanity check that socket options are populated + assertNotNull(serverSocketStats.socketOptions.lingerSeconds); + assertTrue(serverSocketStats.socketOptions.others.containsKey("SO_SNDBUF")); } /**