diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index b9c05ae56..7765166c4 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -7,12 +7,16 @@ package io.dapr.it; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import org.junit.AfterClass; public abstract class BaseIT { protected static final String STATE_STORE_NAME = "statestore"; + private static final Map DAPR_RUN_BUILDERS = new HashMap<>(); + private static final Collection DAPR_RUNS = new ArrayList<>(); protected static DaprRun startDaprApp( @@ -31,18 +35,30 @@ public abstract class BaseIT { Boolean useAppPort, Boolean useDaprPorts, int maxWaitMilliseconds) throws Exception { - DaprRun run = new DaprRun( + DaprRun.Builder builder = new DaprRun.Builder( testName, - DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), + () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), successMessage, serviceClass, maxWaitMilliseconds); + DaprRun run = builder.build(); DAPR_RUNS.add(run); + DAPR_RUN_BUILDERS.put(run.getAppName(), builder); run.start(); run.use(); return run; } + protected static DaprRun restartDaprApp(DaprRun run) throws Exception { + DaprRun.Builder builder = DAPR_RUN_BUILDERS.get(run.getAppName()); + run.stop(); + DaprRun newRun = builder.build(); + DAPR_RUNS.add(newRun); + newRun.start(); + newRun.use(); + return newRun; + } + @AfterClass public static void cleanUp() throws Exception { for (DaprRun app : DAPR_RUNS) { diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java index 7d6f04e2b..1de7048d2 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java @@ -24,12 +24,16 @@ public class DaprPorts { this.appPort = appPort; } - public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPort) throws IOException { - List freePorts = findFreePorts(3); - return new DaprPorts( - appPort ? freePorts.get(0) : null, - httpPort ? freePorts.get(1) : null, - grpcPort ? freePorts.get(2) : null); + public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPort) { + try { + List freePorts = findFreePorts(3); + return new DaprPorts( + appPort ? freePorts.get(0) : null, + httpPort ? freePorts.get(1) : null, + grpcPort ? freePorts.get(2) : null); + } catch (IOException e) { + throw new RuntimeException(e); + } } public static DaprPorts build() throws IOException { diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 57f5884ec..27895a846 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -7,7 +7,7 @@ package io.dapr.it; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import static io.dapr.it.Retry.callWithRetry; @@ -32,12 +32,15 @@ public class DaprRun { private final Command stopCommand; - DaprRun( - String testName, DaprPorts ports, String successMessage, Class serviceClass, int maxWaitMilliseconds) { + private DaprRun(String testName, + DaprPorts ports, + String successMessage, + Class serviceClass, + int maxWaitMilliseconds) { // The app name needs to be deterministic since we depend on it to kill previous runs. this.appName = String.format("%s_%s", testName, serviceClass.getSimpleName()); this.startCommand = - new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports)); + new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports)); this.stopCommand = new Command( "app stopped successfully", "dapr stop --app-id " + this.appName); @@ -98,13 +101,17 @@ public class DaprRun { if (this.ports.getGrpcPort() != null) { System.getProperties().setProperty("dapr.grpc.port", String.valueOf(this.ports.getGrpcPort())); } - System.getProperties().setProperty("dapr.grpc.enabled", Boolean.FALSE.toString()); + System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString()); } public void switchToGRPC() { System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString()); } + public void switchToHTTP() { + System.getProperties().setProperty("dapr.grpc.enabled", Boolean.FALSE.toString()); + } + public int getGrpcPort() { return ports.getGrpcPort(); } @@ -134,7 +141,8 @@ public class DaprRun { private static void assertListeningOnPort(int port) { System.out.printf("Checking port %d ...\n", port); - java.net.SocketAddress socketAddress = new java.net.InetSocketAddress(io.dapr.utils.Constants.DEFAULT_HOSTNAME, port); + java.net.SocketAddress socketAddress = new java.net.InetSocketAddress(io.dapr.utils.Constants.DEFAULT_HOSTNAME, + port); try (java.net.Socket socket = new java.net.Socket()) { socket.connect(socketAddress, 1000); } catch (Exception e) { @@ -143,4 +151,39 @@ public class DaprRun { System.out.printf("Confirmed listening on port %d.\n", port); } + + static class Builder { + + private final String testName; + + private final Supplier portsSupplier; + + private final String successMessage; + + private final Class serviceClass; + + private final int maxWaitMilliseconds; + + Builder( + String testName, + Supplier portsSupplier, + String successMessage, + Class serviceClass, + int maxWaitMilliseconds) { + this.testName = testName; + this.portsSupplier = portsSupplier; + this.successMessage = successMessage; + this.serviceClass = serviceClass; + this.maxWaitMilliseconds = maxWaitMilliseconds; + } + + DaprRun build() { + return new DaprRun( + this.testName, + this.portsSupplier.get(), + this.successMessage, + this.serviceClass, + this.maxWaitMilliseconds); + } + } } diff --git a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java index 537c6ccfa..0d3a8f8f6 100644 --- a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java @@ -13,12 +13,16 @@ import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; import io.dapr.it.services.EmptyService; import io.dapr.serializer.DefaultObjectSerializer; +import java.util.Arrays; +import java.util.Collection; import org.junit.Ignore; import org.junit.Test; import java.util.Base64; import java.util.Collections; import java.util.List; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static io.dapr.it.Retry.callWithRetry; import static org.junit.Assert.assertEquals; @@ -27,6 +31,7 @@ import static org.junit.Assert.fail; /** * Service for input and output binding example. */ +@RunWith(Parameterized.class) public class BindingIT extends BaseIT { public static class MyClass { @@ -36,11 +41,24 @@ public class BindingIT extends BaseIT { public String message; } + /** + * Parameters for this test. + * Param #1: useGrpc. + * @return Collection of parameter tuples. + */ + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { { false }, { true } }); + } + + @Parameterized.Parameter + public boolean useGrpc; + @Test public void inputOutputBinding() throws Exception { System.out.println("Working Directory = " + System.getProperty("user.dir")); - DaprRun daprRunInputBinding = startDaprApp( + DaprRun daprRun = startDaprApp( this.getClass().getSimpleName(), InputBindingService.SUCCESS_MESSAGE, InputBindingService.class, @@ -49,6 +67,11 @@ public class BindingIT extends BaseIT { // At this point, it is guaranteed that the service above is running and all ports being listened to. // TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet? Thread.sleep(120000); + if (this.useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); + } DaprClient client = new DaprClientBuilder().build(); @@ -74,7 +97,7 @@ public class BindingIT extends BaseIT { final List messages = client.invokeService( Verb.GET, - daprRunInputBinding.getAppName(), + daprRun.getAppName(), "messages", null, List.class).block(); diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java index 12638a8ac..038fd257b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java @@ -5,21 +5,49 @@ import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.Verb; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.io.IOException; import java.util.*; import static org.junit.Assert.assertEquals; +import static org.junit.runners.Parameterized.*; +@RunWith(Parameterized.class) public class MethodInvokeIT extends BaseIT { //Number of messages to be sent: 10 private static final int NUM_MESSAGES = 10; - private static DaprRun daprRun=null; + + /** + * Parameters for this test. + * Param #1: useGrpc. + * @return Collection of parameter tuples. + */ + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { { false }, { true } }); + } + + /** + * Run of a Dapr application. + */ + private static DaprRun daprRun = null; + + /** + * Flag to determine if there is a context change based on parameters. + */ + private static Boolean wasGrpc; + + @Parameter + public boolean useGrpc; @BeforeClass - public static void init() throws Exception { + public static void initClass() throws Exception { System.out.println("Working Directory = " + System.getProperty("user.dir")); daprRun = startDaprApp( @@ -30,6 +58,24 @@ public class MethodInvokeIT extends BaseIT { 60000); } + @Before + public void init() throws Exception { + if (wasGrpc != null) { + if (wasGrpc.booleanValue() != this.useGrpc) { + // Context change. + daprRun = super.restartDaprApp(daprRun); + } + } + + if (this.useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); + } + + wasGrpc = this.useGrpc; + } + @Test public void testInvoke() { diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index 776850e21..bdeb442b6 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -10,15 +10,20 @@ import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.Verb; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; +import java.util.Arrays; +import java.util.Collection; import org.junit.Test; import java.util.Collections; import java.util.List; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static io.dapr.it.Retry.callWithRetry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class PubSubIT extends BaseIT { //Number of messages to be sent: 10 @@ -27,6 +32,19 @@ public class PubSubIT extends BaseIT { //The title of the topic to be used for publishing private static final String TOPIC_NAME = "testingtopic"; + /** + * Parameters for this test. + * Param #1: useGrpc. + * @return Collection of parameter tuples. + */ + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { { false }, { true } }); + } + + @Parameterized.Parameter + public boolean useGrpc; + @Test public void testPubSub() throws Exception { System.out.println("Working Directory = " + System.getProperty("user.dir")); @@ -38,6 +56,11 @@ public class PubSubIT extends BaseIT { true, 60000); // At this point, it is guaranteed that the service above is running and all ports being listened to. + if (this.useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); + } DaprClient client = new DaprClientBuilder().build(); for (int i = 0; i < NUM_MESSAGES; i++) { diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java index 74bfc7488..f58fa0b1b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java @@ -37,6 +37,7 @@ public class HttpStateClientIT extends BaseIT { false, 1000 ); + daprRun.switchToHTTP(); } @Test diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index e08799d9c..6f670b5df 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -193,7 +193,7 @@ public class DaprClientGrpc implements DaprClient { builder.setData(data); } if (metadata != null) { - builder.getMetadataMap().putAll(metadata); + builder.putAllMetadata(metadata); } DaprProtos.InvokeBindingEnvelope envelope = builder.build(); return Mono.fromCallable(() -> { @@ -461,11 +461,14 @@ public class DaprClientGrpc implements DaprClient { String verb, String appId, String method, K request) throws IOException { DaprProtos.InvokeServiceEnvelope.Builder envelopeBuilder = DaprProtos.InvokeServiceEnvelope.newBuilder() .setId(appId) - .setMethod(verb); + .setMethod(method) + .putMetadata("http.verb", verb); if (request != null) { byte[] byteRequest = objectSerializer.serialize(request); Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build(); envelopeBuilder.setData(data); + } else { + envelopeBuilder.setData(Any.newBuilder().build()); } return envelopeBuilder.build(); } diff --git a/sdk/src/main/java/io/dapr/utils/Properties.java b/sdk/src/main/java/io/dapr/utils/Properties.java index 3b56710d8..256050266 100644 --- a/sdk/src/main/java/io/dapr/utils/Properties.java +++ b/sdk/src/main/java/io/dapr/utils/Properties.java @@ -28,7 +28,7 @@ public class Properties { /** * Dapr's default GRPC port. */ - private static final Boolean DEFAULT_GRPC_ENABLED = false; + private static final Boolean DEFAULT_GRPC_ENABLED = true; /** * Dapr's default String encoding: UTF-8.