From befed4a01b8ae75a647058b663f406bbede8a3ac Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Sat, 23 Jan 2021 12:12:34 -0800 Subject: [PATCH] Makes service method API invocations over HTTP by default. (#450) * Makes service method API invocations over HTTP by default. * Add logic to avoid building managedchannel in case of http API for actor runtime. --- .../dapr/actors/client/ActorProxyBuilder.java | 33 +- .../io/dapr/actors/runtime/ActorRuntime.java | 5 +- .../src/test/java/io/dapr/it/AppRun.java | 12 +- .../src/test/java/io/dapr/it/BaseIT.java | 21 +- .../src/test/java/io/dapr/it/DaprRun.java | 40 +- .../java/io/dapr/it/actors/ActorStateIT.java | 30 +- .../io/dapr/it/secrets/SecretsClientIT.java | 9 +- .../io/dapr/it/state/GRPCStateClientIT.java | 4 - .../java/io/dapr/client/DaprApiProtocol.java | 16 + .../io/dapr/client/DaprClientBuilder.java | 35 +- .../java/io/dapr/client/DaprClientProxy.java | 478 ++++++++++++++++++ .../java/io/dapr/client/DaprHttpBuilder.java | 2 +- .../main/java/io/dapr/config/Properties.java | 41 +- .../io/dapr/client/DaprClientGrpcTest.java | 414 +++++++-------- .../io/dapr/client/DaprClientHttpTest.java | 204 +++----- .../io/dapr/client/DaprClientProxyTest.java | 72 +++ 16 files changed, 989 insertions(+), 427 deletions(-) create mode 100644 sdk/src/main/java/io/dapr/client/DaprApiProtocol.java create mode 100644 sdk/src/main/java/io/dapr/client/DaprClientProxy.java create mode 100644 sdk/src/test/java/io/dapr/client/DaprClientProxyTest.java diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java index 9c999af96..d001ec404 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java @@ -7,6 +7,7 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; import io.dapr.actors.ActorUtils; +import io.dapr.client.DaprApiProtocol; import io.dapr.client.DaprHttpBuilder; import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; @@ -26,7 +27,7 @@ public class ActorProxyBuilder implements Closeable { /** * Determine if this builder will create GRPC clients instead of HTTP clients. */ - private final boolean useGrpc; + private final DaprApiProtocol apiProtocol; /** * Actor's type. @@ -73,6 +74,19 @@ public class ActorProxyBuilder implements Closeable { * @param actorTypeClass Actor's type class. */ public ActorProxyBuilder(String actorType, Class actorTypeClass) { + this(actorType, actorTypeClass, Properties.API_PROTOCOL.get()); + } + + /** + * Instantiates a new builder for a given Actor type, using {@link DefaultObjectSerializer} by default. + * + * {@link DefaultObjectSerializer} is not recommended for production scenarios. + * + * @param actorType Actor's type. + * @param actorTypeClass Actor's type class. + * @param apiProtocol Dapr's API protocol. + */ + private ActorProxyBuilder(String actorType, Class actorTypeClass, DaprApiProtocol apiProtocol) { if ((actorType == null) || actorType.isEmpty()) { throw new IllegalArgumentException("ActorType is required."); } @@ -80,12 +94,12 @@ public class ActorProxyBuilder implements Closeable { throw new IllegalArgumentException("ActorTypeClass is required."); } - this.useGrpc = Properties.USE_GRPC.get(); + this.apiProtocol = apiProtocol; this.actorType = actorType; this.objectSerializer = new DefaultObjectSerializer(); this.clazz = actorTypeClass; this.daprHttpBuilder = new DaprHttpBuilder(); - this.channel = buildManagedChannel(); + this.channel = buildManagedChannel(apiProtocol); } /** @@ -138,11 +152,11 @@ public class ActorProxyBuilder implements Closeable { * @throws java.lang.IllegalStateException if any required field is missing */ private DaprClient buildDaprClient() { - if (this.useGrpc) { - return new DaprGrpcClient(DaprGrpc.newFutureStub(this.channel)); + switch (this.apiProtocol) { + case GRPC: return new DaprGrpcClient(DaprGrpc.newFutureStub(this.channel)); + case HTTP: return new DaprHttpClient(daprHttpBuilder.build()); + default: throw new IllegalStateException("Unsupported protocol: " + this.apiProtocol.name()); } - - return new DaprHttpClient(daprHttpBuilder.build()); } /** @@ -158,10 +172,11 @@ public class ActorProxyBuilder implements Closeable { /** * Creates a GRPC managed channel (or null, if not applicable). * + * @param apiProtocol Dapr's API protocol. * @return GRPC managed channel or null. */ - private static ManagedChannel buildManagedChannel() { - if (!Properties.USE_GRPC.get()) { + private static ManagedChannel buildManagedChannel(DaprApiProtocol apiProtocol) { + if (apiProtocol != DaprApiProtocol.GRPC) { return null; } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index 5312046c5..d9a31c41a 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -7,6 +7,7 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; import io.dapr.actors.ActorTrace; +import io.dapr.client.DaprApiProtocol; import io.dapr.client.DaprHttpBuilder; import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; @@ -307,7 +308,7 @@ public class ActorRuntime implements Closeable { * @throws java.lang.IllegalStateException if any required field is missing */ private static DaprClient buildDaprClient(ManagedChannel channel) { - if (Properties.USE_GRPC.get()) { + if (Properties.API_PROTOCOL.get() == DaprApiProtocol.GRPC) { return new DaprGrpcClient(channel); } @@ -320,7 +321,7 @@ public class ActorRuntime implements Closeable { * @return GRPC managed channel or null. */ private static ManagedChannel buildManagedChannel() { - if (!Properties.USE_GRPC.get()) { + if (Properties.API_PROTOCOL.get() != DaprApiProtocol.GRPC) { return null; } diff --git a/sdk-tests/src/test/java/io/dapr/it/AppRun.java b/sdk-tests/src/test/java/io/dapr/it/AppRun.java index 5a275fc8a..620a5365c 100644 --- a/sdk-tests/src/test/java/io/dapr/it/AppRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/AppRun.java @@ -5,6 +5,7 @@ package io.dapr.it; +import io.dapr.client.DaprApiProtocol; import io.dapr.config.Properties; import java.io.IOException; @@ -19,7 +20,7 @@ import static io.dapr.it.Retry.callWithRetry; public class AppRun implements Stoppable { private static final String APP_COMMAND = - "mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D dapr.grpc.enabled=%b"; + "mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D %s=%s -D %s=%s"; private final DaprPorts ports; @@ -31,10 +32,10 @@ public class AppRun implements Stoppable { String successMessage, Class serviceClass, int maxWaitMilliseconds, - boolean useGRPC) { + DaprApiProtocol protocol) { this.command = new Command( successMessage, - buildCommand(serviceClass, ports, useGRPC), + buildCommand(serviceClass, ports, protocol), new HashMap<>() {{ put("DAPR_HTTP_PORT", ports.getHttpPort().toString()); put("DAPR_GRPC_PORT", ports.getGrpcPort().toString()); @@ -72,10 +73,11 @@ public class AppRun implements Stoppable { } } - private static String buildCommand(Class serviceClass, DaprPorts ports, boolean useGRPC) { + private static String buildCommand(Class serviceClass, DaprPorts ports, DaprApiProtocol protocol) { return String.format(APP_COMMAND, serviceClass.getCanonicalName(), ports.getAppPort() != null ? ports.getAppPort().toString() : "", - useGRPC); + Properties.API_PROTOCOL.getName(), protocol, + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol); } private static void assertListeningOnPort(int port) { diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index 98e63780b..1e8f146e4 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -5,6 +5,7 @@ package io.dapr.it; +import io.dapr.client.DaprApiProtocol; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.AfterClass; @@ -30,7 +31,7 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds) throws Exception { - return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, true); + return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC); } protected static DaprRun startDaprApp( @@ -39,14 +40,15 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds, - boolean useGRPC) throws Exception { - return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds, useGRPC); + DaprApiProtocol protocol) throws Exception { + return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds, protocol); } protected static DaprRun startDaprApp( String testName, int maxWaitMilliseconds) throws Exception { - return startDaprApp(testName, "You're up and running!", null, false, true, maxWaitMilliseconds, true); + return startDaprApp( + testName, "You're up and running!", null, false, true, maxWaitMilliseconds, DaprApiProtocol.GRPC); } protected static DaprRun startDaprApp( @@ -56,13 +58,13 @@ public abstract class BaseIT { Boolean useAppPort, Boolean useDaprPorts, int maxWaitMilliseconds, - boolean useGRPC) throws Exception { + DaprApiProtocol protocol) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), successMessage, maxWaitMilliseconds, - useGRPC).withServiceClass(serviceClass); + protocol).withServiceClass(serviceClass); DaprRun run = builder.build(); TO_BE_STOPPED.add(run); DAPR_RUN_BUILDERS.put(run.getAppName(), builder); @@ -77,7 +79,8 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds) throws Exception { - return startSplitDaprAndApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, true); + return startSplitDaprAndApp( + testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC); } protected static ImmutablePair startSplitDaprAndApp( @@ -86,13 +89,13 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds, - boolean useGRPC) throws Exception { + DaprApiProtocol protocol) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, true, true), successMessage, maxWaitMilliseconds, - useGRPC).withServiceClass(serviceClass); + protocol).withServiceClass(serviceClass); ImmutablePair runs = builder.splitBuild(); TO_BE_STOPPED.add(runs.left); TO_BE_STOPPED.add(runs.right); diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index cf0f3d219..8939785aa 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -5,6 +5,7 @@ package io.dapr.it; +import io.dapr.client.DaprApiProtocol; import io.dapr.config.Properties; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -23,7 +24,7 @@ public class DaprRun implements Stoppable { // the arg in -Dexec.args is the app's port private static final String DAPR_COMMAND = - " -- mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D dapr.grpc.enabled=%s"; + " -- mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D %s=%s -D %s=%s"; private final DaprPorts ports; @@ -44,11 +45,11 @@ public class DaprRun implements Stoppable { String successMessage, Class serviceClass, int maxWaitMilliseconds, - boolean useGRPC) { + DaprApiProtocol protocol) { // The app name needs to be deterministic since we depend on it to kill previous runs. this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName()); this.startCommand = - new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, useGRPC)); + new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol)); this.listCommand = new Command( this.appName, "dapr list"); @@ -132,20 +133,24 @@ public class DaprRun implements Stoppable { public void use() { if (this.ports.getHttpPort() != null) { - System.getProperties().setProperty("dapr.http.port", String.valueOf(this.ports.getHttpPort())); + System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.ports.getHttpPort())); } if (this.ports.getGrpcPort() != null) { - System.getProperties().setProperty("dapr.grpc.port", String.valueOf(this.ports.getGrpcPort())); + System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort())); } - System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString()); + System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); } public void switchToGRPC() { - System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString()); + System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); } public void switchToHTTP() { - System.getProperties().setProperty("dapr.grpc.enabled", Boolean.FALSE.toString()); + System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name()); + } + + public void switchToProtocol(DaprApiProtocol protocol) { + System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), protocol.name()); } public int getGrpcPort() { @@ -164,14 +169,17 @@ public class DaprRun implements Stoppable { return appName; } - private static String buildDaprCommand(String appName, Class serviceClass, DaprPorts ports, boolean useGRPC) { + private static String buildDaprCommand( + String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol) { StringBuilder stringBuilder = new StringBuilder(String.format(DAPR_RUN, appName)) .append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "") .append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "") .append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "") .append(serviceClass == null ? "" : String.format(DAPR_COMMAND, serviceClass.getCanonicalName(), - ports.getAppPort() != null ? ports.getAppPort().toString() : "", useGRPC)); + ports.getAppPort() != null ? ports.getAppPort().toString() : "", + Properties.API_PROTOCOL.getName(), protocol, + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol)); return stringBuilder.toString(); } @@ -200,19 +208,19 @@ public class DaprRun implements Stoppable { private Class serviceClass; - private boolean useGRPC; + private DaprApiProtocol protocol; Builder( String testName, Supplier portsSupplier, String successMessage, int maxWaitMilliseconds, - boolean useGRPC) { + DaprApiProtocol protocol) { this.testName = testName; this.portsSupplier = portsSupplier; this.successMessage = successMessage; this.maxWaitMilliseconds = maxWaitMilliseconds; - this.useGRPC = useGRPC; + this.protocol = protocol; } public Builder withServiceClass(Class serviceClass) { @@ -227,7 +235,7 @@ public class DaprRun implements Stoppable { this.successMessage, this.serviceClass, this.maxWaitMilliseconds, - this.useGRPC); + this.protocol); } /** @@ -241,7 +249,7 @@ public class DaprRun implements Stoppable { this.successMessage, this.serviceClass, this.maxWaitMilliseconds, - this.useGRPC); + this.protocol); DaprRun daprRun = new DaprRun( this.testName, @@ -249,7 +257,7 @@ public class DaprRun implements Stoppable { DAPR_SUCCESS_MESSAGE, null, this.maxWaitMilliseconds, - this.useGRPC); + this.protocol); return new ImmutablePair<>(appRun, daprRun); } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java index 0a1786c22..60fc74aba 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java @@ -8,6 +8,7 @@ package io.dapr.it.actors; import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxy; import io.dapr.actors.client.ActorProxyBuilder; +import io.dapr.client.DaprApiProtocol; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; import io.dapr.it.actors.services.springboot.StatefulActor; @@ -38,14 +39,19 @@ public class ActorStateIT extends BaseIT { */ @Parameterized.Parameters public static Collection data() { - return Arrays.asList(new Object[][] { { false, false }, { false, true }, { true, false }, { true, true } }); + return Arrays.asList(new Object[][] { + { DaprApiProtocol.HTTP, DaprApiProtocol.HTTP }, + { DaprApiProtocol.HTTP, DaprApiProtocol.GRPC }, + { DaprApiProtocol.GRPC, DaprApiProtocol.HTTP }, + { DaprApiProtocol.GRPC, DaprApiProtocol.GRPC }, + }); } @Parameterized.Parameter(0) - public boolean useGrpc; + public DaprApiProtocol daprClientProtocol; @Parameterized.Parameter(1) - public boolean useGrpcInService; + public DaprApiProtocol serviceAppProtocol; @Test public void writeReadState() throws Exception { @@ -57,19 +63,15 @@ public class ActorStateIT extends BaseIT { StatefulActorService.class, true, 60000, - useGrpcInService); + serviceAppProtocol); - if (this.useGrpc) { - runtime.switchToGRPC(); - } else { - runtime.switchToHTTP(); - } + runtime.switchToProtocol(this.daprClientProtocol); String message = "This is a message to be saved and retrieved."; String name = "Jon Doe"; byte[] bytes = new byte[] { 0x1 }; ActorId actorId = new ActorId( - String.format("%d-%b-%b", System.currentTimeMillis(), this.useGrpc, this.useGrpcInService)); + String.format("%d-%b-%b", System.currentTimeMillis(), this.daprClientProtocol, this.serviceAppProtocol)); String actorType = "StatefulActorTest"; logger.debug("Building proxy ..."); ActorProxyBuilder proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class)); @@ -153,13 +155,9 @@ public class ActorStateIT extends BaseIT { StatefulActorService.class, true, 60000, - useGrpcInService); + serviceAppProtocol); - if (this.useGrpc) { - run2.switchToGRPC(); - } else { - run2.switchToHTTP(); - } + runtime.switchToProtocol(this.daprClientProtocol); // Need new proxy builder because the proxy builder holds the channel. proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class)); diff --git a/sdk-tests/src/test/java/io/dapr/it/secrets/SecretsClientIT.java b/sdk-tests/src/test/java/io/dapr/it/secrets/SecretsClientIT.java index c9023e9bd..d87024f5e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/secrets/SecretsClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/secrets/SecretsClientIT.java @@ -9,8 +9,6 @@ import com.bettercloud.vault.Vault; import com.bettercloud.vault.VaultConfig; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.DaprClientGrpc; -import io.dapr.client.DaprClientHttp; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; import org.junit.After; @@ -79,7 +77,7 @@ public class SecretsClientIT extends BaseIT { } @Before - public void setup() throws Exception { + public void setup() { if (this.useGrpc) { daprRun.switchToGRPC(); } else { @@ -87,11 +85,6 @@ public class SecretsClientIT extends BaseIT { } this.daprClient = new DaprClientBuilder().build(); - if (this.useGrpc) { - assertEquals(DaprClientGrpc.class, this.daprClient.getClass()); - } else { - assertEquals(DaprClientHttp.class, this.daprClient.getClass()); - } } @After diff --git a/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java index c115d2183..51d2a2b7f 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java @@ -7,7 +7,6 @@ package io.dapr.it.state; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.DaprClientGrpc; import io.dapr.client.domain.State; import io.dapr.it.DaprRun; import org.junit.AfterClass; @@ -17,7 +16,6 @@ import org.junit.Test; import java.util.Collections; import static io.dapr.it.TestUtils.assertThrowsDaprException; -import static org.junit.Assert.assertTrue; /** * Test State GRPC DAPR capabilities using a DAPR instance with an empty service running @@ -33,8 +31,6 @@ public class GRPCStateClientIT extends AbstractStateClientIT { daprRun = startDaprApp(GRPCStateClientIT.class.getSimpleName(), 5000); daprRun.switchToGRPC(); daprClient = new DaprClientBuilder().build(); - - assertTrue(daprClient instanceof DaprClientGrpc); } @AfterClass diff --git a/sdk/src/main/java/io/dapr/client/DaprApiProtocol.java b/sdk/src/main/java/io/dapr/client/DaprApiProtocol.java new file mode 100644 index 000000000..84eb1d77e --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/DaprApiProtocol.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.client; + +/** + * Transport protocol for Dapr's API. + */ +public enum DaprApiProtocol { + + GRPC, + HTTP + +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index 1ff59ab4f..884e024ad 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -24,7 +24,12 @@ public class DaprClientBuilder { /** * Determine if this builder will create GRPC clients instead of HTTP clients. */ - private final boolean useGrpc; + private final DaprApiProtocol apiProtocol; + + /** + * Determine if this builder will use HTTP client for service method invocation APIs. + */ + private final DaprApiProtocol methodInvocationApiProtocol; /** * Builder for Dapr's HTTP Client. @@ -50,7 +55,8 @@ public class DaprClientBuilder { public DaprClientBuilder() { this.objectSerializer = new DefaultObjectSerializer(); this.stateSerializer = new DefaultObjectSerializer(); - this.useGrpc = Properties.USE_GRPC.get(); + this.apiProtocol = Properties.API_PROTOCOL.get(); + this.methodInvocationApiProtocol = Properties.API_METHOD_INVOCATION_PROTOCOL.get(); this.daprHttpBuilder = new DaprHttpBuilder(); } @@ -97,11 +103,30 @@ public class DaprClientBuilder { * @throws java.lang.IllegalStateException if any required field is missing */ public DaprClient build() { - if (this.useGrpc) { - return buildDaprClientGrpc(); + if (this.apiProtocol != this.methodInvocationApiProtocol) { + return new DaprClientProxy(buildDaprClient(this.apiProtocol), buildDaprClient(this.methodInvocationApiProtocol)); } - return buildDaprClientHttp(); + return buildDaprClient(this.apiProtocol); + } + + /** + * Creates an instance of a Dapr Client based on the chosen protocol. + * + * @param protocol Dapr API's protocol. + * @return the GRPC Client. + * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number. + */ + private DaprClient buildDaprClient(DaprApiProtocol protocol) { + if (protocol == null) { + throw new IllegalStateException("Protocol is required."); + } + + switch (protocol) { + case GRPC: return buildDaprClientGrpc(); + case HTTP: return buildDaprClientHttp(); + default: throw new IllegalStateException("Unsupported protocol: " + protocol.name()); + } } /** diff --git a/sdk/src/main/java/io/dapr/client/DaprClientProxy.java b/sdk/src/main/java/io/dapr/client/DaprClientProxy.java new file mode 100644 index 000000000..dcab27495 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/DaprClientProxy.java @@ -0,0 +1,478 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.client; + +import io.dapr.client.domain.DeleteStateRequest; +import io.dapr.client.domain.ExecuteStateTransactionRequest; +import io.dapr.client.domain.GetBulkStateRequest; +import io.dapr.client.domain.GetSecretRequest; +import io.dapr.client.domain.GetStateRequest; +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.InvokeBindingRequest; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.PublishEventRequest; +import io.dapr.client.domain.Response; +import io.dapr.client.domain.SaveStateRequest; +import io.dapr.client.domain.State; +import io.dapr.client.domain.StateOptions; +import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.utils.TypeRef; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; + +/** + * Class that delegates to other implementations. + * + * @see DaprClient + * @see DaprClientGrpc + * @see DaprClientHttp + */ +class DaprClientProxy implements DaprClient { + + /** + * Client for all API invocations. + */ + private final DaprClient client; + + /** + * Client to override Dapr's service invocation APIs. + */ + private final DaprClient methodInvocationOverrideClient; + + /** + * Constructor with delegate client. + * + * @param client Client for all API invocations. + * @see DaprClientBuilder + */ + DaprClientProxy(DaprClient client) { + this(client, client); + } + + /** + * Constructor with delegate client and override client for Dapr's method invocation APIs. + * + * @param client Client for all API invocations, except override below. + * @param methodInvocationOverrideClient Client to override Dapr's service invocation APIs. + * @see DaprClientBuilder + */ + DaprClientProxy( + DaprClient client, + DaprClient methodInvocationOverrideClient) { + this.client = client; + this.methodInvocationOverrideClient = methodInvocationOverrideClient; + } + + /** + * {@inheritDoc} + */ + @Override + public Mono waitForSidecar(int timeoutInMilliseconds) { + return client.waitForSidecar(timeoutInMilliseconds); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono publishEvent(String pubsubName, String topicName, Object data) { + return client.publishEvent(pubsubName, topicName, data); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono publishEvent(String pubsubName, String topicName, Object data, Map metadata) { + return client.publishEvent(pubsubName, topicName, data, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> publishEvent(PublishEventRequest request) { + return client.publishEvent(request); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + Object data, + HttpExtension httpExtension, + Map metadata, + TypeRef type) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, data, httpExtension, metadata, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + Object request, + HttpExtension httpExtension, + Map metadata, + Class clazz) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, metadata, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + Object request, + HttpExtension httpExtension, + TypeRef type) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + Object request, + HttpExtension httpExtension, + Class clazz) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + HttpExtension httpExtension, + Map metadata, + TypeRef type) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, httpExtension, metadata, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + HttpExtension httpExtension, + Map metadata, + Class clazz) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, httpExtension, metadata, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + Object request, + HttpExtension httpExtension, + Map metadata) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + HttpExtension httpExtension, + Map metadata) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, httpExtension, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeMethod(String appId, + String methodName, + byte[] request, + HttpExtension httpExtension, + Map metadata) { + return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { + return methodInvocationOverrideClient.invokeMethod(invokeMethodRequest, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String bindingName, String operation, Object data) { + return client.invokeBinding(bindingName, operation, data); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String bindingName, String operation, byte[] data, Map metadata) { + return client.invokeBinding(bindingName, operation, data, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, TypeRef type) { + return client.invokeBinding(bindingName, operation, data, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, Class clazz) { + return client.invokeBinding(bindingName, operation, data, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String bindingName, + String operation, + Object data, + Map metadata, + TypeRef type) { + return client.invokeBinding(bindingName, operation, data, metadata, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String bindingName, + String operation, + Object data, + Map metadata, + Class clazz) { + return client.invokeBinding(bindingName, operation, data, metadata, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> invokeBinding(InvokeBindingRequest request, TypeRef type) { + return client.invokeBinding(request, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getState(String storeName, State state, TypeRef type) { + return client.getState(storeName, state, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getState(String storeName, State state, Class clazz) { + return client.getState(storeName, state, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getState(String storeName, String key, TypeRef type) { + return client.getState(storeName, key, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getState(String storeName, String key, Class clazz) { + return client.getState(storeName, key, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getState(String storeName, String key, StateOptions options, TypeRef type) { + return client.getState(storeName, key, options, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getState(String storeName, String key, StateOptions options, Class clazz) { + return client.getState(storeName, key, options, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono>> getState(GetStateRequest request, TypeRef type) { + return client.getState(request, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono>> getBulkState(String storeName, List keys, TypeRef type) { + return client.getBulkState(storeName, keys, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono>> getBulkState(String storeName, List keys, Class clazz) { + return client.getBulkState(storeName, keys, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono>>> getBulkState(GetBulkStateRequest request, TypeRef type) { + return client.getBulkState(request, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono executeStateTransaction(String storeName, List> operations) { + return client.executeStateTransaction(storeName, operations); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> executeStateTransaction(ExecuteStateTransactionRequest request) { + return client.executeStateTransaction(request); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono saveBulkState(String storeName, List> states) { + return client.saveBulkState(storeName, states); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> saveBulkState(SaveStateRequest request) { + return client.saveBulkState(request); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono saveState(String storeName, String key, Object value) { + return client.saveState(storeName, key, value); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono saveState(String storeName, String key, String etag, Object value, StateOptions options) { + return client.saveState(storeName, key, etag, value, options); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono deleteState(String storeName, String key) { + return client.deleteState(storeName, key); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono deleteState(String storeName, String key, String etag, StateOptions options) { + return client.deleteState(storeName, key, etag, options); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> deleteState(DeleteStateRequest request) { + return client.deleteState(request); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getSecret(String storeName, String secretName, Map metadata) { + return client.getSecret(storeName, secretName, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono> getSecret(String storeName, String secretName) { + return client.getSecret(storeName, secretName); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono>> getSecret(GetSecretRequest request) { + return client.getSecret(request); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws Exception { + client.close(); + if (client != methodInvocationOverrideClient) { + methodInvocationOverrideClient.close(); + } + } +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java index 498f5522e..40013add9 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java @@ -29,7 +29,7 @@ public class DaprHttpBuilder { /** * Read timeout used to build object. */ - private Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READTIMEOUTSECONDS.get()); + private Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS.get()); /** * Sets the read timeout duration for the instance to be built. diff --git a/sdk/src/main/java/io/dapr/config/Properties.java b/sdk/src/main/java/io/dapr/config/Properties.java index 79c58ea40..0cf16f16a 100644 --- a/sdk/src/main/java/io/dapr/config/Properties.java +++ b/sdk/src/main/java/io/dapr/config/Properties.java @@ -5,6 +5,8 @@ package io.dapr.config; +import io.dapr.client.DaprApiProtocol; + import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -14,7 +16,7 @@ import java.nio.charset.StandardCharsets; public class Properties { /** - * Dapr's default IP for HTTP and GRPC communication. + * Dapr's default IP for HTTP and gRPC communication. */ private static final String DEFAULT_SIDECAR_IP = "127.0.0.1"; @@ -24,14 +26,19 @@ public class Properties { private static final Integer DEFAULT_HTTP_PORT = 3500; /** - * Dapr's default GRPC port. + * Dapr's default gRPC port. */ private static final Integer DEFAULT_GRPC_PORT = 50001; /** - * Dapr's default GRPC port. + * Dapr's default use of gRPC or HTTP. */ - private static final Boolean DEFAULT_GRPC_ENABLED = true; + private static final DaprApiProtocol DEFAULT_API_PROTOCOL = DaprApiProtocol.GRPC; + + /** + * Dapr's default use of gRPC or HTTP for Dapr's method invocation APIs. + */ + private static final DaprApiProtocol DEFAULT_API_METHOD_INVOCATION_PROTOCOL = DaprApiProtocol.HTTP; /** * Dapr's default String encoding: UTF-8. @@ -68,12 +75,22 @@ public class Properties { DEFAULT_GRPC_PORT); /** - * Determines if Dapr client will use GRPC to talk to Dapr's side car. + * Determines if Dapr client will use gRPC or HTTP to talk to Dapr's side car. */ - public static final Property USE_GRPC = new BooleanProperty( - "dapr.grpc.enabled", - "DAPR_GRPC_ENABLED", - DEFAULT_GRPC_ENABLED); + public static final Property API_PROTOCOL = new GenericProperty<>( + "dapr.api.protocol", + "DAPR_API_PROTOCOL", + DEFAULT_API_PROTOCOL, + (s) -> DaprApiProtocol.valueOf(s.toUpperCase())); + + /** + * Determines if Dapr client should use gRPC or HTTP for Dapr's service method invocation APIs. + */ + public static final Property API_METHOD_INVOCATION_PROTOCOL = new GenericProperty<>( + "dapr.api.methodInvocation.protocol", + "DAPR_API_METHOD_INVOCATION_PROTOCOL", + DEFAULT_API_METHOD_INVOCATION_PROTOCOL, + (s) -> DaprApiProtocol.valueOf(s.toUpperCase())); /** * API token for authentication between App and Dapr's side car. @@ -95,8 +112,8 @@ public class Properties { /** * Dapr's timeout in seconds for HTTP client reads. */ - public static final Property HTTP_CLIENT_READTIMEOUTSECONDS = new IntegerProperty( - "dapr.http.client.readtimeoutseconds", - "DAPR_HTTP_CLIENT_READTIMEOUTSECONDS", + public static final Property HTTP_CLIENT_READ_TIMEOUT_SECONDS = new IntegerProperty( + "dapr.http.client.readTimeoutSeconds", + "DAPR_HTTP_CLIENT_READ_TIMEOUT_SECONDS", DEFAULT_HTTP_CLIENT_READTIMEOUTSECONDS); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 08a2bf611..3f1179e05 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -77,23 +77,25 @@ public class DaprClientGrpcTest { private static final String SECRET_STORE_NAME = "MySecretStore"; private Closeable closeable; - private DaprGrpc.DaprStub client; - private DaprClientGrpc adapter; + private DaprGrpc.DaprStub daprStub; + private DaprClient client; private ObjectSerializer serializer; @Before public void setup() throws IOException { closeable = mock(Closeable.class); - client = mock(DaprGrpc.DaprStub.class); - when(client.withInterceptors(any())).thenReturn(client); - adapter = new DaprClientGrpc(closeable, client, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + daprStub = mock(DaprGrpc.DaprStub.class); + when(daprStub.withInterceptors(any())).thenReturn(daprStub); + DaprClient grpcClient = new DaprClientGrpc( + closeable, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + client = new DaprClientProxy(grpcClient); serializer = new ObjectSerializer(); doNothing().when(closeable).close(); } @After public void tearDown() throws Exception { - adapter.close(); + client.close(); verify(closeable).close(); verifyNoMoreInteractions(closeable); } @@ -102,7 +104,7 @@ public class DaprClientGrpcTest { public void waitForSidecarTimeout() throws Exception { int port = findFreePort(); System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(port)); - assertThrows(RuntimeException.class, () -> adapter.waitForSidecar(1).block()); + assertThrows(RuntimeException.class, () -> client.waitForSidecar(1).block()); } @Test @@ -118,7 +120,7 @@ public class DaprClientGrpcTest { } }); t.start(); - adapter.waitForSidecar(10000).block(); + client.waitForSidecar(10000).block(); } } @@ -126,13 +128,13 @@ public class DaprClientGrpcTest { public void publishEventExceptionThrownTest() { doAnswer((Answer) invocation -> { throw newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"); - }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); assertThrowsDaprException( StatusRuntimeException.class, "INVALID_ARGUMENT", "INVALID_ARGUMENT: bad bad argument", - () -> adapter.publishEvent("pubsubname","topic", "object").block()); + () -> client.publishEvent("pubsubname","topic", "object").block()); } @Test @@ -141,9 +143,9 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument")); return null; - }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); - Mono result = adapter.publishEvent("pubsubname","topic", "object"); + Mono result = client.publishEvent("pubsubname","topic", "object"); assertThrowsDaprException( ExecutionException.class, @@ -155,16 +157,16 @@ public class DaprClientGrpcTest { @Test public void publishEventSerializeException() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - adapter = new DaprClientGrpc(closeable, client, mockSerializer, new DefaultObjectSerializer()); + client = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer()); doAnswer((Answer) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); when(mockSerializer.serialize(any())).thenThrow(IOException.class); - Mono result = adapter.publishEvent("pubsubname","topic", "{invalid-json"); + Mono result = client.publishEvent("pubsubname","topic", "{invalid-json"); assertThrowsDaprException( IOException.class, @@ -180,9 +182,9 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); - Mono result = adapter.publishEvent("pubsubname","topic", "object"); + Mono result = client.publishEvent("pubsubname","topic", "object"); result.block(); } @@ -195,8 +197,8 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); - adapter.publishEvent("pubsubname", "topic", "object"); + }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + client.publishEvent("pubsubname", "topic", "object"); // Do not call block() on the mono above, so nothing should happen. assertFalse(called.get()); } @@ -208,10 +210,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); MyObject event = new MyObject(1, "Event"); - Mono result = adapter.publishEvent("pubsubname", "topic", event); + Mono result = client.publishEvent("pubsubname", "topic", event); result.block(); } @@ -219,35 +221,35 @@ public class DaprClientGrpcTest { public void invokeBindingIllegalArgumentExceptionTest() { assertThrows(IllegalArgumentException.class, () -> { // empty binding name - adapter.invokeBinding("", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP).block(); + client.invokeBinding("", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null binding name - adapter.invokeBinding(null, "MyOperation", "request".getBytes(), Collections.EMPTY_MAP).block(); + client.invokeBinding(null, "MyOperation", "request".getBytes(), Collections.EMPTY_MAP).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null binding operation - adapter.invokeBinding("BindingName", null, "request".getBytes(), Collections.EMPTY_MAP).block(); + client.invokeBinding("BindingName", null, "request".getBytes(), Collections.EMPTY_MAP).block(); }); assertThrows(IllegalArgumentException.class, () -> { // empty binding operation - adapter.invokeBinding("BindingName", "", "request".getBytes(), Collections.EMPTY_MAP).block(); + client.invokeBinding("BindingName", "", "request".getBytes(), Collections.EMPTY_MAP).block(); }); } @Test public void invokeBindingSerializeException() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - adapter = new DaprClientGrpc(closeable, client, mockSerializer, new DefaultObjectSerializer()); + client = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer()); doAnswer((Answer) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); when(mockSerializer.serialize(any())).thenThrow(IOException.class); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP); + Mono result = client.invokeBinding("BindingName", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP); assertThrowsDaprException( IOException.class, @@ -260,9 +262,9 @@ public class DaprClientGrpcTest { public void invokeBindingExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request"); + Mono result = client.invokeBinding("BindingName", "MyOperation", "request"); assertThrowsDaprException( RuntimeException.class, @@ -278,9 +280,9 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request"); + Mono result = client.invokeBinding("BindingName", "MyOperation", "request"); assertThrowsDaprException( ExecutionException.class, @@ -298,9 +300,9 @@ public class DaprClientGrpcTest { observer.onNext(responseBuilder.build()); observer.onCompleted(); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request"); + Mono result = client.invokeBinding("BindingName", "MyOperation", "request"); result.block(); } @@ -313,9 +315,9 @@ public class DaprClientGrpcTest { observer.onNext(responseBuilder.build()); observer.onCompleted(); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP); + Mono result = client.invokeBinding("BindingName", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP); assertEquals("OK", new String(result.block(), StandardCharsets.UTF_8)); } @@ -329,10 +331,10 @@ public class DaprClientGrpcTest { observer.onNext(responseBuilder.build()); observer.onCompleted(); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); MyObject event = new MyObject(1, "Event"); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", event); + Mono result = client.invokeBinding("BindingName", "MyOperation", event); result.block(); } @@ -346,10 +348,10 @@ public class DaprClientGrpcTest { observer.onNext(responseBuilder.build()); observer.onCompleted(); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); MyObject event = new MyObject(1, "Event"); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", event, String.class); + Mono result = client.invokeBinding("BindingName", "MyOperation", event, String.class); assertEquals("OK", result.block()); } @@ -363,10 +365,10 @@ public class DaprClientGrpcTest { observer.onNext(responseBuilder.build()); observer.onCompleted(); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); MyObject event = new MyObject(1, "Event"); - Mono result = adapter.invokeBinding("BindingName", "MyOperation", event, TypeRef.get(String.class)); + Mono result = client.invokeBinding("BindingName", "MyOperation", event, TypeRef.get(String.class)); assertEquals("OK", result.block()); } @@ -382,9 +384,9 @@ public class DaprClientGrpcTest { observer.onNext(responseBuilder.build()); observer.onCompleted(); return null; - }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); MyObject event = new MyObject(1, "Event"); - adapter.invokeBinding("BindingName", "MyOperation", event); + client.invokeBinding("BindingName", "MyOperation", event); // Do not call block() on mono above, so nothing should happen. assertFalse(called.get()); } @@ -393,9 +395,9 @@ public class DaprClientGrpcTest { public void invokeServiceVoidExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE); + Mono result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE); assertThrowsDaprException( RuntimeException.class, @@ -411,10 +413,10 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); // HttpExtension cannot be null - Mono result = adapter.invokeMethod("appId", "method", "request", null); + Mono result = client.invokeMethod("appId", "method", "request", null); assertThrows(IllegalArgumentException.class, () -> result.block()); } @@ -423,9 +425,9 @@ public class DaprClientGrpcTest { public void invokeServiceEmptyRequestVoidExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null); + Mono result = client.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null); assertThrowsDaprException( RuntimeException.class, @@ -441,9 +443,9 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE); + Mono result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE); assertThrowsDaprException( ExecutionException.class, @@ -459,9 +461,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE); + Mono result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE); result.block(); } @@ -472,10 +474,10 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); MyObject request = new MyObject(1, "Event"); - Mono result = adapter.invokeMethod("appId", "method", request, HttpExtension.NONE); + Mono result = client.invokeMethod("appId", "method", request, HttpExtension.NONE); result.block(); } @@ -483,9 +485,9 @@ public class DaprClientGrpcTest { public void invokeServiceExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); + Mono result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); assertThrowsDaprException( RuntimeException.class, @@ -498,9 +500,9 @@ public class DaprClientGrpcTest { public void invokeServiceNoRequestClassExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null, String.class); + Mono result = client.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null, String.class); assertThrowsDaprException( RuntimeException.class, @@ -513,9 +515,9 @@ public class DaprClientGrpcTest { public void invokeServiceNoRequestTypeRefExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null, TypeRef.STRING); + Mono result = client.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null, TypeRef.STRING); assertThrowsDaprException( RuntimeException.class, @@ -531,9 +533,9 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); + Mono result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); assertThrowsDaprException( ExecutionException.class, @@ -565,9 +567,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(eq(request), any()); + }).when(daprStub).invokeService(eq(request), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", httpExtension, null, String.class); + Mono result = client.invokeMethod("appId", "method", "request", httpExtension, null, String.class); String strOutput = result.block(); assertEquals(expected, strOutput); } @@ -580,9 +582,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); + Mono result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); String strOutput = result.block(); assertEquals(expected, strOutput); @@ -596,9 +598,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(object)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, MyObject.class); + Mono result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, MyObject.class); MyObject resultObject = result.block(); assertEquals(object.id, resultObject.id); @@ -609,9 +611,9 @@ public class DaprClientGrpcTest { public void invokeServiceNoRequestBodyExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); assertThrowsDaprException( RuntimeException.class, @@ -627,9 +629,9 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); assertThrowsDaprException( ExecutionException.class, @@ -646,9 +648,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); String strOutput = result.block(); assertEquals(expected, strOutput); @@ -662,9 +664,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(object)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, MyObject.class); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, MyObject.class); MyObject resultObject = result.block(); assertEquals(object.id, resultObject.id); @@ -675,11 +677,11 @@ public class DaprClientGrpcTest { public void invokeServiceByteRequestExceptionThrownTest() throws IOException { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); String request = "Request"; byte[] byteRequest = serializer.serialize(request); - Mono result = adapter.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE, byte[].class); + Mono result = client.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE, byte[].class); assertThrowsDaprException( RuntimeException.class, @@ -695,12 +697,12 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); String request = "Request"; byte[] byteRequest = serializer.serialize(request); Mono result = - adapter.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE,(HashMap) null); + client.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE,(HashMap) null); assertThrowsDaprException( ExecutionException.class, @@ -717,11 +719,11 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); String request = "Request"; byte[] byteRequest = serializer.serialize(request); - Mono result = adapter.invokeMethod( + Mono result = client.invokeMethod( "appId", "method", byteRequest, HttpExtension.NONE, (HashMap) null); byte[] byteOutput = result.block(); @@ -737,11 +739,11 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(resultObj)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); String request = "Request"; byte[] byteRequest = serializer.serialize(request); - Mono result = adapter.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE, byte[].class); + Mono result = client.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE, byte[].class); byte[] byteOutput = result.block(); assertEquals(resultObj, serializer.deserialize(byteOutput, MyObject.class)); @@ -751,8 +753,8 @@ public class DaprClientGrpcTest { public void invokeServiceNoRequestNoClassBodyExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); assertThrowsDaprException( RuntimeException.class, @@ -768,9 +770,9 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); assertThrowsDaprException( ExecutionException.class, @@ -787,9 +789,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); result.block(); } @@ -803,8 +805,8 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); // Do not call block() on mono above, so nothing should happen. assertFalse(called.get()); } @@ -817,9 +819,9 @@ public class DaprClientGrpcTest { observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(resultObj)).build()); observer.onCompleted(); return null; - }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); + Mono result = client.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); result.block(); } @@ -828,19 +830,19 @@ public class DaprClientGrpcTest { State key = buildStateKey(null, "Key1", "ETag1", null); assertThrows(IllegalArgumentException.class, () -> { // empty state store name - adapter.getState("", key, String.class).block(); + client.getState("", key, String.class).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null state store name - adapter.getState(null, key, String.class).block(); + client.getState(null, key, String.class).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null key - adapter.getState(STATE_STORE_NAME, (String)null, String.class).block(); + client.getState(STATE_STORE_NAME, (String)null, String.class).block(); }); assertThrows(IllegalArgumentException.class, () -> { // empty key - adapter.getState(STATE_STORE_NAME, "", String.class).block(); + client.getState(STATE_STORE_NAME, "", String.class).block(); }); } @@ -848,10 +850,10 @@ public class DaprClientGrpcTest { public void getStateExceptionThrownTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + }).when(daprStub).getState(any(DaprProtos.GetStateRequest.class), any()); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); + Mono> result = client.getState(STATE_STORE_NAME, key, String.class); assertThrowsDaprException( RuntimeException.class, @@ -867,10 +869,10 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + }).when(daprStub).getState(any(DaprProtos.GetStateRequest.class), any()); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); + Mono> result = client.getState(STATE_STORE_NAME, key, String.class); assertThrowsDaprException( ExecutionException.class, @@ -891,10 +893,10 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + }).when(daprStub).getState(any(DaprProtos.GetStateRequest.class), any()); State keyRequest = buildStateKey(null, key, etag, null); - Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class); + Mono> result = client.getState(STATE_STORE_NAME, keyRequest, String.class); State res = result.block(); assertNotNull(res); @@ -914,10 +916,10 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + }).when(daprStub).getState(any(DaprProtos.GetStateRequest.class), any()); State keyRequest = buildStateKey(null, key, etag, null); - adapter.getState(STATE_STORE_NAME, keyRequest, String.class); + client.getState(STATE_STORE_NAME, keyRequest, String.class); // block() on the mono above is not called, so nothing should happen. assertFalse(called.get()); } @@ -939,9 +941,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + }).when(daprStub).getState(any(DaprProtos.GetStateRequest.class), any()); - Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); + Mono> result = client.getState(STATE_STORE_NAME, keyRequest, MyObject.class); State res = result.block(); assertNotNull(res); @@ -969,9 +971,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + }).when(daprStub).getState(any(DaprProtos.GetStateRequest.class), any()); - Mono>> result = adapter.getState(request, TypeRef.get(MyObject.class)); + Mono>> result = client.getState(request, TypeRef.get(MyObject.class)); Response> res = result.block(); assertNotNull(res); assertEquals(expectedState, res.getObject()); @@ -994,9 +996,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + }).when(daprStub).getState(any(DaprProtos.GetStateRequest.class), any()); - Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); + Mono> result = client.getState(STATE_STORE_NAME, keyRequest, MyObject.class); assertEquals(expectedState, result.block()); } @@ -1006,27 +1008,27 @@ public class DaprClientGrpcTest { State key = buildStateKey(null, "Key1", "ETag1", null); assertThrows(IllegalArgumentException.class, () -> { // empty state store name - adapter.getBulkState("", Collections.singletonList("100"), String.class).block(); + client.getBulkState("", Collections.singletonList("100"), String.class).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null state store name - adapter.getBulkState(null, Collections.singletonList("100"), String.class).block(); + client.getBulkState(null, Collections.singletonList("100"), String.class).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null key // null pointer exception due to keys being converted to an unmodifiable list - adapter.getBulkState(STATE_STORE_NAME, null, String.class).block(); + client.getBulkState(STATE_STORE_NAME, null, String.class).block(); }); assertThrows(IllegalArgumentException.class, () -> { // empty key list - adapter.getBulkState(STATE_STORE_NAME, Collections.emptyList(), String.class).block(); + client.getBulkState(STATE_STORE_NAME, Collections.emptyList(), String.class).block(); }); // negative parallelism GetBulkStateRequest req = new GetBulkStateRequestBuilder(STATE_STORE_NAME, Collections.singletonList("100")) .withMetadata(new HashMap<>()) .withParallelism(-1) .build(); - assertThrows(IllegalArgumentException.class, () -> adapter.getBulkState(req, TypeRef.BOOLEAN).block()); + assertThrows(IllegalArgumentException.class, () -> client.getBulkState(req, TypeRef.BOOLEAN).block()); } @Test @@ -1051,9 +1053,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + }).when(daprStub).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); - List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block(); + List> result = client.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block(); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1088,8 +1090,8 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); - List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block(); + }).when(daprStub).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + List> result = client.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block(); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1124,9 +1126,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + }).when(daprStub).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); - List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block(); + List> result = client.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block(); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1160,9 +1162,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + }).when(daprStub).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); - List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), byte[].class).block(); + List> result = client.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), byte[].class).block(); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1195,9 +1197,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + }).when(daprStub).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); - List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), MyObject.class).block(); + List> result = client.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), MyObject.class).block(); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1215,10 +1217,10 @@ public class DaprClientGrpcTest { public void deleteStateExceptionThrowTest() { doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); + Mono result = client.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); assertThrowsDaprException( RuntimeException.class, @@ -1232,19 +1234,19 @@ public class DaprClientGrpcTest { State key = buildStateKey(null, "Key1", "ETag1", null); assertThrows(IllegalArgumentException.class, () -> { // empty state store name - adapter.deleteState("", key.getKey(), "etag", null).block(); + client.deleteState("", key.getKey(), "etag", null).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null state store name - adapter.deleteState(null, key.getKey(), "etag", null).block(); + client.deleteState(null, key.getKey(), "etag", null).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null state store name - adapter.deleteState(STATE_STORE_NAME, null, "etag", null).block(); + client.deleteState(STATE_STORE_NAME, null, "etag", null).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null state store name - adapter.deleteState(STATE_STORE_NAME, "", "etag", null).block(); + client.deleteState(STATE_STORE_NAME, "", "etag", null).block(); }); } @@ -1255,10 +1257,10 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); + Mono result = client.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); assertThrowsDaprException( ExecutionException.class, @@ -1276,10 +1278,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); State stateKey = buildStateKey(null, key, etag, null); - Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + Mono result = client.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); result.block(); } @@ -1294,10 +1296,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + Mono result = client.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); result.block(); } @@ -1314,12 +1316,12 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); DeleteStateRequestBuilder builder = new DeleteStateRequestBuilder(STATE_STORE_NAME, key); builder.withEtag(etag).withStateOptions(options).withMetadata(metadata); DeleteStateRequest request = builder.build(); - Mono> result = adapter.deleteState(request); + Mono> result = client.deleteState(request); result.block(); } @@ -1335,10 +1337,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); State stateKey = buildStateKey(null, key, etag, options); - adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + client.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); // Do not call result.block(), so nothing should happen. assertFalse(called.get()); @@ -1354,10 +1356,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + Mono result = client.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); result.block(); } @@ -1372,10 +1374,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + Mono result = client.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); result.block(); } @@ -1388,18 +1390,18 @@ public class DaprClientGrpcTest { key); assertThrows(IllegalArgumentException.class, () -> { // empty state store name - adapter.executeStateTransaction("", Collections.singletonList(upsertOperation)).block(); + client.executeStateTransaction("", Collections.singletonList(upsertOperation)).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null state store name - adapter.executeStateTransaction(null, Collections.singletonList(upsertOperation)).block(); + client.executeStateTransaction(null, Collections.singletonList(upsertOperation)).block(); }); } @Test public void executeTransactionSerializerExceptionTest() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - adapter = new DaprClientGrpc(closeable, client, mockSerializer, mockSerializer); + client = new DaprClientGrpc(closeable, daprStub, mockSerializer, mockSerializer); String etag = "ETag1"; String key = "key1"; String data = "my data"; @@ -1409,7 +1411,7 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + }).when(daprStub).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); when(mockSerializer.serialize(any())).thenThrow(IOException.class); @@ -1420,7 +1422,7 @@ public class DaprClientGrpcTest { ExecuteStateTransactionRequest request = new ExecuteStateTransactionRequestBuilder(STATE_STORE_NAME) .withTransactionalStates(upsertOperation) .build(); - Mono> result = adapter.executeStateTransaction(request); + Mono> result = client.executeStateTransaction(request); assertThrowsDaprException( IOException.class, @@ -1440,7 +1442,7 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + }).when(daprStub).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( @@ -1455,7 +1457,7 @@ public class DaprClientGrpcTest { .withTransactionalStates(upsertOperation, deleteOperation) .withMetadata(metadata) .build(); - Mono> result = adapter.executeStateTransaction(request); + Mono> result = client.executeStateTransaction(request); result.block(); } @@ -1470,7 +1472,7 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + }).when(daprStub).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( @@ -1480,7 +1482,7 @@ public class DaprClientGrpcTest { TransactionalStateOperation.OperationType.DELETE, new State<>("testKey") ); - Mono result = adapter.executeStateTransaction(STATE_STORE_NAME, Arrays.asList(upsertOperation, deleteOperation)); + Mono result = client.executeStateTransaction(STATE_STORE_NAME, Arrays.asList(upsertOperation, deleteOperation)); result.block(); } @@ -1492,13 +1494,13 @@ public class DaprClientGrpcTest { StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + }).when(daprStub).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation operation = new TransactionalStateOperation<>( TransactionalStateOperation.OperationType.UPSERT, stateKey); - Mono result = adapter.executeStateTransaction(STATE_STORE_NAME, Collections.singletonList(operation)); + Mono result = client.executeStateTransaction(STATE_STORE_NAME, Collections.singletonList(operation)); assertThrowsDaprException( RuntimeException.class, @@ -1518,13 +1520,13 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + }).when(daprStub).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation operation = new TransactionalStateOperation<>( TransactionalStateOperation.OperationType.UPSERT, stateKey); - Mono result = adapter.executeStateTransaction(STATE_STORE_NAME, Collections.singletonList(operation)); + Mono result = client.executeStateTransaction(STATE_STORE_NAME, Collections.singletonList(operation)); assertThrowsDaprException( ExecutionException.class, @@ -1537,11 +1539,11 @@ public class DaprClientGrpcTest { public void saveStatesIllegalArgumentExceptionTest() { assertThrows(IllegalArgumentException.class, () -> { // empty state store name - adapter.saveBulkState("", Collections.emptyList()).block(); + client.saveBulkState("", Collections.emptyList()).block(); }); assertThrows(IllegalArgumentException.class, () -> { // empty state store name - adapter.saveBulkState(null, Collections.emptyList()).block(); + client.saveBulkState(null, Collections.emptyList()).block(); }); } @@ -1552,9 +1554,9 @@ public class DaprClientGrpcTest { String value = "State value"; doAnswer((Answer) invocation -> { throw new RuntimeException(); - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, null); assertThrowsDaprException( RuntimeException.class, @@ -1573,9 +1575,9 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(ex); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, null); assertThrowsDaprException( ExecutionException.class, @@ -1594,10 +1596,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, null); result.block(); } @@ -1611,10 +1613,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } @@ -1630,10 +1632,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); - adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + client.saveState(STATE_STORE_NAME, key, etag, value, options); // No call to result.block(), so nothing should happen. assertFalse(called.get()); } @@ -1648,10 +1650,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } @@ -1665,10 +1667,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } @@ -1682,10 +1684,10 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } @@ -1735,19 +1737,19 @@ public class DaprClientGrpcTest { observer.onNext(futuresMap.get(key1)); observer.onCompleted(); return null; - }).when(client).getState(argThat(new GetStateRequestKeyMatcher(key1)), any()); + }).when(daprStub).getState(argThat(new GetStateRequestKeyMatcher(key1)), any()); doAnswer((Answer) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onNext(futuresMap.get(key2)); observer.onCompleted(); return null; - }).when(client).getState(argThat(new GetStateRequestKeyMatcher(key2)), any()); + }).when(daprStub).getState(argThat(new GetStateRequestKeyMatcher(key2)), any()); State keyRequest1 = buildStateKey(null, key1, etag, null); - Mono> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class); + Mono> resultGet1 = client.getState(STATE_STORE_NAME, keyRequest1, String.class); assertEquals(expectedState1, resultGet1.block()); State keyRequest2 = buildStateKey(null, key2, etag, null); - Mono> resultGet2 = adapter.getState(STATE_STORE_NAME, keyRequest2, String.class); + Mono> resultGet2 = client.getState(STATE_STORE_NAME, keyRequest2, String.class); assertEquals(expectedState2, resultGet2.block()); doAnswer((Answer) invocation -> { @@ -1755,9 +1757,9 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class), any()); + }).when(daprStub).deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class), any()); - Mono resultDelete = adapter.deleteState(STATE_STORE_NAME, keyRequest2.getKey(), keyRequest2.getEtag(), + Mono resultDelete = client.deleteState(STATE_STORE_NAME, keyRequest2.getKey(), keyRequest2.getEtag(), keyRequest2.getOptions()); resultDelete.block(); } @@ -1778,10 +1780,10 @@ public class DaprClientGrpcTest { observer.onNext(futuresMap.get(key1)); observer.onCompleted(); return null; - }).when(client).getState(argThat(new GetStateRequestKeyMatcher(key1)), any()); + }).when(daprStub).getState(argThat(new GetStateRequestKeyMatcher(key1)), any()); State keyRequest1 = buildStateKey(null, key1, null, null); - Mono> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class); + Mono> resultGet1 = client.getState(STATE_STORE_NAME, keyRequest1, String.class); assertEquals(expectedState1, resultGet1.block()); } @@ -1803,9 +1805,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + }).when(daprStub).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); - List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block(); + List> result = client.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block(); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1834,9 +1836,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); + }).when(daprStub).getSecret(any(DaprProtos.GetSecretRequest.class), any()); - Map result = adapter.getSecret(SECRET_STORE_NAME, "key").block(); + Map result = client.getSecret(SECRET_STORE_NAME, "key").block(); assertEquals(1, result.size()); assertEquals(expectedValue, result.get(expectedKey)); @@ -1856,9 +1858,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); + }).when(daprStub).getSecret(any(DaprProtos.GetSecretRequest.class), any()); - Map result = adapter.getSecret(SECRET_STORE_NAME, "key").block(); + Map result = client.getSecret(SECRET_STORE_NAME, "key").block(); assertTrue(result.isEmpty()); } @@ -1874,28 +1876,28 @@ public class DaprClientGrpcTest { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onError(new RuntimeException()); return null; - }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); + }).when(daprStub).getSecret(any(DaprProtos.GetSecretRequest.class), any()); - assertThrowsDaprException(ExecutionException.class, () -> adapter.getSecret(SECRET_STORE_NAME, "key").block()); + assertThrowsDaprException(ExecutionException.class, () -> client.getSecret(SECRET_STORE_NAME, "key").block()); } @Test public void getSecretsIllegalArgumentException() { assertThrows(IllegalArgumentException.class, () -> { // empty secret store name - adapter.getSecret("", "key").block(); + client.getSecret("", "key").block(); }); assertThrows(IllegalArgumentException.class, () -> { // null secret store name - adapter.getSecret(null, "key").block(); + client.getSecret(null, "key").block(); }); assertThrows(IllegalArgumentException.class, () -> { // empty key - adapter.getSecret(SECRET_STORE_NAME, "").block(); + client.getSecret(SECRET_STORE_NAME, "").block(); }); assertThrows(IllegalArgumentException.class, () -> { // null key - adapter.getSecret(SECRET_STORE_NAME, null).block(); + client.getSecret(SECRET_STORE_NAME, null).block(); }); } @@ -1914,9 +1916,9 @@ public class DaprClientGrpcTest { observer.onNext(responseEnvelope); observer.onCompleted(); return null; - }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); + }).when(daprStub).getSecret(any(DaprProtos.GetSecretRequest.class), any()); - Map result = adapter.getSecret( + Map result = client.getSecret( SECRET_STORE_NAME, "key", Collections.singletonMap("metakey", "metavalue")).block(); @@ -1938,11 +1940,11 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); for (StateOptions.Consistency consistency : StateOptions.Consistency.values()) { StateOptions options = buildStateOptions(consistency, StateOptions.Concurrency.FIRST_WRITE); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } } @@ -1960,11 +1962,11 @@ public class DaprClientGrpcTest { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); return null; - }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + }).when(daprStub).saveState(any(DaprProtos.SaveStateRequest.class), any()); for (StateOptions.Concurrency concurrency : StateOptions.Concurrency.values()) { StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL, concurrency); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + Mono result = client.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 50f9e5e64..824b5fd29 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -54,7 +54,7 @@ public class DaprClientHttpTest { private final String EXPECTED_RESULT = "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}"; - private DaprClientHttp daprClientHttp; + private DaprClient daprClientHttp; private DaprHttp daprHttp; @@ -66,6 +66,8 @@ public class DaprClientHttpTest { public void setUp() { mockInterceptor = new MockInterceptor(Behavior.UNORDERED); okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); + daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); + daprClientHttp = new DaprClientProxy(new DaprClientHttp(daprHttp)); } @Test @@ -114,8 +116,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.publishEvent("mypubsubname","A", event); assertNull(mono.block()); } @@ -123,8 +124,7 @@ public class DaprClientHttpTest { @Test public void publishEventIfTopicIsNullOrEmpty() { String event = "{ \"message\": \"This is a test\" }"; - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.publishEvent("mypubsubname", null, event).block()); assertThrows(IllegalArgumentException.class, () -> @@ -137,8 +137,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.publishEvent("mypubsubname", "", event); // Should not throw exception because did not call block() on mono above. } @@ -149,8 +148,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/publish/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.invokeMethod(null, "", "", null, null, (Class)null).block()); } @@ -160,8 +158,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/badorder") .respond("INVALID JSON"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { // null HttpMethod daprClientHttp.invokeMethod("1", "2", "3", new HttpExtension(null), null, (Class)null).block(); @@ -199,8 +196,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/publish/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block()); } @@ -210,8 +206,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") .respond("\"hello world\""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class); assertEquals("hello world", mono.block()); } @@ -221,8 +216,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") .respond(new byte[0]); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class); assertNull(mono.block()); } @@ -232,8 +226,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, byte[].class); assertEquals(new String(mono.block()), EXPECTED_RESULT); } @@ -244,8 +237,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeMethod("41", "neworder", (byte[]) null, HttpExtension.GET, map); String monoString = new String(mono.block()); assertEquals(monoString, EXPECTED_RESULT); @@ -257,8 +249,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeMethod("41", "neworder", HttpExtension.GET, map); assertNull(mono.block()); } @@ -269,8 +260,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map); assertNull(mono.block()); } @@ -281,8 +271,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder?test=1") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Map queryString = new HashMap<>(); queryString.put("test", "1"); HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, queryString, null); @@ -296,8 +285,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") .respond(500); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map); // No exception should be thrown because did not call block() on mono above. } @@ -308,8 +296,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", ""); assertNull(mono.block()); } @@ -320,8 +307,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", null); assertNull(mono.block()); } @@ -331,8 +317,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("NOT VALID JSON"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { daprClientHttp.invokeBinding(null, "myoperation", "").block(); }); @@ -356,8 +341,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(new byte[0]); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", null, String.class); assertNull(mono.block()); } @@ -368,8 +352,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"OK\""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", null, String.class); assertEquals("OK", mono.block()); } @@ -380,8 +363,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1.5"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", map, double.class); assertEquals(1.5, mono.block(), 0.0001); } @@ -392,8 +374,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1.5"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", null, float.class); assertEquals(1.5, mono.block(), 0.0001); } @@ -404,8 +385,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"a\""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", null, char.class); assertEquals('a', (char)mono.block()); } @@ -416,8 +396,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"2\""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", null, byte.class); assertEquals((byte)0x2, (byte)mono.block()); } @@ -428,8 +407,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", null, long.class); assertEquals(1, (long)mono.block()); } @@ -440,8 +418,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", null, int.class); assertEquals(1, (int)mono.block()); } @@ -452,8 +429,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.invokeBinding(null, "myoperation", "").block()); } @@ -464,8 +440,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.invokeBinding("sample-topic", null, "").block()); } @@ -476,8 +451,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.invokeBinding(null, "", ""); // No exception is thrown because did not call block() on mono above. } @@ -487,8 +461,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk") .respond("NOT VALID JSON"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { daprClientHttp.getBulkState(STATE_STORE_NAME, null, String.class).block(); }); @@ -519,8 +492,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk") .respond("[{\"key\": \"100\", \"data\": \"hello world\", \"etag\": \"1\"}," + "{\"key\": \"200\", \"error\": \"not found\"}]"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + List> result = daprClientHttp.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block(); assertEquals(2, result.size()); @@ -540,8 +512,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk") .respond("[{\"key\": \"100\", \"data\": 1234, \"etag\": \"1\"}," + "{\"key\": \"200\", \"error\": \"not found\"}]"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + List> result = daprClientHttp.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block(); assertEquals(2, result.size()); @@ -562,8 +533,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk") .respond("[{\"key\": \"100\", \"data\": true, \"etag\": \"1\"}," + "{\"key\": \"200\", \"error\": \"not found\"}]"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + List> result = daprClientHttp.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block(); assertNotNull(result); @@ -586,8 +556,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk") .respond("[{\"key\": \"100\", \"data\": \"" + base64Value + "\", \"etag\": \"1\"}," + "{\"key\": \"200\", \"error\": \"not found\"}]"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + // JSON cannot differentiate if data returned is String or byte[], it is ambiguous. So we get base64 encoded back. // So, users should use String instead of byte[]. List> result = @@ -611,8 +580,7 @@ public class DaprClientHttpTest { .respond("[{\"key\": \"100\", \"data\": " + "{ \"id\": \"" + object.id + "\", \"value\": \"" + object.value + "\"}, \"etag\": \"1\"}," + "{\"key\": \"200\", \"error\": \"not found\"}]"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + // JSON cannot differentiate if data returned is String or byte[], it is ambiguous. So we get base64 encoded back. // So, users should use String instead of byte[]. List> result = @@ -641,8 +609,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/state/MyStateStore/keyBadPayload") .respond("NOT VALID"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { daprClientHttp.getState(STATE_STORE_NAME, stateKeyNull, String.class).block(); }); @@ -670,8 +637,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond("\"" + EXPECTED_RESULT + "\""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + State monoEmptyEtag = daprClientHttp.getState(STATE_STORE_NAME, stateEmptyEtag, String.class).block(); assertEquals(monoEmptyEtag.getKey(), "key"); assertNull(monoEmptyEtag.getEtag()); @@ -684,8 +650,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key?metadata.key_1=val_1") .respond("\"" + EXPECTED_RESULT + "\""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + GetStateRequestBuilder builder = new GetStateRequestBuilder(STATE_STORE_NAME, "key"); builder.withMetadata(metadata); Mono>> monoMetadata = daprClientHttp.getState(builder.build(), TypeRef.get(String.class)); @@ -698,8 +663,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond("\"" + EXPECTED_RESULT + "\""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + State monoNullEtag = daprClientHttp.getState(STATE_STORE_NAME, stateNullEtag, String.class).block(); assertEquals(monoNullEtag.getKey(), "key"); assertNull(monoNullEtag.getEtag()); @@ -711,8 +675,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond(500); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.getState(STATE_STORE_NAME, stateNullEtag, String.class); // No exception should be thrown since did not call block() on mono above. } @@ -724,16 +687,14 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.saveBulkState(STATE_STORE_NAME, stateKeyValueList); assertNull(mono.block()); } @Test public void saveStatesErrors() { - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.saveBulkState(null, null).block()); assertThrows(IllegalArgumentException.class, () -> @@ -743,8 +704,7 @@ public class DaprClientHttpTest { @Test public void saveStatesNull() { List> stateKeyValueList = new ArrayList<>(); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.saveBulkState(STATE_STORE_NAME, null); assertNull(mono.block()); Mono mono1 = daprClientHttp.saveBulkState(STATE_STORE_NAME, stateKeyValueList); @@ -758,8 +718,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono1 = daprClientHttp.saveBulkState(STATE_STORE_NAME, stateKeyValueList); assertNull(mono1.block()); } @@ -771,8 +730,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.saveBulkState(STATE_STORE_NAME, stateKeyValueList); assertNull(mono.block()); } @@ -784,8 +742,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.saveBulkState(STATE_STORE_NAME, stateKeyValueList); assertNull(mono.block()); } @@ -796,8 +753,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); StateOptions stateOptions = mock(StateOptions.class); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.saveState(STATE_STORE_NAME, "key", "etag", "value", stateOptions); assertNull(mono.block()); } @@ -808,8 +764,7 @@ public class DaprClientHttpTest { .post("http://127.0.0.1:3000/v1.0/state/MyStateStore") .respond(500); StateOptions stateOptions = mock(StateOptions.class); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.saveState(STATE_STORE_NAME, "key", "etag", "value", stateOptions); // No exception should be thrown because we did not call block() on the mono above. } @@ -823,8 +778,7 @@ public class DaprClientHttpTest { String key = "key1"; String data = "my data"; StateOptions stateOptions = mock(StateOptions.class); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + State stateKey = new State<>(data, key, etag, stateOptions); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( @@ -847,8 +801,7 @@ public class DaprClientHttpTest { String key = "key1"; String data = "my data"; StateOptions stateOptions = mock(StateOptions.class); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + State stateKey = new State<>(data, key, etag, stateOptions); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( @@ -871,8 +824,7 @@ public class DaprClientHttpTest { String key = "key1"; String data = "my data"; StateOptions stateOptions = mock(StateOptions.class); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + State stateKey = new State<>(data, key, etag, stateOptions); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( @@ -895,8 +847,7 @@ public class DaprClientHttpTest { String key = "key1"; String data = "my data"; StateOptions stateOptions = mock(StateOptions.class); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + State stateKey = new State<>(data, key, etag, stateOptions); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( @@ -918,8 +869,7 @@ public class DaprClientHttpTest { @Test public void executeTransactionErrors() { - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.executeStateTransaction(null, null).block()); assertThrows(IllegalArgumentException.class, () -> @@ -931,8 +881,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/transaction") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.executeStateTransaction(STATE_STORE_NAME, null); assertNull(mono.block()); mono = daprClientHttp.executeStateTransaction(STATE_STORE_NAME, Collections.emptyList()); @@ -946,8 +895,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions); assertNull(mono.block()); } @@ -961,8 +909,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key?metadata.key_1=val_1") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + DeleteStateRequestBuilder builder = new DeleteStateRequestBuilder(STATE_STORE_NAME, stateKeyValue.getKey()); builder.withMetadata(metadata).withEtag(stateKeyValue.getEtag()).withStateOptions(stateOptions); Mono> monoMetadata = daprClientHttp.deleteState(builder.build()); @@ -976,8 +923,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond(500); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions); // No exception should be thrown because we did not call block() on the mono above. } @@ -988,8 +934,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), null); assertNull(mono.block()); } @@ -1000,8 +945,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + Mono mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), null); assertNull(mono.block()); } @@ -1013,8 +957,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { daprClientHttp.deleteState(STATE_STORE_NAME, null, null, null).block(); }); @@ -1040,8 +983,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key") .respond("{ \"mysecretkey\": \"mysecretvalue\"}"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { daprClientHttp.getSecret(SECRET_STORE_NAME, null).block(); }); @@ -1056,8 +998,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key%2Fone") .respond("{ \"mysecretkey\": \"mysecretvalue\"}"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { daprClientHttp.getSecret(SECRET_STORE_NAME, null).block(); }); @@ -1072,8 +1013,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key") .respond(""); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> { daprClientHttp.getSecret(SECRET_STORE_NAME, null).block(); }); @@ -1087,8 +1027,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key") .respond(404); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrowsDaprException("UNKNOWN", () -> daprClientHttp.getSecret(SECRET_STORE_NAME, "key").block() ); @@ -1102,8 +1041,7 @@ public class DaprClientHttpTest { ResponseBody.create("" + "{\"errorCode\":\"ERR_SECRET_STORE_NOT_FOUND\"," + "\"message\":\"error message\"}", MediaTypes.MEDIATYPE_JSON)); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrowsDaprException("ERR_SECRET_STORE_NOT_FOUND", "ERR_SECRET_STORE_NOT_FOUND: error message", () -> daprClientHttp.getSecret(SECRET_STORE_NAME, "key").block() ); @@ -1114,8 +1052,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key") .respond("INVALID JSON"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + assertThrows(IllegalArgumentException.class, () -> daprClientHttp.getSecret(null, "key").block()); assertThrows(IllegalArgumentException.class, () -> @@ -1139,8 +1076,7 @@ public class DaprClientHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key?metadata.metakey=metavalue") .respond("{ \"mysecretkey2\": \"mysecretvalue2\"}"); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); - daprClientHttp = new DaprClientHttp(daprHttp); + { Map secret = daprClientHttp.getSecret( SECRET_STORE_NAME, @@ -1173,7 +1109,7 @@ public class DaprClientHttpTest { } @Test - public void close() { + public void close() throws Exception { DaprHttp daprHttp = Mockito.mock(DaprHttp.class); Mockito.doNothing().when(daprHttp).close(); diff --git a/sdk/src/test/java/io/dapr/client/DaprClientProxyTest.java b/sdk/src/test/java/io/dapr/client/DaprClientProxyTest.java new file mode 100644 index 000000000..6b7766f43 --- /dev/null +++ b/sdk/src/test/java/io/dapr/client/DaprClientProxyTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ +package io.dapr.client; + +import io.dapr.client.domain.HttpExtension; +import org.junit.Test; +import org.mockito.Mockito; +import reactor.core.publisher.Mono; + +import static org.mockito.Mockito.times; + +public class DaprClientProxyTest { + + @Test + public void stateAPI() { + DaprClient client1 = Mockito.mock(DaprClient.class); + DaprClient client2 = Mockito.mock(DaprClient.class); + + Mockito.when(client1.saveState("state", "key", "value")).thenReturn(Mono.empty()); + Mockito.when(client2.saveState("state", "key", "value")).thenReturn(Mono.empty()); + + DaprClient proxy = new DaprClientProxy(client1, client2); + proxy.saveState("state", "key", "value").block(); + + Mockito.verify(client1, times(1)).saveState("state", "key", "value"); + Mockito.verify(client2, times(0)).saveState("state", "key", "value"); + } + + @Test + public void methodInvocationAPI() { + DaprClient client1 = Mockito.mock(DaprClient.class); + DaprClient client2 = Mockito.mock(DaprClient.class); + + Mockito.when(client1.invokeMethod("appId", "methodName", "body", HttpExtension.POST)) + .thenReturn(Mono.empty()); + Mockito.when(client2.invokeMethod("appId", "methodName", "body", HttpExtension.POST)) + .thenReturn(Mono.empty()); + + DaprClient proxy = new DaprClientProxy(client1, client2); + proxy.invokeMethod("appId", "methodName", "body", HttpExtension.POST).block(); + + Mockito.verify(client1, times(0)) + .invokeMethod("appId", "methodName", "body", HttpExtension.POST); + Mockito.verify(client2, times(1)) + .invokeMethod("appId", "methodName", "body", HttpExtension.POST); + } + + @Test + public void closeAllClients() throws Exception { + DaprClient client1 = Mockito.mock(DaprClient.class); + DaprClient client2 = Mockito.mock(DaprClient.class); + + DaprClient proxy = new DaprClientProxy(client1, client2); + proxy.close(); + + Mockito.verify(client1, times(1)).close(); + Mockito.verify(client2, times(1)).close(); + } + + @Test + public void closeSingleClient() throws Exception { + DaprClient client1 = Mockito.mock(DaprClient.class); + + DaprClient proxy = new DaprClientProxy(client1); + proxy.close(); + + Mockito.verify(client1, times(1)).close(); + } + +} \ No newline at end of file