interop-testing: Add XdsTestClient and XdsTestServer (#6585)

This commit is contained in:
Eric Gribkoff 2020-01-13 23:57:09 -08:00 committed by GitHub
parent 18e099d9d3
commit d914e011b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 832 additions and 0 deletions

View File

@ -25,6 +25,7 @@ dependencies {
project(':grpc-protobuf'),
project(':grpc-stub'),
project(':grpc-testing'),
project(':grpc-xds'),
libraries.google_auth_oauth2_http,
libraries.junit,
libraries.truth
@ -112,6 +113,20 @@ task grpclb_long_lived_affinity_test_client(type: CreateStartScripts) {
]
}
task xds_test_client(type: CreateStartScripts) {
mainClassName = "io.grpc.testing.integration.XdsTestClient"
applicationName = "xds-test-client"
outputDir = new File(project.buildDir, 'tmp')
classpath = jar.outputs.files + configurations.runtime
}
task xds_test_server(type: CreateStartScripts) {
mainClassName = "io.grpc.testing.integration.XdsTestServer"
applicationName = "xds-test-server"
outputDir = new File(project.buildDir, 'tmp')
classpath = jar.outputs.files + configurations.runtime
}
applicationDistribution.into("bin") {
from(test_client)
from(test_server)
@ -119,6 +134,8 @@ applicationDistribution.into("bin") {
from(stresstest_client)
from(http2_client)
from(grpclb_long_lived_affinity_test_client)
from(xds_test_client)
from(xds_test_server)
fileMode = 0755
}

View File

@ -0,0 +1,315 @@
package io.grpc.testing.integration;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.futureUnaryCall;
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
/**
* <pre>
* A service used to obtain stats for verifying LB behavior.
* </pre>
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler",
comments = "Source: grpc/testing/test.proto")
public final class LoadBalancerStatsServiceGrpc {
private LoadBalancerStatsServiceGrpc() {}
public static final String SERVICE_NAME = "grpc.testing.LoadBalancerStatsService";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<io.grpc.testing.integration.Messages.LoadBalancerStatsRequest,
io.grpc.testing.integration.Messages.LoadBalancerStatsResponse> getGetClientStatsMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "GetClientStats",
requestType = io.grpc.testing.integration.Messages.LoadBalancerStatsRequest.class,
responseType = io.grpc.testing.integration.Messages.LoadBalancerStatsResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<io.grpc.testing.integration.Messages.LoadBalancerStatsRequest,
io.grpc.testing.integration.Messages.LoadBalancerStatsResponse> getGetClientStatsMethod() {
io.grpc.MethodDescriptor<io.grpc.testing.integration.Messages.LoadBalancerStatsRequest, io.grpc.testing.integration.Messages.LoadBalancerStatsResponse> getGetClientStatsMethod;
if ((getGetClientStatsMethod = LoadBalancerStatsServiceGrpc.getGetClientStatsMethod) == null) {
synchronized (LoadBalancerStatsServiceGrpc.class) {
if ((getGetClientStatsMethod = LoadBalancerStatsServiceGrpc.getGetClientStatsMethod) == null) {
LoadBalancerStatsServiceGrpc.getGetClientStatsMethod = getGetClientStatsMethod =
io.grpc.MethodDescriptor.<io.grpc.testing.integration.Messages.LoadBalancerStatsRequest, io.grpc.testing.integration.Messages.LoadBalancerStatsResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetClientStats"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.testing.integration.Messages.LoadBalancerStatsRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.testing.integration.Messages.LoadBalancerStatsResponse.getDefaultInstance()))
.setSchemaDescriptor(new LoadBalancerStatsServiceMethodDescriptorSupplier("GetClientStats"))
.build();
}
}
}
return getGetClientStatsMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static LoadBalancerStatsServiceStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<LoadBalancerStatsServiceStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<LoadBalancerStatsServiceStub>() {
@java.lang.Override
public LoadBalancerStatsServiceStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new LoadBalancerStatsServiceStub(channel, callOptions);
}
};
return LoadBalancerStatsServiceStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static LoadBalancerStatsServiceBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<LoadBalancerStatsServiceBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<LoadBalancerStatsServiceBlockingStub>() {
@java.lang.Override
public LoadBalancerStatsServiceBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new LoadBalancerStatsServiceBlockingStub(channel, callOptions);
}
};
return LoadBalancerStatsServiceBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static LoadBalancerStatsServiceFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<LoadBalancerStatsServiceFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<LoadBalancerStatsServiceFutureStub>() {
@java.lang.Override
public LoadBalancerStatsServiceFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new LoadBalancerStatsServiceFutureStub(channel, callOptions);
}
};
return LoadBalancerStatsServiceFutureStub.newStub(factory, channel);
}
/**
* <pre>
* A service used to obtain stats for verifying LB behavior.
* </pre>
*/
public static abstract class LoadBalancerStatsServiceImplBase implements io.grpc.BindableService {
/**
* <pre>
* Gets the backend distribution for RPCs sent by a test client.
* </pre>
*/
public void getClientStats(io.grpc.testing.integration.Messages.LoadBalancerStatsRequest request,
io.grpc.stub.StreamObserver<io.grpc.testing.integration.Messages.LoadBalancerStatsResponse> responseObserver) {
asyncUnimplementedUnaryCall(getGetClientStatsMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getGetClientStatsMethod(),
asyncUnaryCall(
new MethodHandlers<
io.grpc.testing.integration.Messages.LoadBalancerStatsRequest,
io.grpc.testing.integration.Messages.LoadBalancerStatsResponse>(
this, METHODID_GET_CLIENT_STATS)))
.build();
}
}
/**
* <pre>
* A service used to obtain stats for verifying LB behavior.
* </pre>
*/
public static final class LoadBalancerStatsServiceStub extends io.grpc.stub.AbstractAsyncStub<LoadBalancerStatsServiceStub> {
private LoadBalancerStatsServiceStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected LoadBalancerStatsServiceStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new LoadBalancerStatsServiceStub(channel, callOptions);
}
/**
* <pre>
* Gets the backend distribution for RPCs sent by a test client.
* </pre>
*/
public void getClientStats(io.grpc.testing.integration.Messages.LoadBalancerStatsRequest request,
io.grpc.stub.StreamObserver<io.grpc.testing.integration.Messages.LoadBalancerStatsResponse> responseObserver) {
asyncUnaryCall(
getChannel().newCall(getGetClientStatsMethod(), getCallOptions()), request, responseObserver);
}
}
/**
* <pre>
* A service used to obtain stats for verifying LB behavior.
* </pre>
*/
public static final class LoadBalancerStatsServiceBlockingStub extends io.grpc.stub.AbstractBlockingStub<LoadBalancerStatsServiceBlockingStub> {
private LoadBalancerStatsServiceBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected LoadBalancerStatsServiceBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new LoadBalancerStatsServiceBlockingStub(channel, callOptions);
}
/**
* <pre>
* Gets the backend distribution for RPCs sent by a test client.
* </pre>
*/
public io.grpc.testing.integration.Messages.LoadBalancerStatsResponse getClientStats(io.grpc.testing.integration.Messages.LoadBalancerStatsRequest request) {
return blockingUnaryCall(
getChannel(), getGetClientStatsMethod(), getCallOptions(), request);
}
}
/**
* <pre>
* A service used to obtain stats for verifying LB behavior.
* </pre>
*/
public static final class LoadBalancerStatsServiceFutureStub extends io.grpc.stub.AbstractFutureStub<LoadBalancerStatsServiceFutureStub> {
private LoadBalancerStatsServiceFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected LoadBalancerStatsServiceFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new LoadBalancerStatsServiceFutureStub(channel, callOptions);
}
/**
* <pre>
* Gets the backend distribution for RPCs sent by a test client.
* </pre>
*/
public com.google.common.util.concurrent.ListenableFuture<io.grpc.testing.integration.Messages.LoadBalancerStatsResponse> getClientStats(
io.grpc.testing.integration.Messages.LoadBalancerStatsRequest request) {
return futureUnaryCall(
getChannel().newCall(getGetClientStatsMethod(), getCallOptions()), request);
}
}
private static final int METHODID_GET_CLIENT_STATS = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final LoadBalancerStatsServiceImplBase serviceImpl;
private final int methodId;
MethodHandlers(LoadBalancerStatsServiceImplBase serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_GET_CLIENT_STATS:
serviceImpl.getClientStats((io.grpc.testing.integration.Messages.LoadBalancerStatsRequest) request,
(io.grpc.stub.StreamObserver<io.grpc.testing.integration.Messages.LoadBalancerStatsResponse>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
private static abstract class LoadBalancerStatsServiceBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
LoadBalancerStatsServiceBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return io.grpc.testing.integration.Test.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("LoadBalancerStatsService");
}
}
private static final class LoadBalancerStatsServiceFileDescriptorSupplier
extends LoadBalancerStatsServiceBaseDescriptorSupplier {
LoadBalancerStatsServiceFileDescriptorSupplier() {}
}
private static final class LoadBalancerStatsServiceMethodDescriptorSupplier
extends LoadBalancerStatsServiceBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final String methodName;
LoadBalancerStatsServiceMethodDescriptorSupplier(String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (LoadBalancerStatsServiceGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new LoadBalancerStatsServiceFileDescriptorSupplier())
.addMethod(getGetClientStatsMethod())
.build();
}
}
}
return result;
}
}

View File

@ -0,0 +1,333 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.LoadBalancerStatsRequest;
import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/** Client for xDS interop tests. */
public final class XdsTestClient {
private static Logger logger = Logger.getLogger(XdsTestClient.class.getName());
private final Set<XdsStatsWatcher> watchers = new HashSet<>();
private final Object lock = new Object();
private final List<ManagedChannel> channels = new ArrayList<>();
private int numChannels = 1;
private boolean printResponse = false;
private int qps = 1;
private int rpcTimeoutSec = 2;
private String server = "localhost:8080";
private int statsPort = 8081;
private Server statsServer;
private long currentRequestId;
private ListeningScheduledExecutorService exec;
/**
* The main application allowing this client to be launched from the command line.
*/
public static void main(String[] args) {
final XdsTestClient client = new XdsTestClient();
client.parseArgs(args);
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
@SuppressWarnings("CatchAndPrintStackTrace")
public void run() {
try {
client.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
client.run();
}
private void parseArgs(String[] args) {
boolean usage = false;
for (String arg : args) {
if (!arg.startsWith("--")) {
System.err.println("All arguments must start with '--': " + arg);
usage = true;
break;
}
String[] parts = arg.substring(2).split("=", 2);
String key = parts[0];
if ("help".equals(key)) {
usage = true;
break;
}
if (parts.length != 2) {
System.err.println("All arguments must be of the form --arg=value");
usage = true;
break;
}
String value = parts[1];
if ("num_channels".equals(key)) {
numChannels = Integer.valueOf(value);
} else if ("print_response".equals(key)) {
printResponse = Boolean.valueOf(value);
} else if ("qps".equals(key)) {
qps = Integer.valueOf(value);
} else if ("rpc_timeout_sec".equals(key)) {
rpcTimeoutSec = Integer.valueOf(value);
} else if ("server".equals(key)) {
server = value;
} else if ("stats_port".equals(key)) {
statsPort = Integer.valueOf(value);
} else {
System.err.println("Unknown argument: " + key);
usage = true;
break;
}
}
if (usage) {
XdsTestClient c = new XdsTestClient();
System.err.println(
"Usage: [ARGS...]"
+ "\n"
+ "\n --num_channels=INT Default: "
+ c.numChannels
+ "\n --print_response=BOOL Write RPC response to stdout. Default: "
+ c.printResponse
+ "\n --qps=INT Qps per channel. Default: "
+ c.qps
+ "\n --rpc_timeout_sec=INT Per RPC timeout seconds. Default: "
+ c.rpcTimeoutSec
+ "\n --server=host:port Address of server. Default: "
+ c.server
+ "\n --stats_port=INT Port to expose peer distribution stats service. "
+ "Default: "
+ c.statsPort);
System.exit(1);
}
}
private void run() {
statsServer = NettyServerBuilder.forPort(statsPort).addService(new XdsStatsImpl()).build();
try {
statsServer.start();
for (int i = 0; i < numChannels; i++) {
channels.add(NettyChannelBuilder.forTarget(server).usePlaintext().build());
}
exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
runQps();
} catch (Throwable t) {
logger.log(Level.SEVERE, "Error running client", t);
System.exit(1);
}
}
private void stop() throws InterruptedException {
if (statsServer != null) {
statsServer.shutdownNow();
if (!statsServer.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Timed out waiting for server shutdown");
}
}
for (ManagedChannel channel : channels) {
channel.shutdownNow();
}
if (exec != null) {
exec.shutdownNow();
}
}
private void runQps() throws InterruptedException, ExecutionException, TimeoutException {
final SettableFuture<Void> failure = SettableFuture.create();
final class PeriodicRpc implements Runnable {
final AtomicLong messageIds = new AtomicLong();
@Override
public void run() {
final long requestId;
final Set<XdsStatsWatcher> savedWatchers = new HashSet<>();
synchronized (lock) {
currentRequestId += 1;
requestId = currentRequestId;
savedWatchers.addAll(watchers);
}
SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build();
ManagedChannel channel = channels.get((int) (requestId % channels.size()));
final ClientCall<SimpleRequest, SimpleResponse> call =
channel.newCall(
TestServiceGrpc.getUnaryCallMethod(),
CallOptions.DEFAULT.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS));
call.start(
new ClientCall.Listener<SimpleResponse>() {
private String serverId;
@Override
public void onMessage(SimpleResponse response) {
serverId = response.getServerId();
// TODO(ericgribkoff) Currently some test environments cannot access the stats RPC
// service and rely on parsing stdout.
if (printResponse) {
System.out.println(
"Greeting: Hello world, this is "
+ response.getHostname()
+ ", from "
+ call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}
}
@Override
public void onClose(Status status, Metadata trailers) {
for (XdsStatsWatcher watcher : savedWatchers) {
watcher.rpcCompleted(requestId, serverId);
}
}
},
new Metadata());
call.sendMessage(request);
call.request(1);
call.halfClose();
}
}
long nanosPerQuery = TimeUnit.SECONDS.toNanos(1) / qps;
ListenableScheduledFuture<?> future =
exec.scheduleAtFixedRate(new PeriodicRpc(), 0, nanosPerQuery, TimeUnit.NANOSECONDS);
Futures.addCallback(
future,
new FutureCallback<Object>() {
@Override
public void onFailure(Throwable t) {
failure.setException(t);
}
@Override
public void onSuccess(Object o) {}
},
MoreExecutors.directExecutor());
failure.get();
}
private class XdsStatsImpl extends LoadBalancerStatsServiceGrpc.LoadBalancerStatsServiceImplBase {
@Override
public void getClientStats(
LoadBalancerStatsRequest req, StreamObserver<LoadBalancerStatsResponse> responseObserver) {
XdsStatsWatcher watcher;
synchronized (lock) {
long startId = currentRequestId + 1;
long endId = startId + req.getNumRpcs();
watcher = new XdsStatsWatcher(startId, endId);
watchers.add(watcher);
}
LoadBalancerStatsResponse response = watcher.waitForRpcStats(req.getTimeoutSec());
synchronized (lock) {
watchers.remove(watcher);
}
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
/** Records the remote peer distribution for a given range of RPCs. */
private static class XdsStatsWatcher {
private final CountDownLatch latch;
private final long startId;
private final long endId;
private final Map<String, Integer> rpcsByPeer = new HashMap<>();
private final Object lock = new Object();
private int noRemotePeer;
private XdsStatsWatcher(long startId, long endId) {
latch = new CountDownLatch(Ints.checkedCast(endId - startId));
this.startId = startId;
this.endId = endId;
}
void rpcCompleted(long requestId, @Nullable String serverId) {
synchronized (lock) {
if (startId <= requestId && requestId < endId) {
if (serverId != null) {
if (rpcsByPeer.containsKey(serverId)) {
rpcsByPeer.put(serverId, rpcsByPeer.get(serverId) + 1);
} else {
rpcsByPeer.put(serverId, 1);
}
} else {
noRemotePeer += 1;
}
latch.countDown();
}
}
}
LoadBalancerStatsResponse waitForRpcStats(long timeoutSeconds) {
try {
boolean success = latch.await(timeoutSeconds, TimeUnit.SECONDS);
if (!success) {
logger.log(Level.INFO, "Await timed out, returning partial stats");
}
} catch (InterruptedException e) {
logger.log(Level.INFO, "Await interrupted, returning partial stats", e);
Thread.currentThread().interrupt();
}
LoadBalancerStatsResponse.Builder builder = LoadBalancerStatsResponse.newBuilder();
synchronized (lock) {
builder.putAllRpcsByPeer(rpcsByPeer);
builder.setNumFailures(noRemotePeer + (int) latch.getCount());
}
return builder.build();
}
}
}

View File

@ -0,0 +1,144 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/** Interop test server that implements the xDS testing service. */
public final class XdsTestServer {
private static Logger logger = Logger.getLogger(XdsTestServer.class.getName());
private int port = 8080;
private String serverId = "java_server";
private Server server;
/**
* The main application allowing this client to be launched from the command line.
*/
public static void main(String[] args) throws Exception {
final XdsTestServer server = new XdsTestServer();
server.parseArgs(args);
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
@SuppressWarnings("CatchAndPrintStackTrace")
public void run() {
try {
System.out.println("Shutting down");
server.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
server.start();
System.out.println("Server started on port " + server.port);
server.blockUntilShutdown();
}
private void parseArgs(String[] args) {
boolean usage = false;
for (String arg : args) {
if (!arg.startsWith("--")) {
System.err.println("All arguments must start with '--': " + arg);
usage = true;
break;
}
String[] parts = arg.substring(2).split("=", 2);
String key = parts[0];
if ("help".equals(key)) {
usage = true;
break;
}
if (parts.length != 2) {
System.err.println("All arguments must be of the form --arg=value");
usage = true;
break;
}
String value = parts[1];
if ("port".equals(key)) {
port = Integer.valueOf(value);
} else if ("server_id".equals(key)) {
serverId = value;
} else {
System.err.println("Unknown argument: " + key);
usage = true;
break;
}
}
if (usage) {
XdsTestServer s = new XdsTestServer();
System.err.println(
"Usage: [ARGS...]"
+ "\n"
+ "\n --port=INT listening port for server."
+ "\n Default: "
+ s.port
+ "\n --server_id=STRING server ID for response."
+ "\n Default: "
+ s.serverId);
System.exit(1);
}
}
private void start() throws Exception {
server = NettyServerBuilder.forPort(port).addService(new TestServiceImpl()).build().start();
}
private void stop() throws Exception {
server.shutdownNow();
if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Timed out waiting for server shutdown");
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
private class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
private String host = "";
private TestServiceImpl() {
try {
host = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.log(Level.WARNING, "Failed to get host", e);
}
}
@Override
public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
responseObserver.onNext(
SimpleResponse.newBuilder().setServerId(serverId).setHostname(host).build());
responseObserver.onCompleted();
}
}
}

View File

@ -85,6 +85,8 @@ message SimpleResponse {
// Server ID. This must be unique among different server instances,
// but the same across all RPC's made to a particular server instance.
string server_id = 4;
// Server hostname.
string hostname = 5;
}
message SimpleContext {
@ -160,3 +162,17 @@ message ReconnectInfo {
bool passed = 1;
repeated int32 backoff_ms = 2;
}
message LoadBalancerStatsRequest {
// Request stats for the next num_rpcs sent by client.
int32 num_rpcs = 1;
// If num_rpcs have not completed within timeout_sec, return partial results.
int32 timeout_sec = 2;
}
message LoadBalancerStatsResponse {
// The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1;
// The number of RPCs that failed to record a remote peer.
int32 num_failures = 2;
}

View File

@ -76,3 +76,10 @@ service ReconnectService {
rpc Start(grpc.testing.Empty) returns (grpc.testing.Empty);
rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo);
}
// A service used to obtain stats for verifying LB behavior.
service LoadBalancerStatsService {
// Gets the backend distribution for RPCs sent by a test client.
rpc GetClientStats(LoadBalancerStatsRequest)
returns (LoadBalancerStatsResponse) {}
}