From c2d33f15beb6fc281764f363f54ab250220e281d Mon Sep 17 00:00:00 2001 From: sanjaypujare Date: Fri, 24 Jun 2022 15:20:14 -0700 Subject: [PATCH] interop-testing: add echo-server for proxyless gRPC testing in Istio (#9261) --- .../testing/istio/EchoTestServiceGrpc.java | 350 +++++++++++++++++ .../io/grpc/testing/istio/EchoTestServer.java | 370 ++++++++++++++++++ .../src/main/proto/istio/testing/echo.proto | 86 ++++ .../testing/istio/EchoTestServerTest.java | 191 +++++++++ 4 files changed, 997 insertions(+) create mode 100644 interop-testing/src/generated/main/grpc/io/grpc/testing/istio/EchoTestServiceGrpc.java create mode 100644 interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java create mode 100644 interop-testing/src/main/proto/istio/testing/echo.proto create mode 100644 interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/istio/EchoTestServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/istio/EchoTestServiceGrpc.java new file mode 100644 index 0000000000..847b34c5f0 --- /dev/null +++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/istio/EchoTestServiceGrpc.java @@ -0,0 +1,350 @@ +package io.grpc.testing.istio; + +import static io.grpc.MethodDescriptor.generateFullMethodName; + +/** + */ +@javax.annotation.Generated( + value = "by gRPC proto compiler", + comments = "Source: istio/testing/echo.proto") +@io.grpc.stub.annotations.GrpcGenerated +public final class EchoTestServiceGrpc { + + private EchoTestServiceGrpc() {} + + public static final String SERVICE_NAME = "istio.testing.EchoTestService"; + + // Static method descriptors that strictly reflect the proto. + private static volatile io.grpc.MethodDescriptor getEchoMethod; + + @io.grpc.stub.annotations.RpcMethod( + fullMethodName = SERVICE_NAME + '/' + "Echo", + requestType = io.grpc.testing.istio.Istio.EchoRequest.class, + responseType = io.grpc.testing.istio.Istio.EchoResponse.class, + methodType = io.grpc.MethodDescriptor.MethodType.UNARY) + public static io.grpc.MethodDescriptor getEchoMethod() { + io.grpc.MethodDescriptor getEchoMethod; + if ((getEchoMethod = EchoTestServiceGrpc.getEchoMethod) == null) { + synchronized (EchoTestServiceGrpc.class) { + if ((getEchoMethod = EchoTestServiceGrpc.getEchoMethod) == null) { + EchoTestServiceGrpc.getEchoMethod = getEchoMethod = + io.grpc.MethodDescriptor.newBuilder() + .setType(io.grpc.MethodDescriptor.MethodType.UNARY) + .setFullMethodName(generateFullMethodName(SERVICE_NAME, "Echo")) + .setSampledToLocalTracing(true) + .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + io.grpc.testing.istio.Istio.EchoRequest.getDefaultInstance())) + .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + io.grpc.testing.istio.Istio.EchoResponse.getDefaultInstance())) + .setSchemaDescriptor(new EchoTestServiceMethodDescriptorSupplier("Echo")) + .build(); + } + } + } + return getEchoMethod; + } + + private static volatile io.grpc.MethodDescriptor getForwardEchoMethod; + + @io.grpc.stub.annotations.RpcMethod( + fullMethodName = SERVICE_NAME + '/' + "ForwardEcho", + requestType = io.grpc.testing.istio.Istio.ForwardEchoRequest.class, + responseType = io.grpc.testing.istio.Istio.ForwardEchoResponse.class, + methodType = io.grpc.MethodDescriptor.MethodType.UNARY) + public static io.grpc.MethodDescriptor getForwardEchoMethod() { + io.grpc.MethodDescriptor getForwardEchoMethod; + if ((getForwardEchoMethod = EchoTestServiceGrpc.getForwardEchoMethod) == null) { + synchronized (EchoTestServiceGrpc.class) { + if ((getForwardEchoMethod = EchoTestServiceGrpc.getForwardEchoMethod) == null) { + EchoTestServiceGrpc.getForwardEchoMethod = getForwardEchoMethod = + io.grpc.MethodDescriptor.newBuilder() + .setType(io.grpc.MethodDescriptor.MethodType.UNARY) + .setFullMethodName(generateFullMethodName(SERVICE_NAME, "ForwardEcho")) + .setSampledToLocalTracing(true) + .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + io.grpc.testing.istio.Istio.ForwardEchoRequest.getDefaultInstance())) + .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + io.grpc.testing.istio.Istio.ForwardEchoResponse.getDefaultInstance())) + .setSchemaDescriptor(new EchoTestServiceMethodDescriptorSupplier("ForwardEcho")) + .build(); + } + } + } + return getForwardEchoMethod; + } + + /** + * Creates a new async stub that supports all call types for the service + */ + public static EchoTestServiceStub newStub(io.grpc.Channel channel) { + io.grpc.stub.AbstractStub.StubFactory factory = + new io.grpc.stub.AbstractStub.StubFactory() { + @java.lang.Override + public EchoTestServiceStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + return new EchoTestServiceStub(channel, callOptions); + } + }; + return EchoTestServiceStub.newStub(factory, channel); + } + + /** + * Creates a new blocking-style stub that supports unary and streaming output calls on the service + */ + public static EchoTestServiceBlockingStub newBlockingStub( + io.grpc.Channel channel) { + io.grpc.stub.AbstractStub.StubFactory factory = + new io.grpc.stub.AbstractStub.StubFactory() { + @java.lang.Override + public EchoTestServiceBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + return new EchoTestServiceBlockingStub(channel, callOptions); + } + }; + return EchoTestServiceBlockingStub.newStub(factory, channel); + } + + /** + * Creates a new ListenableFuture-style stub that supports unary calls on the service + */ + public static EchoTestServiceFutureStub newFutureStub( + io.grpc.Channel channel) { + io.grpc.stub.AbstractStub.StubFactory factory = + new io.grpc.stub.AbstractStub.StubFactory() { + @java.lang.Override + public EchoTestServiceFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + return new EchoTestServiceFutureStub(channel, callOptions); + } + }; + return EchoTestServiceFutureStub.newStub(factory, channel); + } + + /** + */ + public static abstract class EchoTestServiceImplBase implements io.grpc.BindableService { + + /** + */ + public void echo(io.grpc.testing.istio.Istio.EchoRequest request, + io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getEchoMethod(), responseObserver); + } + + /** + */ + public void forwardEcho(io.grpc.testing.istio.Istio.ForwardEchoRequest request, + io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getForwardEchoMethod(), responseObserver); + } + + @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { + return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) + .addMethod( + getEchoMethod(), + io.grpc.stub.ServerCalls.asyncUnaryCall( + new MethodHandlers< + io.grpc.testing.istio.Istio.EchoRequest, + io.grpc.testing.istio.Istio.EchoResponse>( + this, METHODID_ECHO))) + .addMethod( + getForwardEchoMethod(), + io.grpc.stub.ServerCalls.asyncUnaryCall( + new MethodHandlers< + io.grpc.testing.istio.Istio.ForwardEchoRequest, + io.grpc.testing.istio.Istio.ForwardEchoResponse>( + this, METHODID_FORWARD_ECHO))) + .build(); + } + } + + /** + */ + public static final class EchoTestServiceStub extends io.grpc.stub.AbstractAsyncStub { + private EchoTestServiceStub( + io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + super(channel, callOptions); + } + + @java.lang.Override + protected EchoTestServiceStub build( + io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + return new EchoTestServiceStub(channel, callOptions); + } + + /** + */ + public void echo(io.grpc.testing.istio.Istio.EchoRequest request, + io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.ClientCalls.asyncUnaryCall( + getChannel().newCall(getEchoMethod(), getCallOptions()), request, responseObserver); + } + + /** + */ + public void forwardEcho(io.grpc.testing.istio.Istio.ForwardEchoRequest request, + io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.ClientCalls.asyncUnaryCall( + getChannel().newCall(getForwardEchoMethod(), getCallOptions()), request, responseObserver); + } + } + + /** + */ + public static final class EchoTestServiceBlockingStub extends io.grpc.stub.AbstractBlockingStub { + private EchoTestServiceBlockingStub( + io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + super(channel, callOptions); + } + + @java.lang.Override + protected EchoTestServiceBlockingStub build( + io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + return new EchoTestServiceBlockingStub(channel, callOptions); + } + + /** + */ + public io.grpc.testing.istio.Istio.EchoResponse echo(io.grpc.testing.istio.Istio.EchoRequest request) { + return io.grpc.stub.ClientCalls.blockingUnaryCall( + getChannel(), getEchoMethod(), getCallOptions(), request); + } + + /** + */ + public io.grpc.testing.istio.Istio.ForwardEchoResponse forwardEcho(io.grpc.testing.istio.Istio.ForwardEchoRequest request) { + return io.grpc.stub.ClientCalls.blockingUnaryCall( + getChannel(), getForwardEchoMethod(), getCallOptions(), request); + } + } + + /** + */ + public static final class EchoTestServiceFutureStub extends io.grpc.stub.AbstractFutureStub { + private EchoTestServiceFutureStub( + io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + super(channel, callOptions); + } + + @java.lang.Override + protected EchoTestServiceFutureStub build( + io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + return new EchoTestServiceFutureStub(channel, callOptions); + } + + /** + */ + public com.google.common.util.concurrent.ListenableFuture echo( + io.grpc.testing.istio.Istio.EchoRequest request) { + return io.grpc.stub.ClientCalls.futureUnaryCall( + getChannel().newCall(getEchoMethod(), getCallOptions()), request); + } + + /** + */ + public com.google.common.util.concurrent.ListenableFuture forwardEcho( + io.grpc.testing.istio.Istio.ForwardEchoRequest request) { + return io.grpc.stub.ClientCalls.futureUnaryCall( + getChannel().newCall(getForwardEchoMethod(), getCallOptions()), request); + } + } + + private static final int METHODID_ECHO = 0; + private static final int METHODID_FORWARD_ECHO = 1; + + private static final class MethodHandlers implements + io.grpc.stub.ServerCalls.UnaryMethod, + io.grpc.stub.ServerCalls.ServerStreamingMethod, + io.grpc.stub.ServerCalls.ClientStreamingMethod, + io.grpc.stub.ServerCalls.BidiStreamingMethod { + private final EchoTestServiceImplBase serviceImpl; + private final int methodId; + + MethodHandlers(EchoTestServiceImplBase serviceImpl, int methodId) { + this.serviceImpl = serviceImpl; + this.methodId = methodId; + } + + @java.lang.Override + @java.lang.SuppressWarnings("unchecked") + public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) { + switch (methodId) { + case METHODID_ECHO: + serviceImpl.echo((io.grpc.testing.istio.Istio.EchoRequest) request, + (io.grpc.stub.StreamObserver) responseObserver); + break; + case METHODID_FORWARD_ECHO: + serviceImpl.forwardEcho((io.grpc.testing.istio.Istio.ForwardEchoRequest) request, + (io.grpc.stub.StreamObserver) responseObserver); + break; + default: + throw new AssertionError(); + } + } + + @java.lang.Override + @java.lang.SuppressWarnings("unchecked") + public io.grpc.stub.StreamObserver invoke( + io.grpc.stub.StreamObserver responseObserver) { + switch (methodId) { + default: + throw new AssertionError(); + } + } + } + + private static abstract class EchoTestServiceBaseDescriptorSupplier + implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier { + EchoTestServiceBaseDescriptorSupplier() {} + + @java.lang.Override + public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() { + return io.grpc.testing.istio.Istio.getDescriptor(); + } + + @java.lang.Override + public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() { + return getFileDescriptor().findServiceByName("EchoTestService"); + } + } + + private static final class EchoTestServiceFileDescriptorSupplier + extends EchoTestServiceBaseDescriptorSupplier { + EchoTestServiceFileDescriptorSupplier() {} + } + + private static final class EchoTestServiceMethodDescriptorSupplier + extends EchoTestServiceBaseDescriptorSupplier + implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { + private final String methodName; + + EchoTestServiceMethodDescriptorSupplier(String methodName) { + this.methodName = methodName; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() { + return getServiceDescriptor().findMethodByName(methodName); + } + } + + private static volatile io.grpc.ServiceDescriptor serviceDescriptor; + + public static io.grpc.ServiceDescriptor getServiceDescriptor() { + io.grpc.ServiceDescriptor result = serviceDescriptor; + if (result == null) { + synchronized (EchoTestServiceGrpc.class) { + result = serviceDescriptor; + if (result == null) { + serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME) + .setSchemaDescriptor(new EchoTestServiceFileDescriptorSupplier()) + .addMethod(getEchoMethod()) + .addMethod(getForwardEchoMethod()) + .build(); + } + } + } + return result; + } +} diff --git a/interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java b/interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java new file mode 100644 index 0000000000..23d8b69c9f --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java @@ -0,0 +1,370 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.istio; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import io.grpc.ForwardingServerCall; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.istio.EchoTestServiceGrpc.EchoTestServiceImplBase; +import io.grpc.testing.istio.Istio.ForwardEchoRequest; +import io.grpc.testing.istio.Istio.ForwardEchoResponse; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This class implements the Istio echo server functionality similar to + * https://github.com/istio/istio/blob/master/pkg/test/echo/server/endpoint/grpc.go . + * Please see Istio framework docs https://github.com/istio/istio/wiki/Istio-Test-Framework . + */ +public class EchoTestServer { + + private static final Logger logger = Logger.getLogger(EchoTestServer.class.getName()); + + private static final String REQUEST_ID = "x-request-id"; + private static final String STATUS_CODE = "StatusCode"; + private static final String HOST = "Host"; + private static final String HOSTNAME = "Hostname"; + private static final String REQUEST_HEADER = "RequestHeader"; + private static final String IP = "IP"; + public static final String GRPC_SCHEME = "grpc://"; + + @VisibleForTesting List servers; + + /** + * Preprocess args, for two things: + * 1. merge duplicate flags. So "--grpc=8080 --grpc=9090" becomes + * "--grpc=8080,9090". + * 2. replace '-' to '_'. So "--istio-version=123" becomes + * "--istio_version=123" (so exclude the leading "--"). + **/ + @VisibleForTesting + static Map> preprocessArgs(String[] args) { + HashMap> argsMap = new HashMap<>(); + for (String arg : args) { + String[] keyValue = arg.split("=", 2); + + if (keyValue.length == 2) { + String key = keyValue[0]; + String value = keyValue[1]; + + key = key.substring(0, 2) + key.substring(2).replace('-', '_'); + List oldValue = argsMap.get(key); + if (oldValue == null) { + oldValue = new ArrayList<>(); + } + oldValue.add(value); + argsMap.put(key, oldValue); + } + } + return ImmutableMap.>builder().putAll(argsMap).build(); + } + + /** Turn gRPC ports from a string list to an int list. */ + @VisibleForTesting + static List getGrpcPorts(Map> args) { + List grpcPorts = args.get("--grpc"); + List grpcPortsInt = new ArrayList<>(grpcPorts.size()); + + for (String port : grpcPorts) { + grpcPortsInt.add(Integer.parseInt(port)); + } + return grpcPortsInt; + } + + private static String determineHostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (IOException ex) { + logger.log(Level.INFO, "Failed to determine hostname. Will generate one", ex); + } + // let's make an identifier for ourselves. + return "generated-" + new Random().nextInt(); + } + + /** + * The main application allowing this program to be launched from the command line. + */ + public static void main(String[] args) throws Exception { + Map> processedArgs = preprocessArgs(args); + List grpcPorts = getGrpcPorts(processedArgs); + + String hostname = determineHostname(); + EchoTestServer echoTestServer = new EchoTestServer(); + echoTestServer.runServers(grpcPorts, hostname); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + System.out.println("Shutting down"); + echoTestServer.stopServers(); + } catch (Exception e) { + logger.log(Level.SEVERE, "stopServers", e); + throw e; + } + })); + echoTestServer.blockUntilShutdown(); + } + + void runServers(List grpcPorts, String hostname) throws IOException { + ServerServiceDefinition service = ServerInterceptors.intercept( + new EchoTestServiceImpl(hostname), new EchoTestServerInterceptor()); + servers = new ArrayList<>(grpcPorts.size() + 1); + for (int port : grpcPorts) { + runServer(port, service); + } + } + + void runServer(int port, ServerServiceDefinition service) throws IOException { + logger.log(Level.INFO, "Listening GRPC on " + port); + servers.add(Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(service) + .build().start()); + } + + void stopServers() { + for (Server server : servers) { + server.shutdownNow(); + } + } + + void blockUntilShutdown() throws InterruptedException { + for (Server server : servers) { + if (!server.awaitTermination(5, TimeUnit.SECONDS)) { + System.err.println("Timed out waiting for server shutdown"); + } + } + } + + private static class EchoTestServerInterceptor implements ServerInterceptor { + + @Override + public ServerCall.Listener interceptCall(ServerCall call, + final Metadata requestHeaders, ServerCallHandler next) { + final String methodName = call.getMethodDescriptor().getBareMethodName(); + + // we need this processing only for Echo + if (!"Echo".equals(methodName)) { + return next.startCall(call, requestHeaders); + } + + final SocketAddress peerAddress = call.getAttributes() + .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + + return next.startCall( + new ForwardingServerCall.SimpleForwardingServerCall(call) { + + @SuppressWarnings("unchecked") + @Override + public void sendMessage(RespT message) { + io.grpc.testing.istio.Istio.EchoResponse echoResponse = + (io.grpc.testing.istio.Istio.EchoResponse) message; + String oldMessage = echoResponse.getMessage(); + + EchoMessage echoMessage = new EchoMessage(); + + for (String key : requestHeaders.keys()) { + if (!key.endsWith("-bin")) { + + echoMessage.writeKeyValueForRequest(REQUEST_HEADER, key, + requestHeaders.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))); + } + } + // This is not a complete list. May need to add/remove fields later, + // such as "ServiceVersion", "ServicePort", "URL", "Method", "ResponseHeader", + // "Cluster", "IstioVersion" + // Only keep the fields needed for now. + if (peerAddress instanceof InetSocketAddress) { + InetSocketAddress inetPeerAddress = (InetSocketAddress) peerAddress; + echoMessage.writeKeyValue(IP, inetPeerAddress.getAddress().getHostAddress()); + } + echoMessage.writeKeyValue(STATUS_CODE, "200"); + echoMessage.writeKeyValue(HOST, call.getAuthority()); + echoMessage.writeMessage(oldMessage); + echoResponse = + io.grpc.testing.istio.Istio.EchoResponse.newBuilder() + .setMessage(echoMessage.toString()) + .build(); + super.sendMessage((RespT) echoResponse); + } + }, + requestHeaders); + } + } + + private static class EchoTestServiceImpl extends EchoTestServiceImplBase { + + private final String hostname; + + EchoTestServiceImpl(String hostname) { + this.hostname = hostname; + } + + @Override + public void echo(io.grpc.testing.istio.Istio.EchoRequest request, + io.grpc.stub.StreamObserver responseObserver) { + + EchoMessage echoMessage = new EchoMessage(); + echoMessage.writeKeyValue(HOSTNAME, hostname); + echoMessage.writeKeyValue("Echo", request.getMessage()); + io.grpc.testing.istio.Istio.EchoResponse echoResponse + = io.grpc.testing.istio.Istio.EchoResponse.newBuilder() + .setMessage(echoMessage.toString()) + .build(); + + responseObserver.onNext(echoResponse); + responseObserver.onCompleted(); + } + + @Override + public void forwardEcho(ForwardEchoRequest request, + StreamObserver responseObserver) { + try { + responseObserver.onNext(buildEchoResponse(request)); + responseObserver.onCompleted(); + } catch (InterruptedException e) { + responseObserver.onError(e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + responseObserver.onError(e); + } + } + + private ForwardEchoResponse buildEchoResponse(ForwardEchoRequest request) + throws InterruptedException { + Istio.ForwardEchoResponse.Builder forwardEchoResponseBuilder + = io.grpc.testing.istio.Istio.ForwardEchoResponse.newBuilder(); + String rawUrl = request.getUrl(); + if (!rawUrl.startsWith(GRPC_SCHEME)) { + throw new StatusRuntimeException( + Status.UNIMPLEMENTED.withDescription("protocol grpc:// required")); + } + rawUrl = rawUrl.substring(GRPC_SCHEME.length()); + + // May need to use xds security if urlScheme is "xds" + ManagedChannelBuilder channelBuilder = Grpc.newChannelBuilder( + rawUrl, InsecureChannelCredentials.create()); + ManagedChannel channel = channelBuilder.build(); + + List requestHeaders = request.getHeadersList(); + Metadata metadata = new Metadata(); + + for (Istio.Header header : requestHeaders) { + metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER), + header.getValue()); + } + + int count = request.getCount() == 0 ? 1 : request.getCount(); + Duration durationPerQuery = Duration.ZERO; + if (request.getQps() > 0) { + durationPerQuery = Duration.ofNanos( + Duration.ofSeconds(1).toNanos() / request.getQps()); + } + logger.info("qps=" + request.getQps()); + logger.info("durationPerQuery=" + durationPerQuery); + io.grpc.testing.istio.Istio.EchoRequest echoRequest + = io.grpc.testing.istio.Istio.EchoRequest.newBuilder() + .setMessage(request.getMessage()) + .build(); + Instant start = Instant.now(); + logger.info("starting instant=" + start); + Duration expected = Duration.ZERO; + for (int i = 0; i < count; i++) { + Metadata currentMetadata = new Metadata(); + currentMetadata.merge(metadata); + currentMetadata.put( + Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER), "" + i); + EchoTestServiceGrpc.EchoTestServiceBlockingStub stub + = EchoTestServiceGrpc.newBlockingStub(channel).withInterceptors( + MetadataUtils.newAttachHeadersInterceptor(currentMetadata)) + .withDeadlineAfter(request.getTimeoutMicros(), TimeUnit.MICROSECONDS); + String response = callEcho(stub, echoRequest, i); + forwardEchoResponseBuilder.addOutput(response); + Instant current = Instant.now(); + logger.info("after rpc instant=" + current); + Duration elapsed = Duration.between(start, current); + expected = expected.plus(durationPerQuery); + Duration timeLeft = expected.minus(elapsed); + logger.info("elapsed=" + elapsed + ", expected=" + expected + ", timeLeft=" + timeLeft); + if (!timeLeft.isNegative()) { + logger.info("sleeping for ms =" + timeLeft); + Thread.sleep(timeLeft.toMillis()); + } + } + return forwardEchoResponseBuilder.build(); + } + + private String callEcho(EchoTestServiceGrpc.EchoTestServiceBlockingStub stub, + io.grpc.testing.istio.Istio.EchoRequest echoRequest, int count) { + try { + Istio.EchoResponse echoResponse = stub.echo(echoRequest); + return echoResponse.getMessage(); + } catch (Exception e) { + logger.log(Level.INFO, "RPC failed " + count, e); + } + return ""; + } + } + + private static class EchoMessage { + private final StringBuilder sb = new StringBuilder(); + + void writeKeyValue(String key, String value) { + sb.append(key).append("=").append(value).append("\n"); + } + + void writeKeyValueForRequest(String requestHeader, String key, String value) { + if (value != null) { + writeKeyValue(requestHeader, key + ":" + value); + } + } + + void writeMessage(String message) { + sb.append(message); + } + + @Override + public String toString() { + return sb.toString(); + } + } +} diff --git a/interop-testing/src/main/proto/istio/testing/echo.proto b/interop-testing/src/main/proto/istio/testing/echo.proto new file mode 100644 index 0000000000..16aeb4f234 --- /dev/null +++ b/interop-testing/src/main/proto/istio/testing/echo.proto @@ -0,0 +1,86 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "google/protobuf/wrappers.proto"; + +package istio.testing; + +option java_package = "io.grpc.testing.istio"; +option java_outer_classname = "Istio"; + +service EchoTestService { + rpc Echo (EchoRequest) returns (EchoResponse); + rpc ForwardEcho (ForwardEchoRequest) returns (ForwardEchoResponse); +} + +message EchoRequest { + string message = 1; +} + +message EchoResponse { + string message = 1; +} + +message Header { + string key = 1; + string value = 2; +} + +message ForwardEchoRequest { + int32 count = 1; + int32 qps = 2; + int64 timeout_micros = 3; + string url = 4; + repeated Header headers = 5; + string message = 6; + // Method for the request. Valid only for HTTP + string method = 9; + // If true, requests will be sent using h2c prior knowledge + bool http2 = 7; + // If true, requests will be sent using http3 + bool http3 = 15; + // If true, requests will not be sent until magic string is received + bool serverFirst = 8; + // If true, 301 redirects will be followed + bool followRedirects = 14; + // If non-empty, make the request with the corresponding cert and key. + string cert = 10; + string key = 11; + // If non-empty, verify the server CA + string caCert = 12; + // If non-empty, make the request with the corresponding cert and key file. + string certFile = 16; + string keyFile = 17; + // If non-empty, verify the server CA with the ca cert file. + string caCertFile = 18; + // Skip verifying peer's certificate. + bool insecureSkipVerify = 19; + // List of ALPNs to present. If not set, this will be automatically be set based on the protocol + Alpn alpn = 13; + // Server name (SNI) to present in TLS connections. If not set, Host will be used for http requests. + string serverName = 20; + // Expected response determines what string to look for in the response to validate TCP requests succeeded. + // If not set, defaults to "StatusCode=200" + google.protobuf.StringValue expectedResponse = 21; +} + +message Alpn { + repeated string value = 1; +} + +message ForwardEchoResponse { + repeated string output = 1; +} diff --git a/interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java b/interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java new file mode 100644 index 0000000000..241d57afa7 --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java @@ -0,0 +1,191 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.istio; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Range; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link EchoTestServer}. + */ +@RunWith(JUnit4.class) +public class EchoTestServerTest { + + @Test + public void preprocessArgsTest() { + String[] splitArgs = TEST_ARGS.split(" "); + Map> processedArgs = EchoTestServer.preprocessArgs(splitArgs); + + assertEquals(processedArgs.keySet(), ImmutableSet.copyOf(EXPECTED_KEY_SET)); + assertEquals(processedArgs.get("--server_first"), ImmutableList.of("16060", "16061")); + assertEquals(processedArgs.get("--bind_ip"), ImmutableList.of("18082")); + assertEquals(processedArgs.get("--bind_localhost"), ImmutableList.of("18084")); + assertEquals(processedArgs.get("--version"), ImmutableList.of("\"v1\"")); + assertEquals(processedArgs.get("--grpc"), ImmutableList.of("\"17070\"", "\"17071\"")); + assertEquals(processedArgs.get("--tls"), ImmutableList.of("18443", "19443")); + assertEquals(processedArgs.get("--cluster"), ImmutableList.of("\"cluster-0\"")); + assertEquals(processedArgs.get("--key"), ImmutableList.of("/cert.key")); + assertEquals(processedArgs.get("--tcp"), ImmutableList.of("\"19090\"", "\"16060\"", + "\"19091\"","\"16061\"","\"19092\"")); + assertEquals(processedArgs.get("--istio_version"), ImmutableList.of("3")); + assertEquals(processedArgs.get("--crt"), ImmutableList.of("/cert.crt")); + assertEquals(processedArgs.get("--metrics"), ImmutableList.of("15014")); + assertEquals( + processedArgs.get("--port"), + ImmutableList.of( + "\"18080\"", + "\"18085\"", + "\"18443\"", + "\"18081\"", + "\"19443\"", + "\"18082\"", + "\"18084\"", + "\"18083\"", + "\"8080\"", + "\"3333\"")); + } + + @Test + public void echoTest() throws IOException, InterruptedException { + EchoTestServer echoTestServer = new EchoTestServer(); + + echoTestServer.runServers(ImmutableList.of(0, 0), "test-host"); + assertEquals(2, echoTestServer.servers.size()); + int port = echoTestServer.servers.get(0).getPort(); + assertNotEquals(0, port); + assertNotEquals(0, echoTestServer.servers.get(1).getPort()); + + ManagedChannelBuilder channelBuilder = + Grpc.newChannelBuilderForAddress("localhost", port, InsecureChannelCredentials.create()); + ManagedChannel channel = channelBuilder.build(); + + Metadata metadata = new Metadata(); + metadata.put(Metadata.Key.of("header1", Metadata.ASCII_STRING_MARSHALLER), "value1"); + metadata.put(Metadata.Key.of("header2", Metadata.ASCII_STRING_MARSHALLER), "value2"); + + EchoTestServiceGrpc.EchoTestServiceBlockingStub stub = + EchoTestServiceGrpc.newBlockingStub(channel) + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); + + io.grpc.testing.istio.Istio.EchoRequest echoRequest + = io.grpc.testing.istio.Istio.EchoRequest.newBuilder() + .setMessage("test-message1") + .build(); + Istio.EchoResponse echoResponse = stub.echo(echoRequest); + String echoMessage = echoResponse.getMessage(); + Set lines = ImmutableSet.copyOf(echoMessage.split(System.lineSeparator())); + + assertThat(lines).contains("RequestHeader=header1:value1"); + assertThat(lines).contains("RequestHeader=header2:value2"); + assertThat(lines).contains("Echo=test-message1"); + assertThat(lines).contains("Hostname=test-host"); + assertThat(lines).contains("Host=localhost:" + port); + assertThat(lines).contains("StatusCode=200"); + + echoTestServer.stopServers(); + echoTestServer.blockUntilShutdown(); + } + + static final int COUNT_OF_REQUESTS_TO_FORWARD = 60; + + @Test + public void forwardEchoTest() throws IOException { + EchoTestServer echoTestServer = new EchoTestServer(); + + echoTestServer.runServers(ImmutableList.of(0, 0), "test-host"); + assertEquals(2, echoTestServer.servers.size()); + int port1 = echoTestServer.servers.get(0).getPort(); + int port2 = echoTestServer.servers.get(1).getPort(); + + ManagedChannelBuilder channelBuilder = + Grpc.newChannelBuilderForAddress("localhost", port1, InsecureChannelCredentials.create()); + ManagedChannel channel = channelBuilder.build(); + + Istio.ForwardEchoRequest forwardEchoRequest = + Istio.ForwardEchoRequest.newBuilder() + .setCount(COUNT_OF_REQUESTS_TO_FORWARD) + .setQps(100) + .setTimeoutMicros(100_000L) // 100 millis + .setUrl("grpc://localhost:" + port2) + .addHeaders( + Istio.Header.newBuilder().setKey("test-key1").setValue("test-value1").build()) + .addHeaders( + Istio.Header.newBuilder().setKey("test-key2").setValue("test-value2").build()) + .setMessage("forward-echo-test-message") + .build(); + + EchoTestServiceGrpc.EchoTestServiceBlockingStub stub = + EchoTestServiceGrpc.newBlockingStub(channel); + + Instant start = Instant.now(); + Istio.ForwardEchoResponse forwardEchoResponse = stub.forwardEcho(forwardEchoRequest); + Instant end = Instant.now(); + List outputs = forwardEchoResponse.getOutputList(); + assertEquals(COUNT_OF_REQUESTS_TO_FORWARD, outputs.size()); + for (int i = 0; i < COUNT_OF_REQUESTS_TO_FORWARD; i++) { + validateOutput(outputs.get(i), i); + } + long duration = Duration.between(start, end).toMillis(); + assertThat(duration).isIn(Range.closed( + COUNT_OF_REQUESTS_TO_FORWARD * 10L, 2 * COUNT_OF_REQUESTS_TO_FORWARD * 10L)); + } + + private static void validateOutput(String output, int i) { + Set lines = ImmutableSet.copyOf(output.split(System.lineSeparator())); + assertThat(lines).contains("RequestHeader=x-request-id:" + i); + assertThat(lines).contains("RequestHeader=test-key1:test-value1"); + assertThat(lines).contains("RequestHeader=test-key2:test-value2"); + assertThat(lines).contains("Hostname=test-host"); + assertThat(lines).contains("StatusCode=200"); + assertThat(lines).contains("Echo=forward-echo-test-message"); + } + + private static final String[] EXPECTED_KEY_SET = { + "--server_first", + "--bind_ip", "--istio_version", "--bind_localhost", "--version", "--grpc", "--tls", + "--cluster", "--key", "--tcp", "--crt", "--metrics", "--port" + }; + + private static final String TEST_ARGS = + "--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=\"17070\" --port=\"18085\"" + + " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=\"16060\" --server_first=16060" + + " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\"" + + " --grpc=\"17071\" --port=\"19443\" --tls=19443 --port=\"18082\" --bind_ip=18082" + + " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\"" + + " --port=\"8080\" --port=\"3333\" --version=\"v1\" --istio-version=3 --crt=/cert.crt" + + " --key=/cert.key"; +}