Setting GRPC as default protocol for non-actor APIs. (#239)

* Setting GRPC as default protocol for non-actor APIs.

* Fixes issues in GRPC client + ITs for both GRPC and HTTP.

* Update MethodInvokeIT.java
This commit is contained in:
Artur Souza 2020-02-18 10:55:14 -08:00 committed by GitHub
parent d60df830aa
commit 85e9d2de02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 180 additions and 21 deletions

View File

@ -7,12 +7,16 @@ package io.dapr.it;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.junit.AfterClass; import org.junit.AfterClass;
public abstract class BaseIT { public abstract class BaseIT {
protected static final String STATE_STORE_NAME = "statestore"; protected static final String STATE_STORE_NAME = "statestore";
private static final Map<String, DaprRun.Builder> DAPR_RUN_BUILDERS = new HashMap<>();
private static final Collection<DaprRun> DAPR_RUNS = new ArrayList<>(); private static final Collection<DaprRun> DAPR_RUNS = new ArrayList<>();
protected static DaprRun startDaprApp( protected static DaprRun startDaprApp(
@ -31,18 +35,30 @@ public abstract class BaseIT {
Boolean useAppPort, Boolean useAppPort,
Boolean useDaprPorts, Boolean useDaprPorts,
int maxWaitMilliseconds) throws Exception { int maxWaitMilliseconds) throws Exception {
DaprRun run = new DaprRun( DaprRun.Builder builder = new DaprRun.Builder(
testName, testName,
DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts),
successMessage, successMessage,
serviceClass, serviceClass,
maxWaitMilliseconds); maxWaitMilliseconds);
DaprRun run = builder.build();
DAPR_RUNS.add(run); DAPR_RUNS.add(run);
DAPR_RUN_BUILDERS.put(run.getAppName(), builder);
run.start(); run.start();
run.use(); run.use();
return run; return run;
} }
protected static DaprRun restartDaprApp(DaprRun run) throws Exception {
DaprRun.Builder builder = DAPR_RUN_BUILDERS.get(run.getAppName());
run.stop();
DaprRun newRun = builder.build();
DAPR_RUNS.add(newRun);
newRun.start();
newRun.use();
return newRun;
}
@AfterClass @AfterClass
public static void cleanUp() throws Exception { public static void cleanUp() throws Exception {
for (DaprRun app : DAPR_RUNS) { for (DaprRun app : DAPR_RUNS) {

View File

@ -24,12 +24,16 @@ public class DaprPorts {
this.appPort = appPort; this.appPort = appPort;
} }
public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPort) throws IOException { public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPort) {
List<Integer> freePorts = findFreePorts(3); try {
return new DaprPorts( List<Integer> freePorts = findFreePorts(3);
appPort ? freePorts.get(0) : null, return new DaprPorts(
httpPort ? freePorts.get(1) : null, appPort ? freePorts.get(0) : null,
grpcPort ? freePorts.get(2) : null); httpPort ? freePorts.get(1) : null,
grpcPort ? freePorts.get(2) : null);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
public static DaprPorts build() throws IOException { public static DaprPorts build() throws IOException {

View File

@ -7,7 +7,7 @@ package io.dapr.it;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier;
import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.Retry.callWithRetry;
@ -32,12 +32,15 @@ public class DaprRun {
private final Command stopCommand; private final Command stopCommand;
DaprRun( private DaprRun(String testName,
String testName, DaprPorts ports, String successMessage, Class serviceClass, int maxWaitMilliseconds) { DaprPorts ports,
String successMessage,
Class serviceClass,
int maxWaitMilliseconds) {
// The app name needs to be deterministic since we depend on it to kill previous runs. // The app name needs to be deterministic since we depend on it to kill previous runs.
this.appName = String.format("%s_%s", testName, serviceClass.getSimpleName()); this.appName = String.format("%s_%s", testName, serviceClass.getSimpleName());
this.startCommand = this.startCommand =
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports)); new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports));
this.stopCommand = new Command( this.stopCommand = new Command(
"app stopped successfully", "app stopped successfully",
"dapr stop --app-id " + this.appName); "dapr stop --app-id " + this.appName);
@ -98,13 +101,17 @@ public class DaprRun {
if (this.ports.getGrpcPort() != null) { if (this.ports.getGrpcPort() != null) {
System.getProperties().setProperty("dapr.grpc.port", String.valueOf(this.ports.getGrpcPort())); System.getProperties().setProperty("dapr.grpc.port", String.valueOf(this.ports.getGrpcPort()));
} }
System.getProperties().setProperty("dapr.grpc.enabled", Boolean.FALSE.toString()); System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString());
} }
public void switchToGRPC() { public void switchToGRPC() {
System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString()); System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString());
} }
public void switchToHTTP() {
System.getProperties().setProperty("dapr.grpc.enabled", Boolean.FALSE.toString());
}
public int getGrpcPort() { public int getGrpcPort() {
return ports.getGrpcPort(); return ports.getGrpcPort();
} }
@ -134,7 +141,8 @@ public class DaprRun {
private static void assertListeningOnPort(int port) { private static void assertListeningOnPort(int port) {
System.out.printf("Checking port %d ...\n", port); System.out.printf("Checking port %d ...\n", port);
java.net.SocketAddress socketAddress = new java.net.InetSocketAddress(io.dapr.utils.Constants.DEFAULT_HOSTNAME, port); java.net.SocketAddress socketAddress = new java.net.InetSocketAddress(io.dapr.utils.Constants.DEFAULT_HOSTNAME,
port);
try (java.net.Socket socket = new java.net.Socket()) { try (java.net.Socket socket = new java.net.Socket()) {
socket.connect(socketAddress, 1000); socket.connect(socketAddress, 1000);
} catch (Exception e) { } catch (Exception e) {
@ -143,4 +151,39 @@ public class DaprRun {
System.out.printf("Confirmed listening on port %d.\n", port); System.out.printf("Confirmed listening on port %d.\n", port);
} }
static class Builder {
private final String testName;
private final Supplier<DaprPorts> portsSupplier;
private final String successMessage;
private final Class serviceClass;
private final int maxWaitMilliseconds;
Builder(
String testName,
Supplier<DaprPorts> portsSupplier,
String successMessage,
Class serviceClass,
int maxWaitMilliseconds) {
this.testName = testName;
this.portsSupplier = portsSupplier;
this.successMessage = successMessage;
this.serviceClass = serviceClass;
this.maxWaitMilliseconds = maxWaitMilliseconds;
}
DaprRun build() {
return new DaprRun(
this.testName,
this.portsSupplier.get(),
this.successMessage,
this.serviceClass,
this.maxWaitMilliseconds);
}
}
} }

View File

@ -13,12 +13,16 @@ import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun; import io.dapr.it.DaprRun;
import io.dapr.it.services.EmptyService; import io.dapr.it.services.EmptyService;
import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer;
import java.util.Arrays;
import java.util.Collection;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.util.Base64; import java.util.Base64;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.Retry.callWithRetry;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -27,6 +31,7 @@ import static org.junit.Assert.fail;
/** /**
* Service for input and output binding example. * Service for input and output binding example.
*/ */
@RunWith(Parameterized.class)
public class BindingIT extends BaseIT { public class BindingIT extends BaseIT {
public static class MyClass { public static class MyClass {
@ -36,11 +41,24 @@ public class BindingIT extends BaseIT {
public String message; public String message;
} }
/**
* Parameters for this test.
* Param #1: useGrpc.
* @return Collection of parameter tuples.
*/
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } });
}
@Parameterized.Parameter
public boolean useGrpc;
@Test @Test
public void inputOutputBinding() throws Exception { public void inputOutputBinding() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir")); System.out.println("Working Directory = " + System.getProperty("user.dir"));
DaprRun daprRunInputBinding = startDaprApp( DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName(), this.getClass().getSimpleName(),
InputBindingService.SUCCESS_MESSAGE, InputBindingService.SUCCESS_MESSAGE,
InputBindingService.class, InputBindingService.class,
@ -49,6 +67,11 @@ public class BindingIT extends BaseIT {
// At this point, it is guaranteed that the service above is running and all ports being listened to. // At this point, it is guaranteed that the service above is running and all ports being listened to.
// TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet? // TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet?
Thread.sleep(120000); Thread.sleep(120000);
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
DaprClient client = new DaprClientBuilder().build(); DaprClient client = new DaprClientBuilder().build();
@ -74,7 +97,7 @@ public class BindingIT extends BaseIT {
final List<String> messages = final List<String> messages =
client.invokeService( client.invokeService(
Verb.GET, Verb.GET,
daprRunInputBinding.getAppName(), daprRun.getAppName(),
"messages", "messages",
null, null,
List.class).block(); List.class).block();

View File

@ -5,21 +5,49 @@ import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Verb; import io.dapr.client.domain.Verb;
import io.dapr.it.BaseIT; import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun; import io.dapr.it.DaprRun;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.*; import java.util.*;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.runners.Parameterized.*;
@RunWith(Parameterized.class)
public class MethodInvokeIT extends BaseIT { public class MethodInvokeIT extends BaseIT {
//Number of messages to be sent: 10 //Number of messages to be sent: 10
private static final int NUM_MESSAGES = 10; private static final int NUM_MESSAGES = 10;
private static DaprRun daprRun=null;
/**
* Parameters for this test.
* Param #1: useGrpc.
* @return Collection of parameter tuples.
*/
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } });
}
/**
* Run of a Dapr application.
*/
private static DaprRun daprRun = null;
/**
* Flag to determine if there is a context change based on parameters.
*/
private static Boolean wasGrpc;
@Parameter
public boolean useGrpc;
@BeforeClass @BeforeClass
public static void init() throws Exception { public static void initClass() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir")); System.out.println("Working Directory = " + System.getProperty("user.dir"));
daprRun = startDaprApp( daprRun = startDaprApp(
@ -30,6 +58,24 @@ public class MethodInvokeIT extends BaseIT {
60000); 60000);
} }
@Before
public void init() throws Exception {
if (wasGrpc != null) {
if (wasGrpc.booleanValue() != this.useGrpc) {
// Context change.
daprRun = super.restartDaprApp(daprRun);
}
}
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
wasGrpc = this.useGrpc;
}
@Test @Test
public void testInvoke() { public void testInvoke() {

View File

@ -10,15 +10,20 @@ import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Verb; import io.dapr.client.domain.Verb;
import io.dapr.it.BaseIT; import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun; import io.dapr.it.DaprRun;
import java.util.Arrays;
import java.util.Collection;
import org.junit.Test; import org.junit.Test;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.Retry.callWithRetry;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class PubSubIT extends BaseIT { public class PubSubIT extends BaseIT {
//Number of messages to be sent: 10 //Number of messages to be sent: 10
@ -27,6 +32,19 @@ public class PubSubIT extends BaseIT {
//The title of the topic to be used for publishing //The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic"; private static final String TOPIC_NAME = "testingtopic";
/**
* Parameters for this test.
* Param #1: useGrpc.
* @return Collection of parameter tuples.
*/
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } });
}
@Parameterized.Parameter
public boolean useGrpc;
@Test @Test
public void testPubSub() throws Exception { public void testPubSub() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir")); System.out.println("Working Directory = " + System.getProperty("user.dir"));
@ -38,6 +56,11 @@ public class PubSubIT extends BaseIT {
true, true,
60000); 60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to. // At this point, it is guaranteed that the service above is running and all ports being listened to.
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
DaprClient client = new DaprClientBuilder().build(); DaprClient client = new DaprClientBuilder().build();
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {

View File

@ -37,6 +37,7 @@ public class HttpStateClientIT extends BaseIT {
false, false,
1000 1000
); );
daprRun.switchToHTTP();
} }
@Test @Test

View File

@ -193,7 +193,7 @@ public class DaprClientGrpc implements DaprClient {
builder.setData(data); builder.setData(data);
} }
if (metadata != null) { if (metadata != null) {
builder.getMetadataMap().putAll(metadata); builder.putAllMetadata(metadata);
} }
DaprProtos.InvokeBindingEnvelope envelope = builder.build(); DaprProtos.InvokeBindingEnvelope envelope = builder.build();
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
@ -461,11 +461,14 @@ public class DaprClientGrpc implements DaprClient {
String verb, String appId, String method, K request) throws IOException { String verb, String appId, String method, K request) throws IOException {
DaprProtos.InvokeServiceEnvelope.Builder envelopeBuilder = DaprProtos.InvokeServiceEnvelope.newBuilder() DaprProtos.InvokeServiceEnvelope.Builder envelopeBuilder = DaprProtos.InvokeServiceEnvelope.newBuilder()
.setId(appId) .setId(appId)
.setMethod(verb); .setMethod(method)
.putMetadata("http.verb", verb);
if (request != null) { if (request != null) {
byte[] byteRequest = objectSerializer.serialize(request); byte[] byteRequest = objectSerializer.serialize(request);
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build(); Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build();
envelopeBuilder.setData(data); envelopeBuilder.setData(data);
} else {
envelopeBuilder.setData(Any.newBuilder().build());
} }
return envelopeBuilder.build(); return envelopeBuilder.build();
} }

View File

@ -28,7 +28,7 @@ public class Properties {
/** /**
* Dapr's default GRPC port. * Dapr's default GRPC port.
*/ */
private static final Boolean DEFAULT_GRPC_ENABLED = false; private static final Boolean DEFAULT_GRPC_ENABLED = true;
/** /**
* Dapr's default String encoding: UTF-8. * Dapr's default String encoding: UTF-8.