mirror of https://github.com/grpc/grpc-java.git
core,netty,okhttp,services: expose socket options to channelz (#4228)
For okhttp, expose the standard options from the Socket object. For netty, expose all the `io.netty.channel.ChannelOption`s of the `channel.config()`.
This commit is contained in:
parent
2a95e38257
commit
7c37f1d32b
|
|
@ -22,6 +22,7 @@ import io.grpc.ConnectivityState;
|
|||
import java.net.SocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -437,20 +438,80 @@ public final class Channelz {
|
|||
public final SocketAddress local;
|
||||
public final SocketAddress remote;
|
||||
public final Security security;
|
||||
public final SocketOptions socketOptions;
|
||||
|
||||
/** Creates an instance. */
|
||||
public SocketStats(
|
||||
TransportStats data,
|
||||
SocketAddress local,
|
||||
SocketAddress remote,
|
||||
SocketOptions socketOptions,
|
||||
Security security) {
|
||||
this.data = data;
|
||||
this.local = local;
|
||||
this.remote = remote;
|
||||
this.socketOptions = socketOptions;
|
||||
this.security = security;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class SocketOptions {
|
||||
public final Map<String, String> others;
|
||||
// In netty, the value of a channel option may be null.
|
||||
@Nullable public final Integer soTimeoutMillis;
|
||||
@Nullable public final Integer lingerSeconds;
|
||||
|
||||
/** Creates an instance. */
|
||||
public SocketOptions(
|
||||
Integer timeoutMillis,
|
||||
Integer lingerSeconds,
|
||||
Map<String, String> others) {
|
||||
Preconditions.checkNotNull(others);
|
||||
this.soTimeoutMillis = timeoutMillis;
|
||||
this.lingerSeconds = lingerSeconds;
|
||||
this.others = Collections.unmodifiableMap(new HashMap<String, String>(others));
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
private final Map<String, String> others = new HashMap<String, String>();
|
||||
private Integer timeoutMillis;
|
||||
private Integer lingerSeconds;
|
||||
|
||||
/** The value of {@link java.net.Socket#getSoTimeout()}. */
|
||||
public Builder setSocketOptionTimeoutMillis(Integer timeoutMillis) {
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** The value of {@link java.net.Socket#getSoLinger()}.
|
||||
* Note: SO_LINGER is typically expressed in seconds.
|
||||
*/
|
||||
public Builder setSocketOptionLingerSeconds(Integer lingerSeconds) {
|
||||
this.lingerSeconds = lingerSeconds;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder addOption(String name, String value) {
|
||||
others.put(name, Preconditions.checkNotNull(value));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder addOption(String name, int value) {
|
||||
others.put(name, Integer.toString(value));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder addOption(String name, boolean value) {
|
||||
others.put(name, Boolean.toString(value));
|
||||
return this;
|
||||
}
|
||||
|
||||
public SocketOptions build() {
|
||||
return new SocketOptions(timeoutMillis, lingerSeconds, others);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A data class to represent transport stats.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ public class InProcessTransportTest extends AbstractTransportTest {
|
|||
@Test
|
||||
@Ignore
|
||||
@Override
|
||||
public void socketStats_addresses() throws Exception {
|
||||
public void socketStats() throws Exception {
|
||||
// test does not apply to in-process
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -323,6 +323,7 @@ class NettyClientTransport implements ConnectionClientTransport {
|
|||
transportTracer.getStats(),
|
||||
channel.localAddress(),
|
||||
channel.remoteAddress(),
|
||||
Utils.getSocketOptions(channel),
|
||||
new Security()));
|
||||
return result;
|
||||
}
|
||||
|
|
@ -335,6 +336,7 @@ class NettyClientTransport implements ConnectionClientTransport {
|
|||
transportTracer.getStats(),
|
||||
channel.localAddress(),
|
||||
channel.remoteAddress(),
|
||||
Utils.getSocketOptions(channel),
|
||||
new Security()));
|
||||
}
|
||||
});
|
||||
|
|
|
|||
|
|
@ -206,6 +206,7 @@ class NettyServerTransport implements ServerTransport {
|
|||
transportTracer.getStats(),
|
||||
channel.localAddress(),
|
||||
channel.remoteAddress(),
|
||||
Utils.getSocketOptions(channel),
|
||||
/*security=*/ null));
|
||||
return result;
|
||||
}
|
||||
|
|
@ -218,6 +219,7 @@ class NettyServerTransport implements ServerTransport {
|
|||
transportTracer.getStats(),
|
||||
channel.localAddress(),
|
||||
channel.remoteAddress(),
|
||||
Utils.getSocketOptions(channel),
|
||||
/*security=*/ null));
|
||||
}
|
||||
});
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ package io.grpc.netty;
|
|||
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
|
||||
import static io.grpc.internal.TransportFrameUtil.toHttp2Headers;
|
||||
import static io.grpc.internal.TransportFrameUtil.toRawSerializedHeaders;
|
||||
import static io.netty.channel.ChannelOption.SO_LINGER;
|
||||
import static io.netty.channel.ChannelOption.SO_TIMEOUT;
|
||||
import static io.netty.util.CharsetUtil.UTF_8;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
|
@ -26,9 +28,13 @@ import com.google.common.base.Preconditions;
|
|||
import io.grpc.InternalMetadata;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.Channelz;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.SharedResourceHolder.Resource;
|
||||
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.codec.http2.Http2Exception;
|
||||
|
|
@ -38,6 +44,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
|
|||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
|
@ -201,6 +208,36 @@ class Utils {
|
|||
}
|
||||
}
|
||||
|
||||
static Channelz.SocketOptions getSocketOptions(Channel channel) {
|
||||
ChannelConfig config = channel.config();
|
||||
Channelz.SocketOptions.Builder b = new Channelz.SocketOptions.Builder();
|
||||
|
||||
// The API allows returning null but not sure if it can happen in practice.
|
||||
// Let's be paranoid and do null checking just in case.
|
||||
Integer lingerSeconds = config.getOption(SO_LINGER);
|
||||
if (lingerSeconds != null) {
|
||||
b.setSocketOptionLingerSeconds(lingerSeconds);
|
||||
}
|
||||
|
||||
Integer timeoutMillis = config.getOption(SO_TIMEOUT);
|
||||
if (timeoutMillis != null) {
|
||||
// in java, SO_TIMEOUT only applies to receiving
|
||||
b.setSocketOptionTimeoutMillis(timeoutMillis);
|
||||
}
|
||||
|
||||
for (Entry<ChannelOption<?>, Object> opt : config.getOptions().entrySet()) {
|
||||
ChannelOption<?> key = opt.getKey();
|
||||
// Constants are pooled, so there should only be one instance of each constant
|
||||
if (key.equals(SO_LINGER) || key.equals(SO_TIMEOUT)) {
|
||||
continue;
|
||||
}
|
||||
Object value = opt.getValue();
|
||||
// zpencer: Can a netty option be null?
|
||||
b.addOption(key.name(), String.valueOf(value));
|
||||
}
|
||||
return b.build();
|
||||
}
|
||||
|
||||
private Utils() {
|
||||
// Prevents instantiation
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,14 +17,23 @@
|
|||
package io.grpc.netty;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.truth.Truth;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.Channelz;
|
||||
import io.grpc.internal.Channelz.SocketOptions;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ConnectTimeoutException;
|
||||
import io.netty.channel.WriteBufferWaterMark;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.channel.socket.oio.OioSocketChannel;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2Headers;
|
||||
import io.netty.handler.codec.http2.Http2Error;
|
||||
import io.netty.handler.codec.http2.Http2Exception;
|
||||
|
|
@ -118,6 +127,46 @@ public class UtilsTest {
|
|||
assertEquals(Utils.CONTENT_TYPE_GRPC, headers.get(GrpcUtil.CONTENT_TYPE_KEY.name()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelOptionsTest_noLinger() {
|
||||
Channel channel = new EmbeddedChannel();
|
||||
assertNull(channel.config().getOption(ChannelOption.SO_LINGER));
|
||||
Channelz.SocketOptions socketOptions = Utils.getSocketOptions(channel);
|
||||
assertNull(socketOptions.lingerSeconds);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelOptionsTest_oio() {
|
||||
Channel channel = new OioSocketChannel();
|
||||
SocketOptions socketOptions = setAndValidateGeneric(channel);
|
||||
assertEquals(250, (int) socketOptions.soTimeoutMillis);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelOptionsTest_nio() {
|
||||
Channel channel = new NioSocketChannel();
|
||||
SocketOptions socketOptions = setAndValidateGeneric(channel);
|
||||
assertNull(socketOptions.soTimeoutMillis);
|
||||
}
|
||||
|
||||
private static Channelz.SocketOptions setAndValidateGeneric(Channel channel) {
|
||||
channel.config().setOption(ChannelOption.SO_LINGER, 3);
|
||||
// only applicable for OIO channels:
|
||||
channel.config().setOption(ChannelOption.SO_TIMEOUT, 250);
|
||||
// Test some arbitrarily chosen options with a non numeric values
|
||||
channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
|
||||
WriteBufferWaterMark writeBufWaterMark = new WriteBufferWaterMark(10, 20);
|
||||
channel.config().setOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufWaterMark);
|
||||
|
||||
Channelz.SocketOptions socketOptions = Utils.getSocketOptions(channel);
|
||||
assertEquals(3, (int) socketOptions.lingerSeconds);
|
||||
assertEquals("true", socketOptions.others.get("SO_KEEPALIVE"));
|
||||
assertEquals(
|
||||
writeBufWaterMark.toString(),
|
||||
socketOptions.others.get(ChannelOption.WRITE_BUFFER_WATER_MARK.toString()));
|
||||
return socketOptions;
|
||||
}
|
||||
|
||||
private static void assertStatusEquals(Status expected, Status actual) {
|
||||
assertEquals(expected.getCode(), actual.getCode());
|
||||
Truth.assertThat(MoreObjects.firstNonNull(actual.getDescription(), ""))
|
||||
|
|
|
|||
|
|
@ -911,6 +911,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
|
|||
transportTracer.getStats(),
|
||||
socket.getLocalSocketAddress(),
|
||||
socket.getRemoteSocketAddress(),
|
||||
Utils.getSocketOptions(socket),
|
||||
new Security()));
|
||||
return ret;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,16 +19,23 @@ package io.grpc.okhttp;
|
|||
import com.google.common.base.Preconditions;
|
||||
import io.grpc.InternalMetadata;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.internal.Channelz;
|
||||
import io.grpc.internal.TransportFrameUtil;
|
||||
import io.grpc.okhttp.internal.CipherSuite;
|
||||
import io.grpc.okhttp.internal.ConnectionSpec;
|
||||
import io.grpc.okhttp.internal.framed.Header;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.util.List;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* Common utility methods for OkHttp transport.
|
||||
*/
|
||||
class Utils {
|
||||
private static final Logger log = Logger.getLogger(Utils.class.getName());
|
||||
|
||||
static final int DEFAULT_WINDOW_SIZE = 65535;
|
||||
static final int CONNECTION_STREAM_ID = 0;
|
||||
|
||||
|
|
@ -79,6 +86,78 @@ class Utils {
|
|||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to capture all known socket options and return the results as a
|
||||
* {@link Channelz.SocketOptions}. If getting a socket option threw an exception,
|
||||
* log the error to the logger and report the value as an error in the response.
|
||||
*/
|
||||
static Channelz.SocketOptions getSocketOptions(Socket socket) {
|
||||
Channelz.SocketOptions.Builder builder = new Channelz.SocketOptions.Builder();
|
||||
try {
|
||||
builder.setSocketOptionLingerSeconds(socket.getSoLinger());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("SO_LINGER", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.setSocketOptionTimeoutMillis(socket.getSoTimeout());
|
||||
} catch (Exception e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("SO_TIMEOUT", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.addOption("TCP_NODELAY", socket.getTcpNoDelay());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("TCP_NODELAY", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.addOption("SO_REUSEADDR", socket.getReuseAddress());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("SO_REUSEADDR", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.addOption("SO_SNDBUF", socket.getSendBufferSize());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("SO_SNDBUF", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.addOption("SO_RECVBUF", socket.getReceiveBufferSize());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("SO_RECVBUF", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.addOption("SO_KEEPALIVE", socket.getKeepAlive());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("SO_KEEPALIVE", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.addOption("SO_OOBINLINE", socket.getOOBInline());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("SO_OOBINLINE", "channelz_internal_error");
|
||||
}
|
||||
|
||||
try {
|
||||
builder.addOption("IP_TOS", socket.getTrafficClass());
|
||||
} catch (SocketException e) {
|
||||
log.log(Level.SEVERE, "Exception caught while reading socket option", e);
|
||||
builder.addOption("IP_TOS", "channelz_internal_error");
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private Utils() {
|
||||
// Prevents instantiation
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,9 +19,11 @@ package io.grpc.okhttp;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import io.grpc.internal.Channelz.SocketOptions;
|
||||
import io.grpc.okhttp.internal.CipherSuite;
|
||||
import io.grpc.okhttp.internal.ConnectionSpec;
|
||||
import io.grpc.okhttp.internal.TlsVersion;
|
||||
import java.net.Socket;
|
||||
import java.util.List;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
|
@ -69,4 +71,29 @@ public class UtilsTest {
|
|||
assertEquals(CipherSuite.forJavaName(squareCipherSuites.get(i).name()), cipherSuites.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getSocketOptions() throws Exception {
|
||||
Socket socket = new Socket();
|
||||
socket.setSoLinger(true, 2);
|
||||
socket.setSoTimeout(3);
|
||||
socket.setTcpNoDelay(true);
|
||||
socket.setReuseAddress(true);
|
||||
socket.setReceiveBufferSize(4000);
|
||||
socket.setSendBufferSize(5000);
|
||||
socket.setKeepAlive(true);
|
||||
socket.setOOBInline(true);
|
||||
socket.setTrafficClass(8); // note: see javadoc for valid input values
|
||||
|
||||
SocketOptions socketOptions = Utils.getSocketOptions(socket);
|
||||
assertEquals(2, (int) socketOptions.lingerSeconds);
|
||||
assertEquals(3, (int) socketOptions.soTimeoutMillis);
|
||||
assertEquals("true", socketOptions.others.get("TCP_NODELAY"));
|
||||
assertEquals("true", socketOptions.others.get("SO_REUSEADDR"));
|
||||
assertEquals("4000", socketOptions.others.get("SO_RECVBUF"));
|
||||
assertEquals("5000", socketOptions.others.get("SO_SNDBUF"));
|
||||
assertEquals("true", socketOptions.others.get("SO_KEEPALIVE"));
|
||||
assertEquals("true", socketOptions.others.get("SO_OOBINLINE"));
|
||||
assertEquals("8", socketOptions.others.get("IP_TOS"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,8 +18,10 @@ package io.grpc.services;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Int64Value;
|
||||
import com.google.protobuf.util.Durations;
|
||||
import com.google.protobuf.util.Timestamps;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.Status;
|
||||
|
|
@ -39,6 +41,9 @@ import io.grpc.channelz.v1.ServerData;
|
|||
import io.grpc.channelz.v1.ServerRef;
|
||||
import io.grpc.channelz.v1.Socket;
|
||||
import io.grpc.channelz.v1.SocketData;
|
||||
import io.grpc.channelz.v1.SocketOption;
|
||||
import io.grpc.channelz.v1.SocketOptionLinger;
|
||||
import io.grpc.channelz.v1.SocketOptionTimeout;
|
||||
import io.grpc.channelz.v1.SocketRef;
|
||||
import io.grpc.channelz.v1.Subchannel;
|
||||
import io.grpc.channelz.v1.SubchannelRef;
|
||||
|
|
@ -54,6 +59,9 @@ import io.grpc.internal.Instrumented;
|
|||
import io.grpc.internal.WithLogId;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
|
|
@ -121,7 +129,7 @@ final class ChannelzProtoUtil {
|
|||
.setRef(toSocketRef(obj))
|
||||
.setRemote(toAddress(socketStats.remote))
|
||||
.setLocal(toAddress(socketStats.local))
|
||||
.setData(toSocketData(socketStats.data))
|
||||
.setData(extractSocketData(socketStats))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
@ -147,7 +155,8 @@ final class ChannelzProtoUtil {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
static SocketData toSocketData(TransportStats s) {
|
||||
static SocketData extractSocketData(SocketStats socketStats) {
|
||||
TransportStats s = socketStats.data;
|
||||
return SocketData
|
||||
.newBuilder()
|
||||
.setStreamsStarted(s.streamsStarted)
|
||||
|
|
@ -168,9 +177,66 @@ final class ChannelzProtoUtil {
|
|||
Int64Value.newBuilder().setValue(s.localFlowControlWindow).build())
|
||||
.setRemoteFlowControlWindow(
|
||||
Int64Value.newBuilder().setValue(s.remoteFlowControlWindow).build())
|
||||
.addAllOption(toSocketOptionsList(socketStats.socketOptions))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static final String SO_LINGER = "SO_LINGER";
|
||||
public static final String SO_TIMEOUT = "SO_TIMEOUT";
|
||||
|
||||
static SocketOption toSocketOptionLinger(int lingerSeconds) {
|
||||
final SocketOptionLinger lingerOpt;
|
||||
if (lingerSeconds >= 0) {
|
||||
lingerOpt = SocketOptionLinger
|
||||
.newBuilder()
|
||||
.setActive(true)
|
||||
.setDuration(Durations.fromSeconds(lingerSeconds))
|
||||
.build();
|
||||
} else {
|
||||
lingerOpt = SocketOptionLinger.getDefaultInstance();
|
||||
}
|
||||
return SocketOption
|
||||
.newBuilder()
|
||||
.setName(SO_LINGER)
|
||||
.setAdditional(Any.pack(lingerOpt))
|
||||
.build();
|
||||
}
|
||||
|
||||
static SocketOption toSocketOptionTimeout(String name, int timeoutMillis) {
|
||||
Preconditions.checkNotNull(name);
|
||||
return SocketOption
|
||||
.newBuilder()
|
||||
.setName(name)
|
||||
.setAdditional(
|
||||
Any.pack(
|
||||
SocketOptionTimeout
|
||||
.newBuilder()
|
||||
.setDuration(Durations.fromMillis(timeoutMillis))
|
||||
.build()))
|
||||
.build();
|
||||
}
|
||||
|
||||
static SocketOption toSocketOptionAdditional(String name, String value) {
|
||||
Preconditions.checkNotNull(name);
|
||||
Preconditions.checkNotNull(value);
|
||||
return SocketOption.newBuilder().setName(name).setValue(value).build();
|
||||
}
|
||||
|
||||
static List<SocketOption> toSocketOptionsList(Channelz.SocketOptions options) {
|
||||
Preconditions.checkNotNull(options);
|
||||
List<SocketOption> ret = new ArrayList<SocketOption>();
|
||||
if (options.lingerSeconds != null) {
|
||||
ret.add(toSocketOptionLinger(options.lingerSeconds));
|
||||
}
|
||||
if (options.soTimeoutMillis != null) {
|
||||
ret.add(toSocketOptionTimeout(SO_TIMEOUT, options.soTimeoutMillis));
|
||||
}
|
||||
for (Entry<String, String> entry : options.others.entrySet()) {
|
||||
ret.add(toSocketOptionAdditional(entry.getKey(), entry.getValue()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static Channel toChannel(Instrumented<ChannelStats> channel) {
|
||||
ChannelStats stats = getFuture(channel.getStats());
|
||||
Channel.Builder channelBuilder = Channel
|
||||
|
|
|
|||
|
|
@ -16,11 +16,14 @@
|
|||
|
||||
package io.grpc.services;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Int64Value;
|
||||
import com.google.protobuf.util.Durations;
|
||||
import com.google.protobuf.util.Timestamps;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.channelz.v1.Address;
|
||||
|
|
@ -39,14 +42,19 @@ import io.grpc.channelz.v1.ServerData;
|
|||
import io.grpc.channelz.v1.ServerRef;
|
||||
import io.grpc.channelz.v1.Socket;
|
||||
import io.grpc.channelz.v1.SocketData;
|
||||
import io.grpc.channelz.v1.SocketOption;
|
||||
import io.grpc.channelz.v1.SocketOptionLinger;
|
||||
import io.grpc.channelz.v1.SocketOptionTimeout;
|
||||
import io.grpc.channelz.v1.SocketRef;
|
||||
import io.grpc.channelz.v1.Subchannel;
|
||||
import io.grpc.channelz.v1.SubchannelRef;
|
||||
import io.grpc.internal.Channelz;
|
||||
import io.grpc.internal.Channelz.ChannelStats;
|
||||
import io.grpc.internal.Channelz.RootChannelList;
|
||||
import io.grpc.internal.Channelz.ServerList;
|
||||
import io.grpc.internal.Channelz.ServerSocketsList;
|
||||
import io.grpc.internal.Channelz.ServerStats;
|
||||
import io.grpc.internal.Channelz.SocketOptions;
|
||||
import io.grpc.internal.Instrumented;
|
||||
import io.grpc.internal.WithLogId;
|
||||
import io.grpc.services.ChannelzTestHelper.TestChannel;
|
||||
|
|
@ -57,6 +65,7 @@ import java.net.Inet4Address;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collections;
|
||||
import java.util.Map.Entry;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
|
@ -125,13 +134,47 @@ public final class ChannelzProtoUtilTest {
|
|||
.setData(serverData)
|
||||
.build();
|
||||
|
||||
private final SocketOption sockOptLingerDisabled = SocketOption
|
||||
.newBuilder()
|
||||
.setName("SO_LINGER")
|
||||
.setAdditional(
|
||||
Any.pack(SocketOptionLinger.getDefaultInstance()))
|
||||
.build();
|
||||
|
||||
private final SocketOption sockOptlinger10s = SocketOption
|
||||
.newBuilder()
|
||||
.setName("SO_LINGER")
|
||||
.setAdditional(
|
||||
Any.pack(SocketOptionLinger
|
||||
.newBuilder()
|
||||
.setActive(true)
|
||||
.setDuration(Durations.fromSeconds(10))
|
||||
.build()))
|
||||
.build();
|
||||
|
||||
private final SocketOption sockOptTimeout200ms = SocketOption
|
||||
.newBuilder()
|
||||
.setName("SO_TIMEOUT")
|
||||
.setAdditional(
|
||||
Any.pack(SocketOptionTimeout
|
||||
.newBuilder()
|
||||
.setDuration(Durations.fromMillis(200))
|
||||
.build())
|
||||
).build();
|
||||
|
||||
private final SocketOption sockOptAdditional = SocketOption
|
||||
.newBuilder()
|
||||
.setName("SO_MADE_UP_OPTION")
|
||||
.setValue("some-made-up-value")
|
||||
.build();
|
||||
|
||||
private final TestSocket socket = new TestSocket();
|
||||
private final SocketRef socketRef = SocketRef
|
||||
.newBuilder()
|
||||
.setName(socket.toString())
|
||||
.setSocketId(socket.getLogId().getId())
|
||||
.build();
|
||||
private final SocketData socketData = SocketData
|
||||
private final SocketData socketDataNoSockOpts = SocketData
|
||||
.newBuilder()
|
||||
.setStreamsStarted(1)
|
||||
.setLastLocalStreamCreatedTimestamp(Timestamps.fromNanos(2))
|
||||
|
|
@ -203,16 +246,28 @@ public final class ChannelzProtoUtilTest {
|
|||
.setRef(socketRef)
|
||||
.setLocal(localAddress)
|
||||
.setRemote(remoteAddress)
|
||||
.setData(socketData)
|
||||
.setData(socketDataNoSockOpts)
|
||||
.build(),
|
||||
ChannelzProtoUtil.toSocket(socket));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toSocketData() {
|
||||
public void extractSocketData() throws Exception {
|
||||
// no options
|
||||
assertEquals(
|
||||
socketData,
|
||||
ChannelzProtoUtil.toSocketData(socket.transportStats));
|
||||
socketDataNoSockOpts,
|
||||
ChannelzProtoUtil.extractSocketData(socket.getStats().get()));
|
||||
|
||||
// with options
|
||||
socket.socketOptions = toBuilder(socket.socketOptions)
|
||||
.setSocketOptionLingerSeconds(10)
|
||||
.build();
|
||||
assertEquals(
|
||||
socketDataNoSockOpts
|
||||
.toBuilder()
|
||||
.addOption(sockOptlinger10s)
|
||||
.build(),
|
||||
ChannelzProtoUtil.extractSocketData(socket.getStats().get()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -482,6 +537,64 @@ public final class ChannelzProtoUtilTest {
|
|||
new ServerSocketsList(ImmutableList.<WithLogId>of(socket, socket2), true)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toSocketOptionLinger() {
|
||||
assertEquals(sockOptLingerDisabled, ChannelzProtoUtil.toSocketOptionLinger(-1));
|
||||
assertEquals(sockOptlinger10s, ChannelzProtoUtil.toSocketOptionLinger(10));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toSocketOptionTimeout() {
|
||||
assertEquals(
|
||||
sockOptTimeout200ms, ChannelzProtoUtil.toSocketOptionTimeout("SO_TIMEOUT", 200));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toSocketOptionAdditional() {
|
||||
assertEquals(
|
||||
sockOptAdditional,
|
||||
ChannelzProtoUtil.toSocketOptionAdditional("SO_MADE_UP_OPTION", "some-made-up-value"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toSocketOptionsList() {
|
||||
assertThat(
|
||||
ChannelzProtoUtil.toSocketOptionsList(
|
||||
new Channelz.SocketOptions.Builder().build()))
|
||||
.isEmpty();
|
||||
|
||||
assertThat(
|
||||
ChannelzProtoUtil.toSocketOptionsList(
|
||||
new Channelz.SocketOptions.Builder().setSocketOptionLingerSeconds(10).build()))
|
||||
.containsExactly(sockOptlinger10s);
|
||||
|
||||
assertThat(
|
||||
ChannelzProtoUtil.toSocketOptionsList(
|
||||
new Channelz.SocketOptions.Builder().setSocketOptionTimeoutMillis(200).build()))
|
||||
.containsExactly(sockOptTimeout200ms);
|
||||
|
||||
assertThat(
|
||||
ChannelzProtoUtil.toSocketOptionsList(
|
||||
new Channelz.SocketOptions
|
||||
.Builder()
|
||||
.addOption("SO_MADE_UP_OPTION", "some-made-up-value")
|
||||
.build()))
|
||||
.containsExactly(sockOptAdditional);
|
||||
|
||||
SocketOption otherOption = SocketOption
|
||||
.newBuilder()
|
||||
.setName("SO_MADE_UP_OPTION2")
|
||||
.setValue("some-made-up-value2")
|
||||
.build();
|
||||
assertThat(
|
||||
ChannelzProtoUtil.toSocketOptionsList(
|
||||
new Channelz.SocketOptions.Builder()
|
||||
.addOption("SO_MADE_UP_OPTION", "some-made-up-value")
|
||||
.addOption("SO_MADE_UP_OPTION2", "some-made-up-value2")
|
||||
.build()))
|
||||
.containsExactly(sockOptAdditional, otherOption);
|
||||
}
|
||||
|
||||
private static ChannelStats.Builder toBuilder(ChannelStats stats) {
|
||||
ChannelStats.Builder builder = new ChannelStats.Builder()
|
||||
.setTarget(stats.target)
|
||||
|
|
@ -498,4 +611,14 @@ public final class ChannelzProtoUtilTest {
|
|||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
private static SocketOptions.Builder toBuilder(SocketOptions options) {
|
||||
SocketOptions.Builder builder = new SocketOptions.Builder()
|
||||
.setSocketOptionTimeoutMillis(options.soTimeoutMillis)
|
||||
.setSocketOptionLingerSeconds(options.lingerSeconds);
|
||||
for (Entry<String, String> entry : options.others.entrySet()) {
|
||||
builder.addOption(entry.getKey(), entry.getValue());
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package io.grpc.services;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.internal.Channelz;
|
||||
import io.grpc.internal.Channelz.ChannelStats;
|
||||
import io.grpc.internal.Channelz.Security;
|
||||
import io.grpc.internal.Channelz.ServerStats;
|
||||
|
|
@ -54,12 +55,18 @@ final class ChannelzTestHelper {
|
|||
/*remoteFlowControlWindow=*/ 12);
|
||||
SocketAddress local = new InetSocketAddress("10.0.0.1", 1000);
|
||||
SocketAddress remote = new InetSocketAddress("10.0.0.2", 1000);
|
||||
|
||||
Channelz.SocketOptions socketOptions = new Channelz.SocketOptions.Builder().build();
|
||||
|
||||
@Override
|
||||
public ListenableFuture<SocketStats> getStats() {
|
||||
SettableFuture<SocketStats> ret = SettableFuture.create();
|
||||
ret.set(new SocketStats(transportStats, local, remote, new Security()));
|
||||
ret.set(
|
||||
new SocketStats(
|
||||
transportStats,
|
||||
local,
|
||||
remote,
|
||||
socketOptions,
|
||||
new Security()));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -80,7 +80,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.TimeoutException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
|
@ -1693,9 +1692,8 @@ public abstract class AbstractTransportTest {
|
|||
serverStream.close(Status.OK, new Metadata());
|
||||
}
|
||||
|
||||
@Ignore("hardcoding 127.0.0.1 fails in ipv6")
|
||||
@Test
|
||||
public void socketStats_addresses() throws Exception {
|
||||
public void socketStats() throws Exception {
|
||||
server.start(serverListener);
|
||||
ManagedClientTransport client = newClientTransport(server);
|
||||
startTransport(client, mockClientTransportListener);
|
||||
|
|
@ -1715,10 +1713,16 @@ public abstract class AbstractTransportTest {
|
|||
SocketStats clientSocketStats = client.getStats().get();
|
||||
assertEquals(clientAddress, clientSocketStats.local);
|
||||
assertEquals(serverAddress, clientSocketStats.remote);
|
||||
// very basic sanity check that socket options are populated
|
||||
assertNotNull(clientSocketStats.socketOptions.lingerSeconds);
|
||||
assertTrue(clientSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));
|
||||
|
||||
SocketStats serverSocketStats = serverTransportListener.transport.getStats().get();
|
||||
assertEquals(serverAddress, serverSocketStats.local);
|
||||
assertEquals(clientAddress, serverSocketStats.remote);
|
||||
// very basic sanity check that socket options are populated
|
||||
assertNotNull(serverSocketStats.socketOptions.lingerSeconds);
|
||||
assertTrue(serverSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Reference in New Issue