interop-testing: add echo-server for proxyless gRPC testing in Istio (#9261)

This commit is contained in:
sanjaypujare 2022-06-24 15:20:14 -07:00 committed by GitHub
parent 7bd0797496
commit c2d33f15be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 997 additions and 0 deletions

View File

@ -0,0 +1,350 @@
package io.grpc.testing.istio;
import static io.grpc.MethodDescriptor.generateFullMethodName;
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler",
comments = "Source: istio/testing/echo.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class EchoTestServiceGrpc {
private EchoTestServiceGrpc() {}
public static final String SERVICE_NAME = "istio.testing.EchoTestService";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<io.grpc.testing.istio.Istio.EchoRequest,
io.grpc.testing.istio.Istio.EchoResponse> getEchoMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "Echo",
requestType = io.grpc.testing.istio.Istio.EchoRequest.class,
responseType = io.grpc.testing.istio.Istio.EchoResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<io.grpc.testing.istio.Istio.EchoRequest,
io.grpc.testing.istio.Istio.EchoResponse> getEchoMethod() {
io.grpc.MethodDescriptor<io.grpc.testing.istio.Istio.EchoRequest, io.grpc.testing.istio.Istio.EchoResponse> getEchoMethod;
if ((getEchoMethod = EchoTestServiceGrpc.getEchoMethod) == null) {
synchronized (EchoTestServiceGrpc.class) {
if ((getEchoMethod = EchoTestServiceGrpc.getEchoMethod) == null) {
EchoTestServiceGrpc.getEchoMethod = getEchoMethod =
io.grpc.MethodDescriptor.<io.grpc.testing.istio.Istio.EchoRequest, io.grpc.testing.istio.Istio.EchoResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Echo"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.testing.istio.Istio.EchoRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.testing.istio.Istio.EchoResponse.getDefaultInstance()))
.setSchemaDescriptor(new EchoTestServiceMethodDescriptorSupplier("Echo"))
.build();
}
}
}
return getEchoMethod;
}
private static volatile io.grpc.MethodDescriptor<io.grpc.testing.istio.Istio.ForwardEchoRequest,
io.grpc.testing.istio.Istio.ForwardEchoResponse> getForwardEchoMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "ForwardEcho",
requestType = io.grpc.testing.istio.Istio.ForwardEchoRequest.class,
responseType = io.grpc.testing.istio.Istio.ForwardEchoResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<io.grpc.testing.istio.Istio.ForwardEchoRequest,
io.grpc.testing.istio.Istio.ForwardEchoResponse> getForwardEchoMethod() {
io.grpc.MethodDescriptor<io.grpc.testing.istio.Istio.ForwardEchoRequest, io.grpc.testing.istio.Istio.ForwardEchoResponse> getForwardEchoMethod;
if ((getForwardEchoMethod = EchoTestServiceGrpc.getForwardEchoMethod) == null) {
synchronized (EchoTestServiceGrpc.class) {
if ((getForwardEchoMethod = EchoTestServiceGrpc.getForwardEchoMethod) == null) {
EchoTestServiceGrpc.getForwardEchoMethod = getForwardEchoMethod =
io.grpc.MethodDescriptor.<io.grpc.testing.istio.Istio.ForwardEchoRequest, io.grpc.testing.istio.Istio.ForwardEchoResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "ForwardEcho"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.testing.istio.Istio.ForwardEchoRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.testing.istio.Istio.ForwardEchoResponse.getDefaultInstance()))
.setSchemaDescriptor(new EchoTestServiceMethodDescriptorSupplier("ForwardEcho"))
.build();
}
}
}
return getForwardEchoMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static EchoTestServiceStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<EchoTestServiceStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<EchoTestServiceStub>() {
@java.lang.Override
public EchoTestServiceStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new EchoTestServiceStub(channel, callOptions);
}
};
return EchoTestServiceStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static EchoTestServiceBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<EchoTestServiceBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<EchoTestServiceBlockingStub>() {
@java.lang.Override
public EchoTestServiceBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new EchoTestServiceBlockingStub(channel, callOptions);
}
};
return EchoTestServiceBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static EchoTestServiceFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<EchoTestServiceFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<EchoTestServiceFutureStub>() {
@java.lang.Override
public EchoTestServiceFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new EchoTestServiceFutureStub(channel, callOptions);
}
};
return EchoTestServiceFutureStub.newStub(factory, channel);
}
/**
*/
public static abstract class EchoTestServiceImplBase implements io.grpc.BindableService {
/**
*/
public void echo(io.grpc.testing.istio.Istio.EchoRequest request,
io.grpc.stub.StreamObserver<io.grpc.testing.istio.Istio.EchoResponse> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getEchoMethod(), responseObserver);
}
/**
*/
public void forwardEcho(io.grpc.testing.istio.Istio.ForwardEchoRequest request,
io.grpc.stub.StreamObserver<io.grpc.testing.istio.Istio.ForwardEchoResponse> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getForwardEchoMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getEchoMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
io.grpc.testing.istio.Istio.EchoRequest,
io.grpc.testing.istio.Istio.EchoResponse>(
this, METHODID_ECHO)))
.addMethod(
getForwardEchoMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
io.grpc.testing.istio.Istio.ForwardEchoRequest,
io.grpc.testing.istio.Istio.ForwardEchoResponse>(
this, METHODID_FORWARD_ECHO)))
.build();
}
}
/**
*/
public static final class EchoTestServiceStub extends io.grpc.stub.AbstractAsyncStub<EchoTestServiceStub> {
private EchoTestServiceStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected EchoTestServiceStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new EchoTestServiceStub(channel, callOptions);
}
/**
*/
public void echo(io.grpc.testing.istio.Istio.EchoRequest request,
io.grpc.stub.StreamObserver<io.grpc.testing.istio.Istio.EchoResponse> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getEchoMethod(), getCallOptions()), request, responseObserver);
}
/**
*/
public void forwardEcho(io.grpc.testing.istio.Istio.ForwardEchoRequest request,
io.grpc.stub.StreamObserver<io.grpc.testing.istio.Istio.ForwardEchoResponse> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getForwardEchoMethod(), getCallOptions()), request, responseObserver);
}
}
/**
*/
public static final class EchoTestServiceBlockingStub extends io.grpc.stub.AbstractBlockingStub<EchoTestServiceBlockingStub> {
private EchoTestServiceBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected EchoTestServiceBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new EchoTestServiceBlockingStub(channel, callOptions);
}
/**
*/
public io.grpc.testing.istio.Istio.EchoResponse echo(io.grpc.testing.istio.Istio.EchoRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getEchoMethod(), getCallOptions(), request);
}
/**
*/
public io.grpc.testing.istio.Istio.ForwardEchoResponse forwardEcho(io.grpc.testing.istio.Istio.ForwardEchoRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getForwardEchoMethod(), getCallOptions(), request);
}
}
/**
*/
public static final class EchoTestServiceFutureStub extends io.grpc.stub.AbstractFutureStub<EchoTestServiceFutureStub> {
private EchoTestServiceFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected EchoTestServiceFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new EchoTestServiceFutureStub(channel, callOptions);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<io.grpc.testing.istio.Istio.EchoResponse> echo(
io.grpc.testing.istio.Istio.EchoRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getEchoMethod(), getCallOptions()), request);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<io.grpc.testing.istio.Istio.ForwardEchoResponse> forwardEcho(
io.grpc.testing.istio.Istio.ForwardEchoRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getForwardEchoMethod(), getCallOptions()), request);
}
}
private static final int METHODID_ECHO = 0;
private static final int METHODID_FORWARD_ECHO = 1;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final EchoTestServiceImplBase serviceImpl;
private final int methodId;
MethodHandlers(EchoTestServiceImplBase serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_ECHO:
serviceImpl.echo((io.grpc.testing.istio.Istio.EchoRequest) request,
(io.grpc.stub.StreamObserver<io.grpc.testing.istio.Istio.EchoResponse>) responseObserver);
break;
case METHODID_FORWARD_ECHO:
serviceImpl.forwardEcho((io.grpc.testing.istio.Istio.ForwardEchoRequest) request,
(io.grpc.stub.StreamObserver<io.grpc.testing.istio.Istio.ForwardEchoResponse>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
private static abstract class EchoTestServiceBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
EchoTestServiceBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return io.grpc.testing.istio.Istio.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("EchoTestService");
}
}
private static final class EchoTestServiceFileDescriptorSupplier
extends EchoTestServiceBaseDescriptorSupplier {
EchoTestServiceFileDescriptorSupplier() {}
}
private static final class EchoTestServiceMethodDescriptorSupplier
extends EchoTestServiceBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final String methodName;
EchoTestServiceMethodDescriptorSupplier(String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (EchoTestServiceGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new EchoTestServiceFileDescriptorSupplier())
.addMethod(getEchoMethod())
.addMethod(getForwardEchoMethod())
.build();
}
}
}
return result;
}
}

View File

@ -0,0 +1,370 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.istio;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.grpc.ForwardingServerCall;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.istio.EchoTestServiceGrpc.EchoTestServiceImplBase;
import io.grpc.testing.istio.Istio.ForwardEchoRequest;
import io.grpc.testing.istio.Istio.ForwardEchoResponse;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This class implements the Istio echo server functionality similar to
* https://github.com/istio/istio/blob/master/pkg/test/echo/server/endpoint/grpc.go .
* Please see Istio framework docs https://github.com/istio/istio/wiki/Istio-Test-Framework .
*/
public class EchoTestServer {
private static final Logger logger = Logger.getLogger(EchoTestServer.class.getName());
private static final String REQUEST_ID = "x-request-id";
private static final String STATUS_CODE = "StatusCode";
private static final String HOST = "Host";
private static final String HOSTNAME = "Hostname";
private static final String REQUEST_HEADER = "RequestHeader";
private static final String IP = "IP";
public static final String GRPC_SCHEME = "grpc://";
@VisibleForTesting List<Server> servers;
/**
* Preprocess args, for two things:
* 1. merge duplicate flags. So "--grpc=8080 --grpc=9090" becomes
* "--grpc=8080,9090".
* 2. replace '-' to '_'. So "--istio-version=123" becomes
* "--istio_version=123" (so exclude the leading "--").
**/
@VisibleForTesting
static Map<String, List<String>> preprocessArgs(String[] args) {
HashMap<String, List<String>> argsMap = new HashMap<>();
for (String arg : args) {
String[] keyValue = arg.split("=", 2);
if (keyValue.length == 2) {
String key = keyValue[0];
String value = keyValue[1];
key = key.substring(0, 2) + key.substring(2).replace('-', '_');
List<String> oldValue = argsMap.get(key);
if (oldValue == null) {
oldValue = new ArrayList<>();
}
oldValue.add(value);
argsMap.put(key, oldValue);
}
}
return ImmutableMap.<String, List<String>>builder().putAll(argsMap).build();
}
/** Turn gRPC ports from a string list to an int list. */
@VisibleForTesting
static List<Integer> getGrpcPorts(Map<String, List<String>> args) {
List<String> grpcPorts = args.get("--grpc");
List<Integer> grpcPortsInt = new ArrayList<>(grpcPorts.size());
for (String port : grpcPorts) {
grpcPortsInt.add(Integer.parseInt(port));
}
return grpcPortsInt;
}
private static String determineHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (IOException ex) {
logger.log(Level.INFO, "Failed to determine hostname. Will generate one", ex);
}
// let's make an identifier for ourselves.
return "generated-" + new Random().nextInt();
}
/**
* The main application allowing this program to be launched from the command line.
*/
public static void main(String[] args) throws Exception {
Map<String, List<String>> processedArgs = preprocessArgs(args);
List<Integer> grpcPorts = getGrpcPorts(processedArgs);
String hostname = determineHostname();
EchoTestServer echoTestServer = new EchoTestServer();
echoTestServer.runServers(grpcPorts, hostname);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
System.out.println("Shutting down");
echoTestServer.stopServers();
} catch (Exception e) {
logger.log(Level.SEVERE, "stopServers", e);
throw e;
}
}));
echoTestServer.blockUntilShutdown();
}
void runServers(List<Integer> grpcPorts, String hostname) throws IOException {
ServerServiceDefinition service = ServerInterceptors.intercept(
new EchoTestServiceImpl(hostname), new EchoTestServerInterceptor());
servers = new ArrayList<>(grpcPorts.size() + 1);
for (int port : grpcPorts) {
runServer(port, service);
}
}
void runServer(int port, ServerServiceDefinition service) throws IOException {
logger.log(Level.INFO, "Listening GRPC on " + port);
servers.add(Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
.addService(service)
.build().start());
}
void stopServers() {
for (Server server : servers) {
server.shutdownNow();
}
}
void blockUntilShutdown() throws InterruptedException {
for (Server server : servers) {
if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Timed out waiting for server shutdown");
}
}
}
private static class EchoTestServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) {
final String methodName = call.getMethodDescriptor().getBareMethodName();
// we need this processing only for Echo
if (!"Echo".equals(methodName)) {
return next.startCall(call, requestHeaders);
}
final SocketAddress peerAddress = call.getAttributes()
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
return next.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@SuppressWarnings("unchecked")
@Override
public void sendMessage(RespT message) {
io.grpc.testing.istio.Istio.EchoResponse echoResponse =
(io.grpc.testing.istio.Istio.EchoResponse) message;
String oldMessage = echoResponse.getMessage();
EchoMessage echoMessage = new EchoMessage();
for (String key : requestHeaders.keys()) {
if (!key.endsWith("-bin")) {
echoMessage.writeKeyValueForRequest(REQUEST_HEADER, key,
requestHeaders.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)));
}
}
// This is not a complete list. May need to add/remove fields later,
// such as "ServiceVersion", "ServicePort", "URL", "Method", "ResponseHeader",
// "Cluster", "IstioVersion"
// Only keep the fields needed for now.
if (peerAddress instanceof InetSocketAddress) {
InetSocketAddress inetPeerAddress = (InetSocketAddress) peerAddress;
echoMessage.writeKeyValue(IP, inetPeerAddress.getAddress().getHostAddress());
}
echoMessage.writeKeyValue(STATUS_CODE, "200");
echoMessage.writeKeyValue(HOST, call.getAuthority());
echoMessage.writeMessage(oldMessage);
echoResponse =
io.grpc.testing.istio.Istio.EchoResponse.newBuilder()
.setMessage(echoMessage.toString())
.build();
super.sendMessage((RespT) echoResponse);
}
},
requestHeaders);
}
}
private static class EchoTestServiceImpl extends EchoTestServiceImplBase {
private final String hostname;
EchoTestServiceImpl(String hostname) {
this.hostname = hostname;
}
@Override
public void echo(io.grpc.testing.istio.Istio.EchoRequest request,
io.grpc.stub.StreamObserver<io.grpc.testing.istio.Istio.EchoResponse> responseObserver) {
EchoMessage echoMessage = new EchoMessage();
echoMessage.writeKeyValue(HOSTNAME, hostname);
echoMessage.writeKeyValue("Echo", request.getMessage());
io.grpc.testing.istio.Istio.EchoResponse echoResponse
= io.grpc.testing.istio.Istio.EchoResponse.newBuilder()
.setMessage(echoMessage.toString())
.build();
responseObserver.onNext(echoResponse);
responseObserver.onCompleted();
}
@Override
public void forwardEcho(ForwardEchoRequest request,
StreamObserver<ForwardEchoResponse> responseObserver) {
try {
responseObserver.onNext(buildEchoResponse(request));
responseObserver.onCompleted();
} catch (InterruptedException e) {
responseObserver.onError(e);
Thread.currentThread().interrupt();
} catch (Exception e) {
responseObserver.onError(e);
}
}
private ForwardEchoResponse buildEchoResponse(ForwardEchoRequest request)
throws InterruptedException {
Istio.ForwardEchoResponse.Builder forwardEchoResponseBuilder
= io.grpc.testing.istio.Istio.ForwardEchoResponse.newBuilder();
String rawUrl = request.getUrl();
if (!rawUrl.startsWith(GRPC_SCHEME)) {
throw new StatusRuntimeException(
Status.UNIMPLEMENTED.withDescription("protocol grpc:// required"));
}
rawUrl = rawUrl.substring(GRPC_SCHEME.length());
// May need to use xds security if urlScheme is "xds"
ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilder(
rawUrl, InsecureChannelCredentials.create());
ManagedChannel channel = channelBuilder.build();
List<Istio.Header> requestHeaders = request.getHeadersList();
Metadata metadata = new Metadata();
for (Istio.Header header : requestHeaders) {
metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER),
header.getValue());
}
int count = request.getCount() == 0 ? 1 : request.getCount();
Duration durationPerQuery = Duration.ZERO;
if (request.getQps() > 0) {
durationPerQuery = Duration.ofNanos(
Duration.ofSeconds(1).toNanos() / request.getQps());
}
logger.info("qps=" + request.getQps());
logger.info("durationPerQuery=" + durationPerQuery);
io.grpc.testing.istio.Istio.EchoRequest echoRequest
= io.grpc.testing.istio.Istio.EchoRequest.newBuilder()
.setMessage(request.getMessage())
.build();
Instant start = Instant.now();
logger.info("starting instant=" + start);
Duration expected = Duration.ZERO;
for (int i = 0; i < count; i++) {
Metadata currentMetadata = new Metadata();
currentMetadata.merge(metadata);
currentMetadata.put(
Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER), "" + i);
EchoTestServiceGrpc.EchoTestServiceBlockingStub stub
= EchoTestServiceGrpc.newBlockingStub(channel).withInterceptors(
MetadataUtils.newAttachHeadersInterceptor(currentMetadata))
.withDeadlineAfter(request.getTimeoutMicros(), TimeUnit.MICROSECONDS);
String response = callEcho(stub, echoRequest, i);
forwardEchoResponseBuilder.addOutput(response);
Instant current = Instant.now();
logger.info("after rpc instant=" + current);
Duration elapsed = Duration.between(start, current);
expected = expected.plus(durationPerQuery);
Duration timeLeft = expected.minus(elapsed);
logger.info("elapsed=" + elapsed + ", expected=" + expected + ", timeLeft=" + timeLeft);
if (!timeLeft.isNegative()) {
logger.info("sleeping for ms =" + timeLeft);
Thread.sleep(timeLeft.toMillis());
}
}
return forwardEchoResponseBuilder.build();
}
private String callEcho(EchoTestServiceGrpc.EchoTestServiceBlockingStub stub,
io.grpc.testing.istio.Istio.EchoRequest echoRequest, int count) {
try {
Istio.EchoResponse echoResponse = stub.echo(echoRequest);
return echoResponse.getMessage();
} catch (Exception e) {
logger.log(Level.INFO, "RPC failed " + count, e);
}
return "";
}
}
private static class EchoMessage {
private final StringBuilder sb = new StringBuilder();
void writeKeyValue(String key, String value) {
sb.append(key).append("=").append(value).append("\n");
}
void writeKeyValueForRequest(String requestHeader, String key, String value) {
if (value != null) {
writeKeyValue(requestHeader, key + ":" + value);
}
}
void writeMessage(String message) {
sb.append(message);
}
@Override
public String toString() {
return sb.toString();
}
}
}

View File

@ -0,0 +1,86 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package istio.testing;
option java_package = "io.grpc.testing.istio";
option java_outer_classname = "Istio";
service EchoTestService {
rpc Echo (EchoRequest) returns (EchoResponse);
rpc ForwardEcho (ForwardEchoRequest) returns (ForwardEchoResponse);
}
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
message Header {
string key = 1;
string value = 2;
}
message ForwardEchoRequest {
int32 count = 1;
int32 qps = 2;
int64 timeout_micros = 3;
string url = 4;
repeated Header headers = 5;
string message = 6;
// Method for the request. Valid only for HTTP
string method = 9;
// If true, requests will be sent using h2c prior knowledge
bool http2 = 7;
// If true, requests will be sent using http3
bool http3 = 15;
// If true, requests will not be sent until magic string is received
bool serverFirst = 8;
// If true, 301 redirects will be followed
bool followRedirects = 14;
// If non-empty, make the request with the corresponding cert and key.
string cert = 10;
string key = 11;
// If non-empty, verify the server CA
string caCert = 12;
// If non-empty, make the request with the corresponding cert and key file.
string certFile = 16;
string keyFile = 17;
// If non-empty, verify the server CA with the ca cert file.
string caCertFile = 18;
// Skip verifying peer's certificate.
bool insecureSkipVerify = 19;
// List of ALPNs to present. If not set, this will be automatically be set based on the protocol
Alpn alpn = 13;
// Server name (SNI) to present in TLS connections. If not set, Host will be used for http requests.
string serverName = 20;
// Expected response determines what string to look for in the response to validate TCP requests succeeded.
// If not set, defaults to "StatusCode=200"
google.protobuf.StringValue expectedResponse = 21;
}
message Alpn {
repeated string value = 1;
}
message ForwardEchoResponse {
repeated string output = 1;
}

View File

@ -0,0 +1,191 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.istio;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Unit tests for {@link EchoTestServer}.
*/
@RunWith(JUnit4.class)
public class EchoTestServerTest {
@Test
public void preprocessArgsTest() {
String[] splitArgs = TEST_ARGS.split(" ");
Map<String, List<String>> processedArgs = EchoTestServer.preprocessArgs(splitArgs);
assertEquals(processedArgs.keySet(), ImmutableSet.copyOf(EXPECTED_KEY_SET));
assertEquals(processedArgs.get("--server_first"), ImmutableList.of("16060", "16061"));
assertEquals(processedArgs.get("--bind_ip"), ImmutableList.of("18082"));
assertEquals(processedArgs.get("--bind_localhost"), ImmutableList.of("18084"));
assertEquals(processedArgs.get("--version"), ImmutableList.of("\"v1\""));
assertEquals(processedArgs.get("--grpc"), ImmutableList.of("\"17070\"", "\"17071\""));
assertEquals(processedArgs.get("--tls"), ImmutableList.of("18443", "19443"));
assertEquals(processedArgs.get("--cluster"), ImmutableList.of("\"cluster-0\""));
assertEquals(processedArgs.get("--key"), ImmutableList.of("/cert.key"));
assertEquals(processedArgs.get("--tcp"), ImmutableList.of("\"19090\"", "\"16060\"",
"\"19091\"","\"16061\"","\"19092\""));
assertEquals(processedArgs.get("--istio_version"), ImmutableList.of("3"));
assertEquals(processedArgs.get("--crt"), ImmutableList.of("/cert.crt"));
assertEquals(processedArgs.get("--metrics"), ImmutableList.of("15014"));
assertEquals(
processedArgs.get("--port"),
ImmutableList.of(
"\"18080\"",
"\"18085\"",
"\"18443\"",
"\"18081\"",
"\"19443\"",
"\"18082\"",
"\"18084\"",
"\"18083\"",
"\"8080\"",
"\"3333\""));
}
@Test
public void echoTest() throws IOException, InterruptedException {
EchoTestServer echoTestServer = new EchoTestServer();
echoTestServer.runServers(ImmutableList.of(0, 0), "test-host");
assertEquals(2, echoTestServer.servers.size());
int port = echoTestServer.servers.get(0).getPort();
assertNotEquals(0, port);
assertNotEquals(0, echoTestServer.servers.get(1).getPort());
ManagedChannelBuilder<?> channelBuilder =
Grpc.newChannelBuilderForAddress("localhost", port, InsecureChannelCredentials.create());
ManagedChannel channel = channelBuilder.build();
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("header1", Metadata.ASCII_STRING_MARSHALLER), "value1");
metadata.put(Metadata.Key.of("header2", Metadata.ASCII_STRING_MARSHALLER), "value2");
EchoTestServiceGrpc.EchoTestServiceBlockingStub stub =
EchoTestServiceGrpc.newBlockingStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
io.grpc.testing.istio.Istio.EchoRequest echoRequest
= io.grpc.testing.istio.Istio.EchoRequest.newBuilder()
.setMessage("test-message1")
.build();
Istio.EchoResponse echoResponse = stub.echo(echoRequest);
String echoMessage = echoResponse.getMessage();
Set<String> lines = ImmutableSet.copyOf(echoMessage.split(System.lineSeparator()));
assertThat(lines).contains("RequestHeader=header1:value1");
assertThat(lines).contains("RequestHeader=header2:value2");
assertThat(lines).contains("Echo=test-message1");
assertThat(lines).contains("Hostname=test-host");
assertThat(lines).contains("Host=localhost:" + port);
assertThat(lines).contains("StatusCode=200");
echoTestServer.stopServers();
echoTestServer.blockUntilShutdown();
}
static final int COUNT_OF_REQUESTS_TO_FORWARD = 60;
@Test
public void forwardEchoTest() throws IOException {
EchoTestServer echoTestServer = new EchoTestServer();
echoTestServer.runServers(ImmutableList.of(0, 0), "test-host");
assertEquals(2, echoTestServer.servers.size());
int port1 = echoTestServer.servers.get(0).getPort();
int port2 = echoTestServer.servers.get(1).getPort();
ManagedChannelBuilder<?> channelBuilder =
Grpc.newChannelBuilderForAddress("localhost", port1, InsecureChannelCredentials.create());
ManagedChannel channel = channelBuilder.build();
Istio.ForwardEchoRequest forwardEchoRequest =
Istio.ForwardEchoRequest.newBuilder()
.setCount(COUNT_OF_REQUESTS_TO_FORWARD)
.setQps(100)
.setTimeoutMicros(100_000L) // 100 millis
.setUrl("grpc://localhost:" + port2)
.addHeaders(
Istio.Header.newBuilder().setKey("test-key1").setValue("test-value1").build())
.addHeaders(
Istio.Header.newBuilder().setKey("test-key2").setValue("test-value2").build())
.setMessage("forward-echo-test-message")
.build();
EchoTestServiceGrpc.EchoTestServiceBlockingStub stub =
EchoTestServiceGrpc.newBlockingStub(channel);
Instant start = Instant.now();
Istio.ForwardEchoResponse forwardEchoResponse = stub.forwardEcho(forwardEchoRequest);
Instant end = Instant.now();
List<String> outputs = forwardEchoResponse.getOutputList();
assertEquals(COUNT_OF_REQUESTS_TO_FORWARD, outputs.size());
for (int i = 0; i < COUNT_OF_REQUESTS_TO_FORWARD; i++) {
validateOutput(outputs.get(i), i);
}
long duration = Duration.between(start, end).toMillis();
assertThat(duration).isIn(Range.closed(
COUNT_OF_REQUESTS_TO_FORWARD * 10L, 2 * COUNT_OF_REQUESTS_TO_FORWARD * 10L));
}
private static void validateOutput(String output, int i) {
Set<String> lines = ImmutableSet.copyOf(output.split(System.lineSeparator()));
assertThat(lines).contains("RequestHeader=x-request-id:" + i);
assertThat(lines).contains("RequestHeader=test-key1:test-value1");
assertThat(lines).contains("RequestHeader=test-key2:test-value2");
assertThat(lines).contains("Hostname=test-host");
assertThat(lines).contains("StatusCode=200");
assertThat(lines).contains("Echo=forward-echo-test-message");
}
private static final String[] EXPECTED_KEY_SET = {
"--server_first",
"--bind_ip", "--istio_version", "--bind_localhost", "--version", "--grpc", "--tls",
"--cluster", "--key", "--tcp", "--crt", "--metrics", "--port"
};
private static final String TEST_ARGS =
"--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=\"17070\" --port=\"18085\""
+ " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=\"16060\" --server_first=16060"
+ " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\""
+ " --grpc=\"17071\" --port=\"19443\" --tls=19443 --port=\"18082\" --bind_ip=18082"
+ " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\""
+ " --port=\"8080\" --port=\"3333\" --version=\"v1\" --istio-version=3 --crt=/cert.crt"
+ " --key=/cert.key";
}