istio-interop-testing: add logic to forward non-grpc requests to the Go echo server (#9385)

This commit is contained in:
sanjaypujare 2022-07-27 14:31:33 +05:30 committed by GitHub
parent 70a29fbfe3
commit e3d5861273
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 400 additions and 70 deletions

View File

@ -20,7 +20,8 @@ dependencies {
project(':grpc-protobuf'),
project(':grpc-services'),
project(':grpc-stub'),
project(':grpc-testing')
project(':grpc-testing'),
project(':grpc-xds')
compileOnly libraries.javax.annotation

View File

@ -17,7 +17,14 @@
package io.grpc.testing.istio;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Grpc;
@ -27,23 +34,30 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.TlsServerCredentials;
import io.grpc.services.AdminInterface;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.XdsChannelCredentials;
import io.grpc.xds.XdsServerCredentials;
import io.istio.test.Echo.EchoRequest;
import io.istio.test.Echo.EchoResponse;
import io.istio.test.Echo.ForwardEchoRequest;
import io.istio.test.Echo.ForwardEchoResponse;
import io.istio.test.Echo.Header;
import io.istio.test.EchoTestServiceGrpc;
import io.istio.test.EchoTestServiceGrpc.EchoTestServiceBlockingStub;
import io.istio.test.EchoTestServiceGrpc.EchoTestServiceFutureStub;
import io.istio.test.EchoTestServiceGrpc.EchoTestServiceImplBase;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -51,13 +65,18 @@ import java.net.SocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* This class implements the Istio echo server functionality similar to
@ -81,28 +100,23 @@ public final class EchoTestServer {
private static final String HOSTNAME = "Hostname";
private static final String REQUEST_HEADER = "RequestHeader";
private static final String IP = "IP";
public static final String GRPC_SCHEME = "grpc://";
@VisibleForTesting List<Server> servers;
/**
* Preprocess args, for two things:
* 1. merge duplicate flags. So "--grpc=8080 --grpc=9090" becomes
* Preprocess args, for:
* - merging duplicate flags. So "--grpc=8080 --grpc=9090" becomes
* "--grpc=8080,9090".
* 2. replace '-' to '_'. So "--istio-version=123" becomes
* "--istio_version=123" (so exclude the leading "--").
**/
@VisibleForTesting
static Map<String, List<String>> preprocessArgs(String[] args) {
HashMap<String, List<String>> argsMap = new HashMap<>();
for (String arg : args) {
String[] keyValue = arg.split("=", 2);
List<String> keyValue = Splitter.on('=').limit(2).splitToList(arg);
if (keyValue.length == 2) {
String key = keyValue[0];
String value = keyValue[1];
key = key.substring(0, 2) + key.substring(2).replace('-', '_');
if (keyValue.size() == 2) {
String key = keyValue.get(0);
String value = keyValue.get(1);
List<String> oldValue = argsMap.get(key);
if (oldValue == null) {
oldValue = new ArrayList<>();
@ -114,13 +128,14 @@ public final class EchoTestServer {
return ImmutableMap.<String, List<String>>builder().putAll(argsMap).build();
}
/** Turn gRPC ports from a string list to an int list. */
/** Turn ports from a string list to an int list. */
@VisibleForTesting
static List<Integer> getGrpcPorts(Map<String, List<String>> args) {
List<String> grpcPorts = args.get("--grpc");
List<Integer> grpcPortsInt = new ArrayList<>(grpcPorts.size());
static Set<Integer> getPorts(Map<String, List<String>> args, String flagName) {
List<String> grpcPorts = args.get(flagName);
Set<Integer> grpcPortsInt = new HashSet<>(grpcPorts.size());
for (String port : grpcPorts) {
port = CharMatcher.is('\"').trimFrom(port);
grpcPortsInt.add(Integer.parseInt(port));
}
return grpcPortsInt;
@ -141,11 +156,44 @@ public final class EchoTestServer {
*/
public static void main(String[] args) throws Exception {
Map<String, List<String>> processedArgs = preprocessArgs(args);
List<Integer> grpcPorts = getGrpcPorts(processedArgs);
Set<Integer> grpcPorts = getPorts(processedArgs, "--grpc");
Set<Integer> xdsPorts = getPorts(processedArgs, "--xds-grpc-server");
// If an xds port does not exist in gRPC ports set, add it.
grpcPorts.addAll(xdsPorts);
// which ports are supposed to use tls
Set<Integer> tlsPorts = getPorts(processedArgs, "--tls");
List<String> forwardingAddress = processedArgs.get("--forwarding-address");
if (forwardingAddress.size() > 1) {
logger.severe("More than one value for --forwarding-address not allowed");
System.exit(1);
}
if (forwardingAddress.size() == 0) {
forwardingAddress.add("0.0.0.0:7072");
}
List<String> key = processedArgs.get("key");
List<String> crt = processedArgs.get("crt");
if (key.size() > 1 || crt.size() > 1) {
logger.severe("More than one value for --key or --crt not allowed");
System.exit(1);
}
if (key.size() != crt.size()) {
logger.severe("Both --key or --crt should be present or absent");
System.exit(1);
}
ServerCredentials tlsServerCredentials = null;
if (key.size() == 1) {
tlsServerCredentials = TlsServerCredentials.create(new File(crt.get(0)),
new File(key.get(0)));
} else if (!tlsPorts.isEmpty()) {
logger.severe("Both --key or --crt should be present if tls ports used");
System.exit(1);
}
String hostname = determineHostname();
EchoTestServer echoTestServer = new EchoTestServer();
echoTestServer.runServers(grpcPorts, hostname);
echoTestServer.runServers(hostname, grpcPorts, xdsPorts, tlsPorts, forwardingAddress.get(0),
tlsServerCredentials);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
System.out.println("Shutting down");
@ -158,20 +206,39 @@ public final class EchoTestServer {
echoTestServer.blockUntilShutdown();
}
void runServers(List<Integer> grpcPorts, String hostname) throws IOException {
void runServers(String hostname, Collection<Integer> grpcPorts, Collection<Integer> xdsPorts,
Collection<Integer> tlsPorts, String forwardingAddress,
ServerCredentials tlsServerCredentials)
throws IOException {
ServerServiceDefinition service = ServerInterceptors.intercept(
new EchoTestServiceImpl(hostname), new EchoTestServerInterceptor());
new EchoTestServiceImpl(hostname, forwardingAddress), new EchoTestServerInterceptor());
servers = new ArrayList<>(grpcPorts.size() + 1);
boolean runAdminServices = Boolean.getBoolean("EXPOSE_GRPC_ADMIN");
for (int port : grpcPorts) {
runServer(port, service);
ServerCredentials serverCredentials = InsecureServerCredentials.create();
String credTypeString = "over plaintext";
if (xdsPorts.contains(port)) {
serverCredentials = XdsServerCredentials.create(InsecureServerCredentials.create());
credTypeString = "over xDS-configured mTLS";
} else if (tlsPorts.contains(port)) {
serverCredentials = tlsServerCredentials;
credTypeString = "over TLS";
}
servers.add(runServer(port, service, serverCredentials, credTypeString, runAdminServices));
}
}
void runServer(int port, ServerServiceDefinition service) throws IOException {
logger.log(Level.INFO, "Listening GRPC on " + port);
servers.add(Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
.addService(service)
.build().start());
static Server runServer(
int port, ServerServiceDefinition service, ServerCredentials serverCredentials,
String credTypeString, boolean runAdminServices)
throws IOException {
logger.log(Level.INFO, "Listening GRPC ({0}) on {1}", new Object[]{credTypeString, port});
ServerBuilder<?> builder = Grpc.newServerBuilderForPort(port, serverCredentials)
.addService(service);
if (runAdminServices) {
builder = builder.addServices(AdminInterface.getStandardServices());
}
return builder.build().start();
}
void stopServers() {
@ -228,9 +295,14 @@ public final class EchoTestServer {
private static class EchoTestServiceImpl extends EchoTestServiceImplBase {
private final String hostname;
private final String forwardingAddress;
private final EchoTestServiceGrpc.EchoTestServiceBlockingStub forwardingStub;
EchoTestServiceImpl(String hostname) {
EchoTestServiceImpl(String hostname, String forwardingAddress) {
this.hostname = hostname;
this.forwardingAddress = forwardingAddress;
this.forwardingStub = EchoTestServiceGrpc.newBlockingStub(
Grpc.newChannelBuilder(forwardingAddress, InsecureChannelCredentials.create()).build());
}
@Override
@ -272,20 +344,39 @@ public final class EchoTestServer {
}
}
private static final class EchoCall {
EchoResponse response;
Status status;
}
private ForwardEchoResponse buildEchoResponse(ForwardEchoRequest request)
throws InterruptedException {
ForwardEchoResponse.Builder forwardEchoResponseBuilder
= ForwardEchoResponse.newBuilder();
String rawUrl = request.getUrl();
if (!rawUrl.startsWith(GRPC_SCHEME)) {
List<String> urlParts = Splitter.on(':').limit(2).splitToList(rawUrl);
if (urlParts.size() < 2) {
throw new StatusRuntimeException(
Status.UNIMPLEMENTED.withDescription("protocol grpc:// required"));
Status.INVALID_ARGUMENT.withDescription("No protocol configured for url " + rawUrl));
}
ChannelCredentials creds;
String target = null;
if ("grpc".equals(urlParts.get(0))) {
// We don't really want to test this but the istio test infrastructure needs
// this to be supported. If we ever decide to add support for this properly,
// we would need to add support for TLS creds here.
creds = InsecureChannelCredentials.create();
target = urlParts.get(1).substring(2);
} else if ("xds".equals(urlParts.get(0))) {
creds = XdsChannelCredentials.create(InsecureChannelCredentials.create());
target = rawUrl;
} else {
logger.log(Level.INFO, "Protocol {0} not supported. Forwarding to {1}",
new String[]{urlParts.get(0), forwardingAddress});
return forwardingStub.withDeadline(Context.current().getDeadline()).forwardEcho(request);
}
rawUrl = rawUrl.substring(GRPC_SCHEME.length());
// May need to use xds security if urlScheme is "xds"
ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilder(
rawUrl, InsecureChannelCredentials.create());
ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilder(target, creds);
ManagedChannel channel = channelBuilder.build();
List<Header> requestHeaders = request.getHeadersList();
@ -297,6 +388,7 @@ public final class EchoTestServer {
}
int count = request.getCount() == 0 ? 1 : request.getCount();
// Calculate the amount of time to sleep after each call.
Duration durationPerQuery = Duration.ZERO;
if (request.getQps() > 0) {
durationPerQuery = Duration.ofNanos(
@ -310,17 +402,20 @@ public final class EchoTestServer {
Instant start = Instant.now();
logger.info("starting instant=" + start);
Duration expected = Duration.ZERO;
final CountDownLatch latch = new CountDownLatch(count);
EchoCall[] echoCalls = new EchoCall[count];
for (int i = 0; i < count; i++) {
Metadata currentMetadata = new Metadata();
currentMetadata.merge(metadata);
currentMetadata.put(
Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER), "" + i);
EchoTestServiceBlockingStub stub
= EchoTestServiceGrpc.newBlockingStub(channel).withInterceptors(
EchoTestServiceGrpc.EchoTestServiceFutureStub stub
= EchoTestServiceGrpc.newFutureStub(channel).withInterceptors(
MetadataUtils.newAttachHeadersInterceptor(currentMetadata))
.withDeadlineAfter(request.getTimeoutMicros(), TimeUnit.MICROSECONDS);
String response = callEcho(stub, echoRequest, i);
forwardEchoResponseBuilder.addOutput(response);
echoCalls[i] = new EchoCall();
callEcho(stub, echoRequest, echoCalls[i], latch);
Instant current = Instant.now();
logger.info("after rpc instant=" + current);
Duration elapsed = Duration.between(start, current);
@ -332,18 +427,57 @@ public final class EchoTestServer {
Thread.sleep(timeLeft.toMillis());
}
}
latch.await();
for (int i = 0; i < count; i++) {
if (Status.OK.equals(echoCalls[i].status)) {
forwardEchoResponseBuilder.addOutput(
buildForwardEchoStruct(i, echoCalls, request.getMessage()));
} else {
logger.log(Level.SEVERE, "RPC {0} failed {1}: {2}",
new Object[]{i, echoCalls[i].status.getCode(), echoCalls[i].status.getDescription()});
forwardEchoResponseBuilder.clear();
throw echoCalls[i].status.asRuntimeException();
}
}
return forwardEchoResponseBuilder.build();
}
private String callEcho(EchoTestServiceBlockingStub stub,
EchoRequest echoRequest, int count) {
try {
EchoResponse echoResponse = stub.echo(echoRequest);
return echoResponse.getMessage();
} catch (Exception e) {
logger.log(Level.INFO, "RPC failed " + count, e);
private static String buildForwardEchoStruct(int i, EchoCall[] echoCalls,
String requestMessage) {
// The test infrastructure might expect the entire struct instead of
// just the message.
StringBuilder sb = new StringBuilder();
sb.append(String.format("[%d] grpcecho.Echo(%s)\n", i, requestMessage));
Iterable<String> iterable = Splitter.on('\n').split(echoCalls[i].response.getMessage());
for (String line : iterable) {
if (!line.isEmpty()) {
sb.append(String.format("[%d body] %s\n", i, line));
}
}
return "";
return sb.toString();
}
private void callEcho(EchoTestServiceFutureStub stub,
EchoRequest echoRequest, final EchoCall echoCall, CountDownLatch latch) {
ListenableFuture<EchoResponse> response = stub.echo(echoRequest);
Futures.addCallback(
response,
new FutureCallback<EchoResponse>() {
@Override
public void onSuccess(@Nullable EchoResponse result) {
echoCall.response = result;
echoCall.status = Status.OK;
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
echoCall.status = Status.fromThrowable(t);
latch.countDown();
}
},
MoreExecutors.directExecutor());
}
}

View File

@ -19,27 +19,37 @@ package io.grpc.testing.istio;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.istio.test.Echo.EchoRequest;
import io.istio.test.Echo.EchoResponse;
import io.istio.test.Echo.ForwardEchoRequest;
import io.istio.test.Echo.ForwardEchoResponse;
import io.istio.test.Echo.Header;
import io.istio.test.EchoTestServiceGrpc;
import io.istio.test.EchoTestServiceGrpc.EchoTestServiceImplBase;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -50,6 +60,30 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class EchoTestServerTest {
private static final String[] EXPECTED_KEY_SET = {
"--server_first", "--forwarding-address",
"--bind_ip", "--istio-version", "--bind_localhost", "--grpc", "--tls",
"--cluster", "--key", "--tcp", "--crt", "--metrics", "--port", "--version"
};
private static final String TEST_ARGS =
"--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=\"17070\" --port=\"18085\""
+ " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=\"16060\" --server_first=16060"
+ " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\""
+ " --grpc=\"17071\" --port=\"19443\" --tls=19443 --port=\"18082\" --bind_ip=18082"
+ " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\""
+ " --port=\"8080\" --port=\"3333\" --version=\"v1\" --istio-version=3 --crt=/cert.crt"
+ " --key=/cert.key --forwarding-address=192.168.1.10:7072";
private static final String TEST_ARGS_PORTS =
"--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=17070 --port=18085"
+ " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=16060 --server_first=16060"
+ " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\""
+ " --grpc=\"17071\" --port=\"19443\" --tls=\"19443\" --port=\"18082\" --bind_ip=18082"
+ " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\""
+ " --port=\"8080\" --port=3333 --version=\"v1\" --istio-version=3 --crt=/cert.crt"
+ " --key=/cert.key --xds-grpc-server=12034 --xds-grpc-server=\"34012\"";
@Test
public void preprocessArgsTest() {
String[] splitArgs = TEST_ARGS.split(" ");
@ -59,16 +93,16 @@ public class EchoTestServerTest {
assertEquals(processedArgs.get("--server_first"), ImmutableList.of("16060", "16061"));
assertEquals(processedArgs.get("--bind_ip"), ImmutableList.of("18082"));
assertEquals(processedArgs.get("--bind_localhost"), ImmutableList.of("18084"));
assertEquals(processedArgs.get("--version"), ImmutableList.of("\"v1\""));
assertEquals(processedArgs.get("--grpc"), ImmutableList.of("\"17070\"", "\"17071\""));
assertEquals(processedArgs.get("--tls"), ImmutableList.of("18443", "19443"));
assertEquals(processedArgs.get("--cluster"), ImmutableList.of("\"cluster-0\""));
assertEquals(processedArgs.get("--key"), ImmutableList.of("/cert.key"));
assertEquals(processedArgs.get("--tcp"), ImmutableList.of("\"19090\"", "\"16060\"",
"\"19091\"","\"16061\"","\"19092\""));
assertEquals(processedArgs.get("--istio_version"), ImmutableList.of("3"));
assertEquals(processedArgs.get("--istio-version"), ImmutableList.of("3"));
assertEquals(processedArgs.get("--crt"), ImmutableList.of("/cert.crt"));
assertEquals(processedArgs.get("--metrics"), ImmutableList.of("15014"));
assertEquals(ImmutableList.of("192.168.1.10:7072"), processedArgs.get("--forwarding-address"));
assertEquals(
processedArgs.get("--port"),
ImmutableList.of(
@ -84,11 +118,34 @@ public class EchoTestServerTest {
"\"3333\""));
}
@Test
public void preprocessArgsPortsTest() {
String[] splitArgs = TEST_ARGS_PORTS.split(" ");
Map<String, List<String>> processedArgs = EchoTestServer.preprocessArgs(splitArgs);
Set<Integer> ports = EchoTestServer.getPorts(processedArgs, "--port");
assertThat(ports).containsExactly(18080, 8080, 18081, 18082, 19443, 18083, 18084, 18085,
3333, 18443);
ports = EchoTestServer.getPorts(processedArgs, "--grpc");
assertThat(ports).containsExactly(17070, 17071);
ports = EchoTestServer.getPorts(processedArgs, "--tls");
assertThat(ports).containsExactly(18443, 19443);
ports = EchoTestServer.getPorts(processedArgs, "--xds-grpc-server");
assertThat(ports).containsExactly(34012, 12034);
}
@Test
public void echoTest() throws IOException, InterruptedException {
EchoTestServer echoTestServer = new EchoTestServer();
echoTestServer.runServers(ImmutableList.of(0, 0), "test-host");
echoTestServer.runServers(
"test-host",
ImmutableList.of(0, 0),
ImmutableList.of(),
ImmutableList.of(),
"0.0.0.0:7072",
null);
assertEquals(2, echoTestServer.servers.size());
int port = echoTestServer.servers.get(0).getPort();
assertNotEquals(0, port);
@ -127,10 +184,16 @@ public class EchoTestServerTest {
static final int COUNT_OF_REQUESTS_TO_FORWARD = 60;
@Test
public void forwardEchoTest() throws IOException {
public void forwardEchoTest() throws IOException, InterruptedException {
EchoTestServer echoTestServer = new EchoTestServer();
echoTestServer.runServers(ImmutableList.of(0, 0), "test-host");
echoTestServer.runServers(
"test-host",
ImmutableList.of(0, 0),
ImmutableList.of(),
ImmutableList.of(),
"0.0.0.0:7072",
null);
assertEquals(2, echoTestServer.servers.size());
int port1 = echoTestServer.servers.get(0).getPort();
int port2 = echoTestServer.servers.get(1).getPort();
@ -165,29 +228,161 @@ public class EchoTestServerTest {
}
long duration = Duration.between(start, end).toMillis();
assertThat(duration).isAtLeast(COUNT_OF_REQUESTS_TO_FORWARD * 10L);
echoTestServer.stopServers();
echoTestServer.blockUntilShutdown();
}
private static void validateOutput(String output, int i) {
assertThat(output).contains("RequestHeader=x-request-id:" + i);
assertThat(output).contains("RequestHeader=test-key1:test-value1");
assertThat(output).contains("RequestHeader=test-key2:test-value2");
assertThat(output).contains("Hostname=test-host");
assertThat(output).contains("StatusCode=200");
assertThat(output).contains("Echo=forward-echo-test-message");
List<String> content = Splitter.on('\n').splitToList(output);
assertThat(content.size()).isAtLeast(7); // see echo implementation
assertThat(content.get(0))
.isEqualTo(String.format("[%d] grpcecho.Echo(forward-echo-test-message)", i));
String prefix = "[" + i + " body] ";
assertThat(content).contains(prefix + "RequestHeader=x-request-id:" + i);
assertThat(content).contains(prefix + "RequestHeader=test-key1:test-value1");
assertThat(content).contains(prefix + "RequestHeader=test-key2:test-value2");
assertThat(content).contains(prefix + "Hostname=test-host");
assertThat(content).contains(prefix + "StatusCode=200");
}
private static final String[] EXPECTED_KEY_SET = {
"--server_first",
"--bind_ip", "--istio_version", "--bind_localhost", "--version", "--grpc", "--tls",
"--cluster", "--key", "--tcp", "--crt", "--metrics", "--port"
};
@Test
public void nonGrpcForwardEchoTest() throws IOException, InterruptedException {
ForwardServiceForNonGrpcImpl forwardServiceForNonGrpc = new ForwardServiceForNonGrpcImpl();
forwardServiceForNonGrpc.receivedRequests = new ArrayList<>();
forwardServiceForNonGrpc.responsesToReturn = new ArrayList<>();
Server nonGrpcEchoServer =
EchoTestServer.runServer(
0, forwardServiceForNonGrpc.bindService(), InsecureServerCredentials.create(),
"", false);
int nonGrpcEchoServerPort = nonGrpcEchoServer.getPort();
private static final String TEST_ARGS =
"--metrics=15014 --cluster=\"cluster-0\" --port=\"18080\" --grpc=\"17070\" --port=\"18085\""
+ " --tcp=\"19090\" --port=\"18443\" --tls=18443 --tcp=\"16060\" --server_first=16060"
+ " --tcp=\"19091\" --tcp=\"16061\" --server_first=16061 --port=\"18081\""
+ " --grpc=\"17071\" --port=\"19443\" --tls=19443 --port=\"18082\" --bind_ip=18082"
+ " --port=\"18084\" --bind_localhost=18084 --tcp=\"19092\" --port=\"18083\""
+ " --port=\"8080\" --port=\"3333\" --version=\"v1\" --istio-version=3 --crt=/cert.crt"
+ " --key=/cert.key";
EchoTestServer echoTestServer = new EchoTestServer();
echoTestServer.runServers(
"test-host",
ImmutableList.of(0),
ImmutableList.of(),
ImmutableList.of(),
"0.0.0.0:" + nonGrpcEchoServerPort,
null);
assertEquals(1, echoTestServer.servers.size());
int port1 = echoTestServer.servers.get(0).getPort();
ManagedChannelBuilder<?> channelBuilder =
Grpc.newChannelBuilderForAddress("localhost", port1, InsecureChannelCredentials.create());
ManagedChannel channel = channelBuilder.build();
EchoTestServiceGrpc.EchoTestServiceBlockingStub stub =
EchoTestServiceGrpc.newBlockingStub(channel);
forwardServiceForNonGrpc.responsesToReturn.add(
ForwardEchoResponse.newBuilder().addOutput("line 1").addOutput("line 2").build());
ForwardEchoRequest forwardEchoRequest =
ForwardEchoRequest.newBuilder()
.setCount(COUNT_OF_REQUESTS_TO_FORWARD)
.setQps(100)
.setTimeoutMicros(2000_000L) // 2000 millis
.setUrl("http://www.example.com") // non grpc protocol
.addHeaders(
Header.newBuilder().setKey("test-key1").setValue("test-value1").build())
.addHeaders(
Header.newBuilder().setKey("test-key2").setValue("test-value2").build())
.setMessage("non-grpc-forward-echo-test-message1")
.build();
ForwardEchoResponse forwardEchoResponse = stub.forwardEcho(forwardEchoRequest);
List<String> outputs = forwardEchoResponse.getOutputList();
assertEquals(2, outputs.size());
assertThat(outputs.get(0)).isEqualTo("line 1");
assertThat(outputs.get(1)).isEqualTo("line 2");
assertThat(forwardServiceForNonGrpc.receivedRequests).hasSize(1);
ForwardEchoRequest receivedRequest = forwardServiceForNonGrpc.receivedRequests.remove(0);
assertThat(receivedRequest.getUrl()).isEqualTo("http://www.example.com");
assertThat(receivedRequest.getMessage()).isEqualTo("non-grpc-forward-echo-test-message1");
assertThat(receivedRequest.getCount()).isEqualTo(COUNT_OF_REQUESTS_TO_FORWARD);
assertThat(receivedRequest.getQps()).isEqualTo(100);
forwardServiceForNonGrpc.responsesToReturn.add(
Status.UNIMPLEMENTED.asRuntimeException());
forwardEchoRequest =
ForwardEchoRequest.newBuilder()
.setCount(1)
.setQps(100)
.setTimeoutMicros(2000_000L) // 2000 millis
.setUrl("redis://192.168.1.1") // non grpc protocol
.addHeaders(
Header.newBuilder().setKey("test-key1").setValue("test-value1").build())
.setMessage("non-grpc-forward-echo-test-message2")
.build();
try {
ForwardEchoResponse unused = stub.forwardEcho(forwardEchoRequest);
fail("exception expected");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus()).isEqualTo(Status.UNIMPLEMENTED);
}
assertThat(forwardServiceForNonGrpc.receivedRequests).hasSize(1);
receivedRequest = forwardServiceForNonGrpc.receivedRequests.remove(0);
assertThat(receivedRequest.getUrl()).isEqualTo("redis://192.168.1.1");
assertThat(receivedRequest.getMessage()).isEqualTo("non-grpc-forward-echo-test-message2");
assertThat(receivedRequest.getCount()).isEqualTo(1);
forwardServiceForNonGrpc.responsesToReturn.add(
ForwardEchoResponse.newBuilder().addOutput("line 3").build());
forwardEchoRequest =
ForwardEchoRequest.newBuilder()
.setCount(1)
.setQps(100)
.setTimeoutMicros(2000_000L) // 2000 millis
.setUrl("http2://192.168.1.1") // non grpc protocol
.addHeaders(
Header.newBuilder().setKey("test-key3").setValue("test-value3").build())
.setMessage("non-grpc-forward-echo-test-message3")
.build();
forwardEchoResponse = stub.forwardEcho(forwardEchoRequest);
outputs = forwardEchoResponse.getOutputList();
assertEquals(1, outputs.size());
assertThat(outputs.get(0)).isEqualTo("line 3");
assertThat(forwardServiceForNonGrpc.receivedRequests).hasSize(1);
receivedRequest = forwardServiceForNonGrpc.receivedRequests.remove(0);
assertThat(receivedRequest.getUrl()).isEqualTo("http2://192.168.1.1");
assertThat(receivedRequest.getMessage()).isEqualTo("non-grpc-forward-echo-test-message3");
List<Header> headers = receivedRequest.getHeadersList();
assertThat(headers).hasSize(1);
assertThat(headers.get(0).getKey()).isEqualTo("test-key3");
assertThat(headers.get(0).getValue()).isEqualTo("test-value3");
echoTestServer.stopServers();
echoTestServer.blockUntilShutdown();
nonGrpcEchoServer.shutdown();
nonGrpcEchoServer.awaitTermination(5, TimeUnit.SECONDS);
}
/**
* Emulate the Go Echo server that receives the non-grpc protocol requests.
*/
private static class ForwardServiceForNonGrpcImpl extends EchoTestServiceImplBase {
List<ForwardEchoRequest> receivedRequests;
List<Object> responsesToReturn;
@Override
public void forwardEcho(ForwardEchoRequest request,
StreamObserver<ForwardEchoResponse> responseObserver) {
receivedRequests.add(request);
Object response = responsesToReturn.remove(0);
if (response instanceof Throwable) {
responseObserver.onError((Throwable) response);
} else if (response instanceof ForwardEchoResponse) {
responseObserver.onNext((ForwardEchoResponse) response);
responseObserver.onCompleted();
}
responseObserver.onError(new IllegalArgumentException("Unknown type in responsesToReturn"));
}
}
}