From e3d586127397af6d24559fa08e649a91efcdbfa5 Mon Sep 17 00:00:00 2001 From: sanjaypujare Date: Wed, 27 Jul 2022 14:31:33 +0530 Subject: [PATCH] istio-interop-testing: add logic to forward non-grpc requests to the Go echo server (#9385) --- istio-interop-testing/build.gradle | 3 +- .../io/grpc/testing/istio/EchoTestServer.java | 224 ++++++++++++---- .../testing/istio/EchoTestServerTest.java | 243 ++++++++++++++++-- 3 files changed, 400 insertions(+), 70 deletions(-) diff --git a/istio-interop-testing/build.gradle b/istio-interop-testing/build.gradle index 45cfa744e7..d53e9b5c52 100644 --- a/istio-interop-testing/build.gradle +++ b/istio-interop-testing/build.gradle @@ -20,7 +20,8 @@ dependencies { project(':grpc-protobuf'), project(':grpc-services'), project(':grpc-stub'), - project(':grpc-testing') + project(':grpc-testing'), + project(':grpc-xds') compileOnly libraries.javax.annotation diff --git a/istio-interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java b/istio-interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java index 9118a6a14c..ae6f60098a 100644 --- a/istio-interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java +++ b/istio-interop-testing/src/main/java/io/grpc/testing/istio/EchoTestServer.java @@ -17,7 +17,14 @@ package io.grpc.testing.istio; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CharMatcher; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.ChannelCredentials; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.Grpc; @@ -27,23 +34,30 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; +import io.grpc.ServerCredentials; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.ServerServiceDefinition; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.TlsServerCredentials; +import io.grpc.services.AdminInterface; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; +import io.grpc.xds.XdsChannelCredentials; +import io.grpc.xds.XdsServerCredentials; import io.istio.test.Echo.EchoRequest; import io.istio.test.Echo.EchoResponse; import io.istio.test.Echo.ForwardEchoRequest; import io.istio.test.Echo.ForwardEchoResponse; import io.istio.test.Echo.Header; import io.istio.test.EchoTestServiceGrpc; -import io.istio.test.EchoTestServiceGrpc.EchoTestServiceBlockingStub; +import io.istio.test.EchoTestServiceGrpc.EchoTestServiceFutureStub; import io.istio.test.EchoTestServiceGrpc.EchoTestServiceImplBase; +import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -51,13 +65,18 @@ import java.net.SocketAddress; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * This class implements the Istio echo server functionality similar to @@ -81,28 +100,23 @@ public final class EchoTestServer { private static final String HOSTNAME = "Hostname"; private static final String REQUEST_HEADER = "RequestHeader"; private static final String IP = "IP"; - public static final String GRPC_SCHEME = "grpc://"; @VisibleForTesting List servers; /** - * Preprocess args, for two things: - * 1. merge duplicate flags. So "--grpc=8080 --grpc=9090" becomes + * Preprocess args, for: + * - merging duplicate flags. So "--grpc=8080 --grpc=9090" becomes * "--grpc=8080,9090". - * 2. replace '-' to '_'. So "--istio-version=123" becomes - * "--istio_version=123" (so exclude the leading "--"). **/ @VisibleForTesting static Map> preprocessArgs(String[] args) { HashMap> argsMap = new HashMap<>(); for (String arg : args) { - String[] keyValue = arg.split("=", 2); + List keyValue = Splitter.on('=').limit(2).splitToList(arg); - if (keyValue.length == 2) { - String key = keyValue[0]; - String value = keyValue[1]; - - key = key.substring(0, 2) + key.substring(2).replace('-', '_'); + if (keyValue.size() == 2) { + String key = keyValue.get(0); + String value = keyValue.get(1); List oldValue = argsMap.get(key); if (oldValue == null) { oldValue = new ArrayList<>(); @@ -114,13 +128,14 @@ public final class EchoTestServer { return ImmutableMap.>builder().putAll(argsMap).build(); } - /** Turn gRPC ports from a string list to an int list. */ + /** Turn ports from a string list to an int list. */ @VisibleForTesting - static List getGrpcPorts(Map> args) { - List grpcPorts = args.get("--grpc"); - List grpcPortsInt = new ArrayList<>(grpcPorts.size()); + static Set getPorts(Map> args, String flagName) { + List grpcPorts = args.get(flagName); + Set grpcPortsInt = new HashSet<>(grpcPorts.size()); for (String port : grpcPorts) { + port = CharMatcher.is('\"').trimFrom(port); grpcPortsInt.add(Integer.parseInt(port)); } return grpcPortsInt; @@ -141,11 +156,44 @@ public final class EchoTestServer { */ public static void main(String[] args) throws Exception { Map> processedArgs = preprocessArgs(args); - List grpcPorts = getGrpcPorts(processedArgs); + Set grpcPorts = getPorts(processedArgs, "--grpc"); + Set xdsPorts = getPorts(processedArgs, "--xds-grpc-server"); + // If an xds port does not exist in gRPC ports set, add it. + grpcPorts.addAll(xdsPorts); + // which ports are supposed to use tls + Set tlsPorts = getPorts(processedArgs, "--tls"); + List forwardingAddress = processedArgs.get("--forwarding-address"); + if (forwardingAddress.size() > 1) { + logger.severe("More than one value for --forwarding-address not allowed"); + System.exit(1); + } + if (forwardingAddress.size() == 0) { + forwardingAddress.add("0.0.0.0:7072"); + } + List key = processedArgs.get("key"); + List crt = processedArgs.get("crt"); + + if (key.size() > 1 || crt.size() > 1) { + logger.severe("More than one value for --key or --crt not allowed"); + System.exit(1); + } + if (key.size() != crt.size()) { + logger.severe("Both --key or --crt should be present or absent"); + System.exit(1); + } + ServerCredentials tlsServerCredentials = null; + if (key.size() == 1) { + tlsServerCredentials = TlsServerCredentials.create(new File(crt.get(0)), + new File(key.get(0))); + } else if (!tlsPorts.isEmpty()) { + logger.severe("Both --key or --crt should be present if tls ports used"); + System.exit(1); + } String hostname = determineHostname(); EchoTestServer echoTestServer = new EchoTestServer(); - echoTestServer.runServers(grpcPorts, hostname); + echoTestServer.runServers(hostname, grpcPorts, xdsPorts, tlsPorts, forwardingAddress.get(0), + tlsServerCredentials); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { System.out.println("Shutting down"); @@ -158,20 +206,39 @@ public final class EchoTestServer { echoTestServer.blockUntilShutdown(); } - void runServers(List grpcPorts, String hostname) throws IOException { + void runServers(String hostname, Collection grpcPorts, Collection xdsPorts, + Collection tlsPorts, String forwardingAddress, + ServerCredentials tlsServerCredentials) + throws IOException { ServerServiceDefinition service = ServerInterceptors.intercept( - new EchoTestServiceImpl(hostname), new EchoTestServerInterceptor()); + new EchoTestServiceImpl(hostname, forwardingAddress), new EchoTestServerInterceptor()); servers = new ArrayList<>(grpcPorts.size() + 1); + boolean runAdminServices = Boolean.getBoolean("EXPOSE_GRPC_ADMIN"); for (int port : grpcPorts) { - runServer(port, service); + ServerCredentials serverCredentials = InsecureServerCredentials.create(); + String credTypeString = "over plaintext"; + if (xdsPorts.contains(port)) { + serverCredentials = XdsServerCredentials.create(InsecureServerCredentials.create()); + credTypeString = "over xDS-configured mTLS"; + } else if (tlsPorts.contains(port)) { + serverCredentials = tlsServerCredentials; + credTypeString = "over TLS"; + } + servers.add(runServer(port, service, serverCredentials, credTypeString, runAdminServices)); } } - void runServer(int port, ServerServiceDefinition service) throws IOException { - logger.log(Level.INFO, "Listening GRPC on " + port); - servers.add(Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) - .addService(service) - .build().start()); + static Server runServer( + int port, ServerServiceDefinition service, ServerCredentials serverCredentials, + String credTypeString, boolean runAdminServices) + throws IOException { + logger.log(Level.INFO, "Listening GRPC ({0}) on {1}", new Object[]{credTypeString, port}); + ServerBuilder builder = Grpc.newServerBuilderForPort(port, serverCredentials) + .addService(service); + if (runAdminServices) { + builder = builder.addServices(AdminInterface.getStandardServices()); + } + return builder.build().start(); } void stopServers() { @@ -228,9 +295,14 @@ public final class EchoTestServer { private static class EchoTestServiceImpl extends EchoTestServiceImplBase { private final String hostname; + private final String forwardingAddress; + private final EchoTestServiceGrpc.EchoTestServiceBlockingStub forwardingStub; - EchoTestServiceImpl(String hostname) { + EchoTestServiceImpl(String hostname, String forwardingAddress) { this.hostname = hostname; + this.forwardingAddress = forwardingAddress; + this.forwardingStub = EchoTestServiceGrpc.newBlockingStub( + Grpc.newChannelBuilder(forwardingAddress, InsecureChannelCredentials.create()).build()); } @Override @@ -272,20 +344,39 @@ public final class EchoTestServer { } } + private static final class EchoCall { + EchoResponse response; + Status status; + } + private ForwardEchoResponse buildEchoResponse(ForwardEchoRequest request) throws InterruptedException { ForwardEchoResponse.Builder forwardEchoResponseBuilder = ForwardEchoResponse.newBuilder(); String rawUrl = request.getUrl(); - if (!rawUrl.startsWith(GRPC_SCHEME)) { + List urlParts = Splitter.on(':').limit(2).splitToList(rawUrl); + if (urlParts.size() < 2) { throw new StatusRuntimeException( - Status.UNIMPLEMENTED.withDescription("protocol grpc:// required")); + Status.INVALID_ARGUMENT.withDescription("No protocol configured for url " + rawUrl)); + } + ChannelCredentials creds; + String target = null; + if ("grpc".equals(urlParts.get(0))) { + // We don't really want to test this but the istio test infrastructure needs + // this to be supported. If we ever decide to add support for this properly, + // we would need to add support for TLS creds here. + creds = InsecureChannelCredentials.create(); + target = urlParts.get(1).substring(2); + } else if ("xds".equals(urlParts.get(0))) { + creds = XdsChannelCredentials.create(InsecureChannelCredentials.create()); + target = rawUrl; + } else { + logger.log(Level.INFO, "Protocol {0} not supported. Forwarding to {1}", + new String[]{urlParts.get(0), forwardingAddress}); + return forwardingStub.withDeadline(Context.current().getDeadline()).forwardEcho(request); } - rawUrl = rawUrl.substring(GRPC_SCHEME.length()); - // May need to use xds security if urlScheme is "xds" - ManagedChannelBuilder channelBuilder = Grpc.newChannelBuilder( - rawUrl, InsecureChannelCredentials.create()); + ManagedChannelBuilder channelBuilder = Grpc.newChannelBuilder(target, creds); ManagedChannel channel = channelBuilder.build(); List
requestHeaders = request.getHeadersList(); @@ -297,6 +388,7 @@ public final class EchoTestServer { } int count = request.getCount() == 0 ? 1 : request.getCount(); + // Calculate the amount of time to sleep after each call. Duration durationPerQuery = Duration.ZERO; if (request.getQps() > 0) { durationPerQuery = Duration.ofNanos( @@ -310,17 +402,20 @@ public final class EchoTestServer { Instant start = Instant.now(); logger.info("starting instant=" + start); Duration expected = Duration.ZERO; + final CountDownLatch latch = new CountDownLatch(count); + EchoCall[] echoCalls = new EchoCall[count]; for (int i = 0; i < count; i++) { Metadata currentMetadata = new Metadata(); currentMetadata.merge(metadata); currentMetadata.put( Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER), "" + i); - EchoTestServiceBlockingStub stub - = EchoTestServiceGrpc.newBlockingStub(channel).withInterceptors( + EchoTestServiceGrpc.EchoTestServiceFutureStub stub + = EchoTestServiceGrpc.newFutureStub(channel).withInterceptors( MetadataUtils.newAttachHeadersInterceptor(currentMetadata)) .withDeadlineAfter(request.getTimeoutMicros(), TimeUnit.MICROSECONDS); - String response = callEcho(stub, echoRequest, i); - forwardEchoResponseBuilder.addOutput(response); + + echoCalls[i] = new EchoCall(); + callEcho(stub, echoRequest, echoCalls[i], latch); Instant current = Instant.now(); logger.info("after rpc instant=" + current); Duration elapsed = Duration.between(start, current); @@ -332,18 +427,57 @@ public final class EchoTestServer { Thread.sleep(timeLeft.toMillis()); } } + latch.await(); + for (int i = 0; i < count; i++) { + if (Status.OK.equals(echoCalls[i].status)) { + forwardEchoResponseBuilder.addOutput( + buildForwardEchoStruct(i, echoCalls, request.getMessage())); + } else { + logger.log(Level.SEVERE, "RPC {0} failed {1}: {2}", + new Object[]{i, echoCalls[i].status.getCode(), echoCalls[i].status.getDescription()}); + forwardEchoResponseBuilder.clear(); + throw echoCalls[i].status.asRuntimeException(); + } + } return forwardEchoResponseBuilder.build(); } - private String callEcho(EchoTestServiceBlockingStub stub, - EchoRequest echoRequest, int count) { - try { - EchoResponse echoResponse = stub.echo(echoRequest); - return echoResponse.getMessage(); - } catch (Exception e) { - logger.log(Level.INFO, "RPC failed " + count, e); + private static String buildForwardEchoStruct(int i, EchoCall[] echoCalls, + String requestMessage) { + // The test infrastructure might expect the entire struct instead of + // just the message. + StringBuilder sb = new StringBuilder(); + sb.append(String.format("[%d] grpcecho.Echo(%s)\n", i, requestMessage)); + Iterable iterable = Splitter.on('\n').split(echoCalls[i].response.getMessage()); + for (String line : iterable) { + if (!line.isEmpty()) { + sb.append(String.format("[%d body] %s\n", i, line)); + } } - return ""; + return sb.toString(); + } + + private void callEcho(EchoTestServiceFutureStub stub, + EchoRequest echoRequest, final EchoCall echoCall, CountDownLatch latch) { + + ListenableFuture response = stub.echo(echoRequest); + Futures.addCallback( + response, + new FutureCallback() { + @Override + public void onSuccess(@Nullable EchoResponse result) { + echoCall.response = result; + echoCall.status = Status.OK; + latch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + echoCall.status = Status.fromThrowable(t); + latch.countDown(); + } + }, + MoreExecutors.directExecutor()); } } diff --git a/istio-interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java b/istio-interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java index c83addab9d..bc882d15f4 100644 --- a/istio-interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java +++ b/istio-interop-testing/src/test/java/io/grpc/testing/istio/EchoTestServerTest.java @@ -19,27 +19,37 @@ package io.grpc.testing.istio; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; import io.istio.test.Echo.EchoRequest; import io.istio.test.Echo.EchoResponse; import io.istio.test.Echo.ForwardEchoRequest; import io.istio.test.Echo.ForwardEchoResponse; import io.istio.test.Echo.Header; import io.istio.test.EchoTestServiceGrpc; +import io.istio.test.EchoTestServiceGrpc.EchoTestServiceImplBase; import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -50,6 +60,30 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class EchoTestServerTest { + private static final String[] EXPECTED_KEY_SET = { + "--server_first", "--forwarding-address", + "--bind_ip", "--istio-version", "--bind_localhost", "--grpc", "--tls", + "--cluster", "--key", "--tcp", "--crt", "--metrics", "--port", "--version" + }; + + private static final String TEST_ARGS = + "--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=\"17070\" --port=\"18085\"" + + " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=\"16060\" --server_first=16060" + + " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\"" + + " --grpc=\"17071\" --port=\"19443\" --tls=19443 --port=\"18082\" --bind_ip=18082" + + " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\"" + + " --port=\"8080\" --port=\"3333\" --version=\"v1\" --istio-version=3 --crt=/cert.crt" + + " --key=/cert.key --forwarding-address=192.168.1.10:7072"; + + private static final String TEST_ARGS_PORTS = + "--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=17070 --port=18085" + + " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=16060 --server_first=16060" + + " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\"" + + " --grpc=\"17071\" --port=\"19443\" --tls=\"19443\" --port=\"18082\" --bind_ip=18082" + + " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\"" + + " --port=\"8080\" --port=3333 --version=\"v1\" --istio-version=3 --crt=/cert.crt" + + " --key=/cert.key --xds-grpc-server=12034 --xds-grpc-server=\"34012\""; + @Test public void preprocessArgsTest() { String[] splitArgs = TEST_ARGS.split(" "); @@ -59,16 +93,16 @@ public class EchoTestServerTest { assertEquals(processedArgs.get("--server_first"), ImmutableList.of("16060", "16061")); assertEquals(processedArgs.get("--bind_ip"), ImmutableList.of("18082")); assertEquals(processedArgs.get("--bind_localhost"), ImmutableList.of("18084")); - assertEquals(processedArgs.get("--version"), ImmutableList.of("\"v1\"")); assertEquals(processedArgs.get("--grpc"), ImmutableList.of("\"17070\"", "\"17071\"")); assertEquals(processedArgs.get("--tls"), ImmutableList.of("18443", "19443")); assertEquals(processedArgs.get("--cluster"), ImmutableList.of("\"cluster-0\"")); assertEquals(processedArgs.get("--key"), ImmutableList.of("/cert.key")); assertEquals(processedArgs.get("--tcp"), ImmutableList.of("\"19090\"", "\"16060\"", "\"19091\"","\"16061\"","\"19092\"")); - assertEquals(processedArgs.get("--istio_version"), ImmutableList.of("3")); + assertEquals(processedArgs.get("--istio-version"), ImmutableList.of("3")); assertEquals(processedArgs.get("--crt"), ImmutableList.of("/cert.crt")); assertEquals(processedArgs.get("--metrics"), ImmutableList.of("15014")); + assertEquals(ImmutableList.of("192.168.1.10:7072"), processedArgs.get("--forwarding-address")); assertEquals( processedArgs.get("--port"), ImmutableList.of( @@ -84,11 +118,34 @@ public class EchoTestServerTest { "\"3333\"")); } + @Test + public void preprocessArgsPortsTest() { + String[] splitArgs = TEST_ARGS_PORTS.split(" "); + Map> processedArgs = EchoTestServer.preprocessArgs(splitArgs); + + Set ports = EchoTestServer.getPorts(processedArgs, "--port"); + assertThat(ports).containsExactly(18080, 8080, 18081, 18082, 19443, 18083, 18084, 18085, + 3333, 18443); + ports = EchoTestServer.getPorts(processedArgs, "--grpc"); + assertThat(ports).containsExactly(17070, 17071); + ports = EchoTestServer.getPorts(processedArgs, "--tls"); + assertThat(ports).containsExactly(18443, 19443); + ports = EchoTestServer.getPorts(processedArgs, "--xds-grpc-server"); + assertThat(ports).containsExactly(34012, 12034); + } + + @Test public void echoTest() throws IOException, InterruptedException { EchoTestServer echoTestServer = new EchoTestServer(); - echoTestServer.runServers(ImmutableList.of(0, 0), "test-host"); + echoTestServer.runServers( + "test-host", + ImmutableList.of(0, 0), + ImmutableList.of(), + ImmutableList.of(), + "0.0.0.0:7072", + null); assertEquals(2, echoTestServer.servers.size()); int port = echoTestServer.servers.get(0).getPort(); assertNotEquals(0, port); @@ -127,10 +184,16 @@ public class EchoTestServerTest { static final int COUNT_OF_REQUESTS_TO_FORWARD = 60; @Test - public void forwardEchoTest() throws IOException { + public void forwardEchoTest() throws IOException, InterruptedException { EchoTestServer echoTestServer = new EchoTestServer(); - echoTestServer.runServers(ImmutableList.of(0, 0), "test-host"); + echoTestServer.runServers( + "test-host", + ImmutableList.of(0, 0), + ImmutableList.of(), + ImmutableList.of(), + "0.0.0.0:7072", + null); assertEquals(2, echoTestServer.servers.size()); int port1 = echoTestServer.servers.get(0).getPort(); int port2 = echoTestServer.servers.get(1).getPort(); @@ -165,29 +228,161 @@ public class EchoTestServerTest { } long duration = Duration.between(start, end).toMillis(); assertThat(duration).isAtLeast(COUNT_OF_REQUESTS_TO_FORWARD * 10L); + echoTestServer.stopServers(); + echoTestServer.blockUntilShutdown(); } private static void validateOutput(String output, int i) { - assertThat(output).contains("RequestHeader=x-request-id:" + i); - assertThat(output).contains("RequestHeader=test-key1:test-value1"); - assertThat(output).contains("RequestHeader=test-key2:test-value2"); - assertThat(output).contains("Hostname=test-host"); - assertThat(output).contains("StatusCode=200"); - assertThat(output).contains("Echo=forward-echo-test-message"); + List content = Splitter.on('\n').splitToList(output); + assertThat(content.size()).isAtLeast(7); // see echo implementation + assertThat(content.get(0)) + .isEqualTo(String.format("[%d] grpcecho.Echo(forward-echo-test-message)", i)); + String prefix = "[" + i + " body] "; + assertThat(content).contains(prefix + "RequestHeader=x-request-id:" + i); + assertThat(content).contains(prefix + "RequestHeader=test-key1:test-value1"); + assertThat(content).contains(prefix + "RequestHeader=test-key2:test-value2"); + assertThat(content).contains(prefix + "Hostname=test-host"); + assertThat(content).contains(prefix + "StatusCode=200"); } - private static final String[] EXPECTED_KEY_SET = { - "--server_first", - "--bind_ip", "--istio_version", "--bind_localhost", "--version", "--grpc", "--tls", - "--cluster", "--key", "--tcp", "--crt", "--metrics", "--port" - }; + @Test + public void nonGrpcForwardEchoTest() throws IOException, InterruptedException { + ForwardServiceForNonGrpcImpl forwardServiceForNonGrpc = new ForwardServiceForNonGrpcImpl(); + forwardServiceForNonGrpc.receivedRequests = new ArrayList<>(); + forwardServiceForNonGrpc.responsesToReturn = new ArrayList<>(); + Server nonGrpcEchoServer = + EchoTestServer.runServer( + 0, forwardServiceForNonGrpc.bindService(), InsecureServerCredentials.create(), + "", false); + int nonGrpcEchoServerPort = nonGrpcEchoServer.getPort(); - private static final String TEST_ARGS = - "--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=\"17070\" --port=\"18085\"" - + " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=\"16060\" --server_first=16060" - + " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\"" - + " --grpc=\"17071\" --port=\"19443\" --tls=19443 --port=\"18082\" --bind_ip=18082" - + " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\"" - + " --port=\"8080\" --port=\"3333\" --version=\"v1\" --istio-version=3 --crt=/cert.crt" - + " --key=/cert.key"; + EchoTestServer echoTestServer = new EchoTestServer(); + + echoTestServer.runServers( + "test-host", + ImmutableList.of(0), + ImmutableList.of(), + ImmutableList.of(), + "0.0.0.0:" + nonGrpcEchoServerPort, + null); + assertEquals(1, echoTestServer.servers.size()); + int port1 = echoTestServer.servers.get(0).getPort(); + + ManagedChannelBuilder channelBuilder = + Grpc.newChannelBuilderForAddress("localhost", port1, InsecureChannelCredentials.create()); + ManagedChannel channel = channelBuilder.build(); + + EchoTestServiceGrpc.EchoTestServiceBlockingStub stub = + EchoTestServiceGrpc.newBlockingStub(channel); + + forwardServiceForNonGrpc.responsesToReturn.add( + ForwardEchoResponse.newBuilder().addOutput("line 1").addOutput("line 2").build()); + + ForwardEchoRequest forwardEchoRequest = + ForwardEchoRequest.newBuilder() + .setCount(COUNT_OF_REQUESTS_TO_FORWARD) + .setQps(100) + .setTimeoutMicros(2000_000L) // 2000 millis + .setUrl("http://www.example.com") // non grpc protocol + .addHeaders( + Header.newBuilder().setKey("test-key1").setValue("test-value1").build()) + .addHeaders( + Header.newBuilder().setKey("test-key2").setValue("test-value2").build()) + .setMessage("non-grpc-forward-echo-test-message1") + .build(); + + ForwardEchoResponse forwardEchoResponse = stub.forwardEcho(forwardEchoRequest); + List outputs = forwardEchoResponse.getOutputList(); + assertEquals(2, outputs.size()); + assertThat(outputs.get(0)).isEqualTo("line 1"); + assertThat(outputs.get(1)).isEqualTo("line 2"); + + assertThat(forwardServiceForNonGrpc.receivedRequests).hasSize(1); + ForwardEchoRequest receivedRequest = forwardServiceForNonGrpc.receivedRequests.remove(0); + assertThat(receivedRequest.getUrl()).isEqualTo("http://www.example.com"); + assertThat(receivedRequest.getMessage()).isEqualTo("non-grpc-forward-echo-test-message1"); + assertThat(receivedRequest.getCount()).isEqualTo(COUNT_OF_REQUESTS_TO_FORWARD); + assertThat(receivedRequest.getQps()).isEqualTo(100); + + forwardServiceForNonGrpc.responsesToReturn.add( + Status.UNIMPLEMENTED.asRuntimeException()); + forwardEchoRequest = + ForwardEchoRequest.newBuilder() + .setCount(1) + .setQps(100) + .setTimeoutMicros(2000_000L) // 2000 millis + .setUrl("redis://192.168.1.1") // non grpc protocol + .addHeaders( + Header.newBuilder().setKey("test-key1").setValue("test-value1").build()) + .setMessage("non-grpc-forward-echo-test-message2") + .build(); + + try { + ForwardEchoResponse unused = stub.forwardEcho(forwardEchoRequest); + fail("exception expected"); + } catch (StatusRuntimeException e) { + assertThat(e.getStatus()).isEqualTo(Status.UNIMPLEMENTED); + } + + assertThat(forwardServiceForNonGrpc.receivedRequests).hasSize(1); + receivedRequest = forwardServiceForNonGrpc.receivedRequests.remove(0); + assertThat(receivedRequest.getUrl()).isEqualTo("redis://192.168.1.1"); + assertThat(receivedRequest.getMessage()).isEqualTo("non-grpc-forward-echo-test-message2"); + assertThat(receivedRequest.getCount()).isEqualTo(1); + + forwardServiceForNonGrpc.responsesToReturn.add( + ForwardEchoResponse.newBuilder().addOutput("line 3").build()); + + forwardEchoRequest = + ForwardEchoRequest.newBuilder() + .setCount(1) + .setQps(100) + .setTimeoutMicros(2000_000L) // 2000 millis + .setUrl("http2://192.168.1.1") // non grpc protocol + .addHeaders( + Header.newBuilder().setKey("test-key3").setValue("test-value3").build()) + .setMessage("non-grpc-forward-echo-test-message3") + .build(); + forwardEchoResponse = stub.forwardEcho(forwardEchoRequest); + outputs = forwardEchoResponse.getOutputList(); + assertEquals(1, outputs.size()); + assertThat(outputs.get(0)).isEqualTo("line 3"); + + assertThat(forwardServiceForNonGrpc.receivedRequests).hasSize(1); + receivedRequest = forwardServiceForNonGrpc.receivedRequests.remove(0); + assertThat(receivedRequest.getUrl()).isEqualTo("http2://192.168.1.1"); + assertThat(receivedRequest.getMessage()).isEqualTo("non-grpc-forward-echo-test-message3"); + List
headers = receivedRequest.getHeadersList(); + assertThat(headers).hasSize(1); + assertThat(headers.get(0).getKey()).isEqualTo("test-key3"); + assertThat(headers.get(0).getValue()).isEqualTo("test-value3"); + + echoTestServer.stopServers(); + echoTestServer.blockUntilShutdown(); + nonGrpcEchoServer.shutdown(); + nonGrpcEchoServer.awaitTermination(5, TimeUnit.SECONDS); + } + + /** + * Emulate the Go Echo server that receives the non-grpc protocol requests. + */ + private static class ForwardServiceForNonGrpcImpl extends EchoTestServiceImplBase { + + List receivedRequests; + List responsesToReturn; + + @Override + public void forwardEcho(ForwardEchoRequest request, + StreamObserver responseObserver) { + receivedRequests.add(request); + Object response = responsesToReturn.remove(0); + if (response instanceof Throwable) { + responseObserver.onError((Throwable) response); + } else if (response instanceof ForwardEchoResponse) { + responseObserver.onNext((ForwardEchoResponse) response); + responseObserver.onCompleted(); + } + responseObserver.onError(new IllegalArgumentException("Unknown type in responsesToReturn")); + } + } }