mirror of https://github.com/grpc/grpc-java.git
benchmarks: Modernize client to use target and credentials
This allows using LoadClient with xDS. The changes to SocketAddressValidator are a bit hacky, but we really don't care about cleanliness there. Eventually on client-side, we should be deleting the unix special case entirely, as we'll have a unix name resolver. Fixes #8877
This commit is contained in:
parent
f0a7132fbe
commit
7a9ceacafc
|
|
@ -27,6 +27,7 @@ dependencies {
|
||||||
project(':grpc-stub'),
|
project(':grpc-stub'),
|
||||||
project(':grpc-protobuf'),
|
project(':grpc-protobuf'),
|
||||||
project(':grpc-testing'),
|
project(':grpc-testing'),
|
||||||
|
project(path: ':grpc-xds', configuration: 'shadow'),
|
||||||
libraries.hdrhistogram,
|
libraries.hdrhistogram,
|
||||||
libraries.netty_tcnative,
|
libraries.netty_tcnative,
|
||||||
libraries.netty_epoll,
|
libraries.netty_epoll,
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,11 @@ public interface SocketAddressValidator {
|
||||||
public boolean isValidSocketAddress(SocketAddress address) {
|
public boolean isValidSocketAddress(SocketAddress address) {
|
||||||
return address instanceof InetSocketAddress;
|
return address instanceof InetSocketAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isValidSocketAddress(String address) {
|
||||||
|
return !address.startsWith("unix://");
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -43,10 +48,20 @@ public interface SocketAddressValidator {
|
||||||
public boolean isValidSocketAddress(SocketAddress address) {
|
public boolean isValidSocketAddress(SocketAddress address) {
|
||||||
return "DomainSocketAddress".equals(address.getClass().getSimpleName());
|
return "DomainSocketAddress".equals(address.getClass().getSimpleName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isValidSocketAddress(String address) {
|
||||||
|
return address.startsWith("unix://");
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns {@code true} if the given address is valid.
|
* Returns {@code true} if the given address is valid.
|
||||||
*/
|
*/
|
||||||
boolean isValidSocketAddress(SocketAddress address);
|
boolean isValidSocketAddress(SocketAddress address);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if the given address is valid.
|
||||||
|
*/
|
||||||
|
boolean isValidSocketAddress(String address);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,8 +16,6 @@
|
||||||
|
|
||||||
package io.grpc.benchmarks;
|
package io.grpc.benchmarks;
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All of the supported transports.
|
* All of the supported transports.
|
||||||
*/
|
*/
|
||||||
|
|
@ -49,7 +47,7 @@ public enum Transport {
|
||||||
*
|
*
|
||||||
* @throws IllegalArgumentException if the given address is invalid for this transport.
|
* @throws IllegalArgumentException if the given address is invalid for this transport.
|
||||||
*/
|
*/
|
||||||
public void validateSocketAddress(SocketAddress address) {
|
public void validateSocketAddress(String address) {
|
||||||
if (!socketAddressValidator.isValidSocketAddress(address)) {
|
if (!socketAddressValidator.isValidSocketAddress(address)) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Invalid address " + address + " for transport " + this);
|
"Invalid address " + address + " for transport " + this);
|
||||||
|
|
|
||||||
|
|
@ -18,20 +18,22 @@ package io.grpc.benchmarks;
|
||||||
|
|
||||||
import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
|
import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.util.concurrent.UncaughtExceptionHandlers;
|
import com.google.common.util.concurrent.UncaughtExceptionHandlers;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import io.grpc.ChannelCredentials;
|
||||||
|
import io.grpc.InsecureChannelCredentials;
|
||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
import io.grpc.ManagedChannelBuilder;
|
import io.grpc.ManagedChannelBuilder;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.grpc.TlsChannelCredentials;
|
||||||
import io.grpc.benchmarks.proto.Messages;
|
import io.grpc.benchmarks.proto.Messages;
|
||||||
import io.grpc.benchmarks.proto.Messages.Payload;
|
import io.grpc.benchmarks.proto.Messages.Payload;
|
||||||
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
|
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
|
||||||
import io.grpc.benchmarks.proto.Messages.SimpleResponse;
|
import io.grpc.benchmarks.proto.Messages.SimpleResponse;
|
||||||
import io.grpc.internal.testing.TestUtils;
|
|
||||||
import io.grpc.netty.GrpcSslContexts;
|
|
||||||
import io.grpc.netty.NettyChannelBuilder;
|
import io.grpc.netty.NettyChannelBuilder;
|
||||||
import io.grpc.okhttp.OkHttpChannelBuilder;
|
import io.grpc.okhttp.OkHttpChannelBuilder;
|
||||||
import io.grpc.okhttp.internal.Platform;
|
import io.grpc.testing.TlsTesting;
|
||||||
import io.netty.channel.epoll.EpollDomainSocketChannel;
|
import io.netty.channel.epoll.EpollDomainSocketChannel;
|
||||||
import io.netty.channel.epoll.EpollEventLoopGroup;
|
import io.netty.channel.epoll.EpollEventLoopGroup;
|
||||||
import io.netty.channel.epoll.EpollSocketChannel;
|
import io.netty.channel.epoll.EpollSocketChannel;
|
||||||
|
|
@ -79,15 +81,10 @@ public final class Utils {
|
||||||
/**
|
/**
|
||||||
* Parse a {@link SocketAddress} from the given string.
|
* Parse a {@link SocketAddress} from the given string.
|
||||||
*/
|
*/
|
||||||
public static SocketAddress parseSocketAddress(String value) {
|
public static SocketAddress parseServerSocketAddress(String value) {
|
||||||
if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
|
if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
|
||||||
// Unix Domain Socket address.
|
DomainSocketAddress domainAddress = parseUnixSocketAddress(value);
|
||||||
// Create the underlying file for the Unix Domain Socket.
|
File file = new File(domainAddress.path());
|
||||||
String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
|
|
||||||
File file = new File(filePath);
|
|
||||||
if (!file.isAbsolute()) {
|
|
||||||
throw new IllegalArgumentException("File path must be absolute: " + filePath);
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
if (file.createNewFile()) {
|
if (file.createNewFile()) {
|
||||||
// If this application created the file, delete it when the application exits.
|
// If this application created the file, delete it when the application exits.
|
||||||
|
|
@ -96,8 +93,7 @@ public final class Utils {
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
}
|
}
|
||||||
// Create the SocketAddress referencing the file.
|
return domainAddress;
|
||||||
return new DomainSocketAddress(file);
|
|
||||||
} else {
|
} else {
|
||||||
// Standard TCP/IP address.
|
// Standard TCP/IP address.
|
||||||
String[] parts = value.split(":", 2);
|
String[] parts = value.split(":", 2);
|
||||||
|
|
@ -111,37 +107,24 @@ public final class Utils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static OkHttpChannelBuilder newOkHttpClientChannel(
|
private static DomainSocketAddress parseUnixSocketAddress(String value) {
|
||||||
SocketAddress address, boolean tls, boolean testca) {
|
Preconditions.checkArgument(
|
||||||
InetSocketAddress addr = (InetSocketAddress) address;
|
value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX),
|
||||||
OkHttpChannelBuilder builder =
|
"Must start with %s: %s", UNIX_DOMAIN_SOCKET_PREFIX, value);
|
||||||
OkHttpChannelBuilder.forAddress(addr.getHostName(), addr.getPort());
|
// Unix Domain Socket address.
|
||||||
if (!tls) {
|
// Create the underlying file for the Unix Domain Socket.
|
||||||
builder.usePlaintext();
|
String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
|
||||||
} else if (testca) {
|
File file = new File(filePath);
|
||||||
try {
|
if (!file.isAbsolute()) {
|
||||||
builder.sslSocketFactory(TestUtils.newSslSocketFactoryForCa(
|
throw new IllegalArgumentException("File path must be absolute: " + filePath);
|
||||||
Platform.get().getProvider(),
|
|
||||||
TestUtils.loadCert("ca.pem")));
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
}
|
// Create the SocketAddress referencing the file.
|
||||||
return builder;
|
return new DomainSocketAddress(file);
|
||||||
}
|
|
||||||
|
|
||||||
private static NettyChannelBuilder newNettyClientChannel(Transport transport,
|
|
||||||
SocketAddress address, boolean tls, boolean testca, int flowControlWindow)
|
|
||||||
throws IOException {
|
|
||||||
NettyChannelBuilder builder =
|
|
||||||
NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow);
|
|
||||||
if (!tls) {
|
|
||||||
builder.usePlaintext();
|
|
||||||
} else if (testca) {
|
|
||||||
File cert = TestUtils.loadCert("ca.pem");
|
|
||||||
builder.sslContext(GrpcSslContexts.forClient().trustManager(cert).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static NettyChannelBuilder configureNetty(
|
||||||
|
NettyChannelBuilder builder, Transport transport, int flowControlWindow) {
|
||||||
|
builder.flowControlWindow(flowControlWindow);
|
||||||
DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */);
|
DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */);
|
||||||
switch (transport) {
|
switch (transport) {
|
||||||
case NETTY_NIO:
|
case NETTY_NIO:
|
||||||
|
|
@ -194,17 +177,38 @@ public final class Utils {
|
||||||
/**
|
/**
|
||||||
* Create a {@link ManagedChannel} for the given parameters.
|
* Create a {@link ManagedChannel} for the given parameters.
|
||||||
*/
|
*/
|
||||||
public static ManagedChannel newClientChannel(Transport transport, SocketAddress address,
|
public static ManagedChannel newClientChannel(Transport transport, String target,
|
||||||
boolean tls, boolean testca, @Nullable String authorityOverride,
|
boolean tls, boolean testca, @Nullable String authorityOverride,
|
||||||
int flowControlWindow, boolean directExecutor) {
|
int flowControlWindow, boolean directExecutor) {
|
||||||
|
ChannelCredentials credentials;
|
||||||
|
if (tls) {
|
||||||
|
if (testca) {
|
||||||
|
try {
|
||||||
|
credentials = TlsChannelCredentials.newBuilder()
|
||||||
|
.trustManager(TlsTesting.loadCert("ca.pem"))
|
||||||
|
.build();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
credentials = TlsChannelCredentials.create();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
credentials = InsecureChannelCredentials.create();
|
||||||
|
}
|
||||||
ManagedChannelBuilder<?> builder;
|
ManagedChannelBuilder<?> builder;
|
||||||
if (transport == Transport.OK_HTTP) {
|
if (transport == Transport.OK_HTTP) {
|
||||||
builder = newOkHttpClientChannel(address, tls, testca);
|
builder = OkHttpChannelBuilder.forTarget(target, credentials)
|
||||||
|
.flowControlWindow(flowControlWindow);
|
||||||
} else {
|
} else {
|
||||||
try {
|
if (target.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
|
||||||
builder = newNettyClientChannel(transport, address, tls, testca, flowControlWindow);
|
builder = configureNetty(
|
||||||
} catch (Exception e) {
|
NettyChannelBuilder.forAddress(parseUnixSocketAddress(target), credentials),
|
||||||
throw new RuntimeException(e);
|
transport, flowControlWindow);
|
||||||
|
} else {
|
||||||
|
builder = configureNetty(
|
||||||
|
NettyChannelBuilder.forTarget(target, credentials),
|
||||||
|
transport, flowControlWindow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (authorityOverride != null) {
|
if (authorityOverride != null) {
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,7 @@ class LoadClient {
|
||||||
channels[i] =
|
channels[i] =
|
||||||
Utils.newClientChannel(
|
Utils.newClientChannel(
|
||||||
Epoll.isAvailable() ? Transport.NETTY_EPOLL : Transport.NETTY_NIO,
|
Epoll.isAvailable() ? Transport.NETTY_EPOLL : Transport.NETTY_NIO,
|
||||||
Utils.parseSocketAddress(config.getServerTargets(i % config.getServerTargetsCount())),
|
config.getServerTargets(i % config.getServerTargetsCount()),
|
||||||
config.hasSecurityParams(),
|
config.hasSecurityParams(),
|
||||||
config.hasSecurityParams() && config.getSecurityParams().getUseTestCa(),
|
config.hasSecurityParams() && config.getSecurityParams().getUseTestCa(),
|
||||||
config.hasSecurityParams()
|
config.hasSecurityParams()
|
||||||
|
|
|
||||||
|
|
@ -28,8 +28,6 @@ import io.grpc.benchmarks.proto.Messages;
|
||||||
import io.grpc.benchmarks.proto.Messages.PayloadType;
|
import io.grpc.benchmarks.proto.Messages.PayloadType;
|
||||||
import io.grpc.internal.testing.TestUtils;
|
import io.grpc.internal.testing.TestUtils;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
|
|
@ -47,7 +45,7 @@ public class ClientConfiguration implements Configuration {
|
||||||
String authorityOverride = TestUtils.TEST_SERVER_HOST;
|
String authorityOverride = TestUtils.TEST_SERVER_HOST;
|
||||||
boolean useDefaultCiphers;
|
boolean useDefaultCiphers;
|
||||||
boolean directExecutor;
|
boolean directExecutor;
|
||||||
SocketAddress address;
|
String target;
|
||||||
int channels = 4;
|
int channels = 4;
|
||||||
int outstandingRpcsPerChannel = 10;
|
int outstandingRpcsPerChannel = 10;
|
||||||
int serverPayload;
|
int serverPayload;
|
||||||
|
|
@ -66,7 +64,7 @@ public class ClientConfiguration implements Configuration {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ManagedChannel newChannel() throws IOException {
|
public ManagedChannel newChannel() throws IOException {
|
||||||
return Utils.newClientChannel(transport, address, tls, testca, authorityOverride,
|
return Utils.newClientChannel(transport, target, tls, testca, authorityOverride,
|
||||||
flowControlWindow, directExecutor);
|
flowControlWindow, directExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -106,18 +104,10 @@ public class ClientConfiguration implements Configuration {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Transport " + config.transport.name().toLowerCase() + " does not support TLS.");
|
"Transport " + config.transport.name().toLowerCase() + " does not support TLS.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.transport != Transport.OK_HTTP
|
|
||||||
&& config.testca && config.address instanceof InetSocketAddress) {
|
|
||||||
// Override the socket address with the host from the testca.
|
|
||||||
InetSocketAddress address = (InetSocketAddress) config.address;
|
|
||||||
config.address = TestUtils.testServerAddress(address.getHostName(),
|
|
||||||
address.getPort());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that the address type is correct for the transport type.
|
// Verify that the address type is correct for the transport type.
|
||||||
config.transport.validateSocketAddress(config.address);
|
config.transport.validateSocketAddress(config.target);
|
||||||
|
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
@ -136,7 +126,7 @@ public class ClientConfiguration implements Configuration {
|
||||||
+ "(unix:///path/to/file), depending on the transport selected.", null, true) {
|
+ "(unix:///path/to/file), depending on the transport selected.", null, true) {
|
||||||
@Override
|
@Override
|
||||||
protected void setClientValue(ClientConfiguration config, String value) {
|
protected void setClientValue(ClientConfiguration config, String value) {
|
||||||
config.address = Utils.parseSocketAddress(value);
|
config.target = value;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
CHANNELS("INT", "Number of Channels.", "" + DEFAULT.channels) {
|
CHANNELS("INT", "Number of Channels.", "" + DEFAULT.channels) {
|
||||||
|
|
|
||||||
|
|
@ -142,7 +142,7 @@ class ServerConfiguration implements Configuration {
|
||||||
+ "(unix:///path/to/file), depending on the transport selected.", null, true) {
|
+ "(unix:///path/to/file), depending on the transport selected.", null, true) {
|
||||||
@Override
|
@Override
|
||||||
protected void setServerValue(ServerConfiguration config, String value) {
|
protected void setServerValue(ServerConfiguration config, String value) {
|
||||||
SocketAddress address = Utils.parseSocketAddress(value);
|
SocketAddress address = Utils.parseServerSocketAddress(value);
|
||||||
if (address instanceof InetSocketAddress) {
|
if (address instanceof InetSocketAddress) {
|
||||||
InetSocketAddress addr = (InetSocketAddress) address;
|
InetSocketAddress addr = (InetSocketAddress) address;
|
||||||
int port = addr.getPort() == 0 ? Utils.pickUnusedPort() : addr.getPort();
|
int port = addr.getPort() == 0 ? Utils.pickUnusedPort() : addr.getPort();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue