From 8362c6347b076f06c367d6edc23234a12aa8f73c Mon Sep 17 00:00:00 2001 From: Charlie Stanley Date: Thu, 28 May 2020 18:42:01 -0700 Subject: [PATCH] SDK update for breaking changes in dapr core. (#287) * Update proto files for 0.8.0 release (#283) * Bump dapr core to master * Update integration tests for 0.8.0 proto changes * Added unit tests to ensure StateOptions enums always map to their equivalent gRPC enums * Remove some uneed comments/imports * Update pub/sub subscription to match new route/metdata format (#278) * Automatically initialize actor on first invocation (#284) * Update integration tests to publish to more than one pubsub topic/route * Bump dapr CLI version * Remove uneeded interface and exception Co-authored-by: Artur Souza --- .github/workflows/build.yml | 8 +- .../invoke/grpc/HelloWorldService.java | 6 +- .../pubsub/http/SubscriberController.java | 1 - pom.xml | 2 +- .../io/dapr/actors/runtime/ActorManager.java | 9 +- .../io/dapr/actors/runtime/ActorRuntime.java | 24 +- .../dapr/actors/runtime/ActorRuntimeTest.java | 83 +++--- sdk-autogen/pom.xml | 6 +- .../springboot/DaprBeanPostProcessor.java | 27 +- .../io/dapr/springboot/DaprController.java | 12 - .../java/io/dapr/springboot/DaprRuntime.java | 17 +- .../springboot/DaprTopicSubscription.java | 42 +++ .../java/io/dapr/it/pubsub/http/PubSubIT.java | 30 ++- .../it/pubsub/http/SubscriberController.java | 36 ++- .../io/dapr/it/state/HelloWorldClientIT.java | 14 +- .../it/state/HelloWorldGrpcStateService.java | 12 +- sdk/src/main/java/io/dapr/Topic.java | 10 +- .../java/io/dapr/client/DaprClientGrpc.java | 125 +++++---- .../io/dapr/client/DaprClientGrpcTest.java | 254 +++++++++++------- 19 files changed, 479 insertions(+), 239 deletions(-) create mode 100644 sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3bd6b6bc6..dea42fc41 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,10 +22,10 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: 13.0.x - DAPR_RUNTIME_VER: 0.6.0 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/f84566fb2bf5a599252ab9d6bd82fc78faf94dba/install/install.sh - DAPR_CLI_REF: 2e607cb6de0bd69770b244020eecd668fcded19b - DAPR_REF: 7d6e111a181d9d3fdd7537e5c7f6c02764064846 + DAPR_RUNTIME_VER: 0.7.1 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/98f8818260d941c2662908f686395f8480dabd64/install/install.sh + DAPR_CLI_REF: 98f8818260d941c2662908f686395f8480dabd64 + DAPR_REF: 610b92568b1add897ba3e6938a711c0821833966 OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }} OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }} GPG_KEY: ${{ secrets.GPG_KEY }} diff --git a/examples/src/main/java/io/dapr/examples/invoke/grpc/HelloWorldService.java b/examples/src/main/java/io/dapr/examples/invoke/grpc/HelloWorldService.java index 92c8a206f..11f9e99e8 100644 --- a/examples/src/main/java/io/dapr/examples/invoke/grpc/HelloWorldService.java +++ b/examples/src/main/java/io/dapr/examples/invoke/grpc/HelloWorldService.java @@ -6,9 +6,9 @@ package io.dapr.examples.invoke.grpc; import com.google.protobuf.Any; +import io.dapr.v1.AppCallbackGrpc; import io.dapr.v1.CommonProtos; -import io.dapr.v1.DaprClientGrpc; -import io.dapr.v1.DaprClientProtos; +import io.dapr.v1.DaprAppCallbackProtos; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; @@ -40,7 +40,7 @@ public class HelloWorldService { /** * Server mode: class that encapsulates all server-side logic for Grpc. */ - private static class GrpcHelloWorldDaprService extends DaprClientGrpc.DaprClientImplBase { + private static class GrpcHelloWorldDaprService extends AppCallbackGrpc.AppCallbackImplBase { /** * Format to output date and time. diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java index 8106ab18f..340965c4a 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java @@ -7,7 +7,6 @@ package io.dapr.examples.pubsub.http; import io.dapr.Topic; import io.dapr.client.domain.CloudEvent; -import io.dapr.serializer.DefaultObjectSerializer; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; diff --git a/pom.xml b/pom.xml index c3b9657c5..c10760094 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 1.25.0 3.11.0 3.10.0 - https://raw.githubusercontent.com/dapr/dapr/89aab21ee86d2f65066bbe530809ee73ac75e921/dapr/proto + https://raw.githubusercontent.com/dapr/dapr/610b92568b1add897ba3e6938a711c0821833966/dapr/proto 1.6.2 3.1.1 1.8 diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java index e2495856a..e0a2ec675 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java @@ -58,8 +58,13 @@ class ActorManager { * @return Asynchronous void response. */ Mono activateActor(ActorId actorId) { - return Mono.fromSupplier(() -> this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId)) - .flatMap(actor -> actor.onActivateInternal().then(this.onActivatedActor(actorId, actor))); + return Mono.fromSupplier(() -> { + if (this.activeActors.containsKey(actorId)) { + return null; + } + + return this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId); + }).flatMap(actor -> actor.onActivateInternal().then(this.onActivatedActor(actorId, actor))); } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index 892567266..9404efe36 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -198,18 +198,6 @@ public class ActorRuntime { this.config.addRegisteredActorType(actorTypeInfo.getName()); } - /** - * Activates an actor for an actor type with given actor id. - * - * @param actorTypeName Actor type name to activate the actor for. - * @param actorId Actor id for the actor to be activated. - * @return Async void task. - */ - public Mono activate(String actorTypeName, String actorId) { - return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) - .flatMap(m -> m.activateActor(new ActorId(actorId))); - } - /** * Deactivates an actor for an actor type with given actor id. * @@ -233,8 +221,10 @@ public class ActorRuntime { * @return Response for the actor method. */ public Mono invoke(String actorTypeName, String actorId, String actorMethodName, byte[] payload) { + ActorId id = new ActorId(actorId); return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) - .flatMap(m -> m.invokeMethod(new ActorId(actorId), actorMethodName, payload)); + .flatMap(m -> m.activateActor(id).thenReturn(m)) + .flatMap(m -> ((ActorManager)m).invokeMethod(id, actorMethodName, payload)); } /** @@ -247,8 +237,10 @@ public class ActorRuntime { * @return Async void task. */ public Mono invokeReminder(String actorTypeName, String actorId, String reminderName, byte[] params) { + ActorId id = new ActorId(actorId); return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) - .flatMap(m -> m.invokeReminder(new ActorId(actorId), reminderName, params)); + .flatMap(m -> m.activateActor(id).thenReturn(m)) + .flatMap(m -> ((ActorManager)m).invokeReminder(new ActorId(actorId), reminderName, params)); } /** @@ -260,8 +252,10 @@ public class ActorRuntime { * @return Async void task. */ public Mono invokeTimer(String actorTypeName, String actorId, String timerName) { + ActorId id = new ActorId(actorId); return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) - .flatMap(m -> m.invokeTimer(new ActorId(actorId), timerName)); + .flatMap(m -> m.activateActor(id).thenReturn(m)) + .flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName)); } /** diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java index 34b4605a8..986cac54a 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java @@ -25,18 +25,60 @@ public class ActorRuntimeTest { public interface MyActor { String say(); + int count(); } @ActorType(name = ACTOR_NAME) public static class MyActorImpl extends AbstractActor implements MyActor { + private int count = 0; + + private Boolean activated; + public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { super(runtimeContext, id); } + public Mono onActivate() { + return Mono.fromRunnable(() -> { + if (this.activated != null) { + throw new IllegalStateException("already activated once"); + } + + this.activated = true; + }); + } + + public Mono onDeactivate() { + return Mono.fromRunnable(() -> { + if (this.activated == null) { + throw new IllegalStateException("never activated"); + } + + if (this.activated == false) { + throw new IllegalStateException("already deactivated"); + } + + if (this.count == 0) { + throw new IllegalStateException("test expects a call before deactivate"); + } + + this.activated = false; + }); + } + public String say() { + if (!this.activated) { + throw new IllegalStateException("not activated"); + } + + this.count++; return "Nothing to say."; } + + public int count() { + return this.count; + } } private static final ActorObjectSerializer ACTOR_STATE_SERIALIZER = new ActorObjectSerializer(); @@ -71,32 +113,17 @@ public class ActorRuntimeTest { Assert.assertTrue(new String(this.runtime.serializeConfig()).contains(ACTOR_NAME)); } - @Test - public void activateActor() throws Exception { - String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class); - this.runtime.activate(ACTOR_NAME, actorId).block(); - } @Test public void invokeActor() throws Exception { String actorId = UUID.randomUUID().toString(); this.runtime.registerActor(MyActorImpl.class); - this.runtime.activate(ACTOR_NAME, actorId).block(); byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block(); String message = ACTOR_STATE_SERIALIZER.deserialize(response, String.class); Assert.assertEquals("Nothing to say.", message); } - @Test - public void activateThendeactivateActor() throws Exception { - String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class); - this.runtime.activate(ACTOR_NAME, actorId).block(); - this.runtime.deactivate(ACTOR_NAME, actorId).block(); - } - @Test public void deactivateActor() throws Exception { String actorId = UUID.randomUUID().toString(); @@ -104,30 +131,16 @@ public class ActorRuntimeTest { this.runtime.deactivate(ACTOR_NAME, actorId).block(); } - @Test - public void lazyActivate() throws Exception { - String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class); - this.runtime.activate(ACTOR_NAME, actorId).block(); - - this.runtime.invoke(ACTOR_NAME, actorId, "say", null) - .doOnError(e -> Assert.assertTrue(e.getMessage().contains("Could not find actor"))) - .doOnSuccess(s -> Assert.fail()) - .onErrorReturn("".getBytes()) - .block(); - } - @Test public void lazyDeactivate() throws Exception { String actorId = UUID.randomUUID().toString(); this.runtime.registerActor(MyActorImpl.class); - this.runtime.activate(ACTOR_NAME, actorId).block(); - Mono deacticateCall = this.runtime.deactivate(ACTOR_NAME, actorId); + Mono deactivateCall = this.runtime.deactivate(ACTOR_NAME, actorId); this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block(); - deacticateCall.block(); + deactivateCall.block(); this.runtime.invoke(ACTOR_NAME, actorId, "say", null) .doOnError(e -> Assert.assertTrue(e.getMessage().contains("Could not find actor"))) @@ -143,9 +156,15 @@ public class ActorRuntimeTest { Mono invokeCall = this.runtime.invoke(ACTOR_NAME, actorId, "say", null); - this.runtime.activate(ACTOR_NAME, actorId).block(); + byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block(); + int count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class); + Assert.assertEquals(0, count); invokeCall.block(); + + response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block(); + count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class); + Assert.assertEquals(1, count); } } diff --git a/sdk-autogen/pom.xml b/sdk-autogen/pom.xml index e09e86fa5..759611fae 100644 --- a/sdk-autogen/pom.xml +++ b/sdk-autogen/pom.xml @@ -88,7 +88,7 @@ wget - ${dapr.proto.baseurl}/dapr/v1/dapr.proto + ${dapr.proto.baseurl}/runtime/v1/dapr.proto dapr.proto ${protobuf.input.directory} @@ -102,8 +102,8 @@ wget - ${dapr.proto.baseurl}/daprclient/v1/daprclient.proto - daprclient.proto + ${dapr.proto.baseurl}/runtime/v1/appcallback.proto + appcallback.proto ${protobuf.input.directory} diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java index 4821eeb54..2d46ca2a0 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java @@ -5,12 +5,21 @@ package io.dapr.springboot; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.Topic; +import io.dapr.client.ObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.PostMapping; import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; /** * Handles Dapr annotations in Springboot Controllers. @@ -18,6 +27,8 @@ import java.lang.reflect.Method; @Component public class DaprBeanPostProcessor implements BeanPostProcessor { + private static final ObjectMapper MAPPER = new ObjectMapper(); + /** * {@inheritDoc} */ @@ -56,9 +67,23 @@ public class DaprBeanPostProcessor implements BeanPostProcessor { continue; } + String route = topic.name(); + PostMapping mapping = method.getAnnotation(PostMapping.class); + + if (mapping != null && mapping.path() != null && mapping.path().length >= 1) { + route = mapping.path()[0]; + } + String topicName = topic.name(); if ((topicName != null) && (topicName.length() > 0)) { - DaprRuntime.getInstance().addSubscribedTopic(topicName); + try { + TypeReference> typeRef + = new TypeReference>() {}; + Map metadata = MAPPER.readValue(topic.metadata(), typeRef); + DaprRuntime.getInstance().addSubscribedTopic(topicName, route, metadata); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Error while parsing metadata: " + e.toString()); + } } } } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java index 96e6925f5..0b8ebbb8c 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java @@ -56,18 +56,6 @@ public class DaprController { return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics()); } - /** - * Handles API to activate an actor. - * @param type Actor type. - * @param id Actor Id. - * @return Void. - */ - @PostMapping(path = "/actors/{type}/{id}") - public Mono activateActor(@PathVariable("type") String type, - @PathVariable("id") String id) { - return ActorRuntime.getInstance().activate(type, id); - } - /** * Handles API to deactivate an actor. * @param type Actor type. diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java index 08766c887..38cebf0cf 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java @@ -5,7 +5,10 @@ package io.dapr.springboot; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -23,6 +26,11 @@ class DaprRuntime { */ private final Set subscribedTopics = new HashSet<>(); + /** + * List of subscriptions. + */ + private final List subscriptions = new ArrayList<>(); + /** * Private constructor to make this singleton. */ @@ -50,14 +58,17 @@ class DaprRuntime { * Adds a topic to the list of subscribed topics. * * @param topicName Name of the topic being subscribed to. + * @param route Destination route for requests. + * @param metadata Metadata for extended subscription functionality. */ - public synchronized void addSubscribedTopic(String topicName) { + public synchronized void addSubscribedTopic(String topicName, String route, Map metadata) { if (!this.subscribedTopics.contains(topicName)) { this.subscribedTopics.add(topicName); + this.subscriptions.add(new DaprTopicSubscription(topicName, route, metadata)); } } - public synchronized String[] listSubscribedTopics() { - return this.subscribedTopics.toArray(new String[0]); + public synchronized DaprTopicSubscription[] listSubscribedTopics() { + return this.subscriptions.toArray(new DaprTopicSubscription[0]); } } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java new file mode 100644 index 000000000..3db3022c1 --- /dev/null +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.springboot; + +import java.util.Collections; +import java.util.Map; + +/** + * Class to represent a subscription topic along with its metadata. + */ +public class DaprTopicSubscription { + private final String topic; + private final String route; + private final Map metadata; + + /** + * Create a subscription topic. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param metadata Metdata for extended subscription functionality. + */ + public DaprTopicSubscription(String topic, String route, Map metadata) { + this.topic = topic; + this.route = route; + this.metadata = Collections.unmodifiableMap(metadata); + } + + public String getTopic() { + return topic; + } + + public String getRoute() { + return route; + } + + public Map getMetadata() { + return metadata; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index bdeb442b6..7f9f28d5e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -31,6 +31,7 @@ public class PubSubIT extends BaseIT { //The title of the topic to be used for publishing private static final String TOPIC_NAME = "testingtopic"; + private static final String ANOTHER_TOPIC_NAME = "anothertopic"; /** * Parameters for this test. @@ -62,12 +63,21 @@ public class PubSubIT extends BaseIT { daprRun.switchToHTTP(); } + // Send a batch of messages on one topic DaprClient client = new DaprClientBuilder().build(); for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("This is message #%d", i); + String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME); //Publishing messages client.publishEvent(TOPIC_NAME, message).block(); - System.out.println("Published message: " + message); + System.out.println(String.format("Published message: '%s' to topic '%s'", message, TOPIC_NAME)); + } + + // Send a batch of different messages on the other. + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME); + //Publishing messages + client.publishEvent(ANOTHER_TOPIC_NAME, message).block(); + System.out.println(String.format("Published message: '%s' to topic '%s'", message, ANOTHER_TOPIC_NAME)); } //Publishing a single byte: Example of non-string based content published @@ -80,12 +90,12 @@ public class PubSubIT extends BaseIT { Thread.sleep(3000); callWithRetry(() -> { - System.out.println("Checking results ..."); - final List messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages", null, List.class).block(); + System.out.println("Checking results for topic " + TOPIC_NAME); + final List messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages/testingtopic", null, List.class).block(); assertEquals(11, messages.size()); for (int i = 0; i < NUM_MESSAGES; i++) { - assertTrue(messages.contains(String.format("This is message #%d", i))); + assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME))); } boolean foundByte = false; @@ -97,6 +107,16 @@ public class PubSubIT extends BaseIT { assertTrue(foundByte); }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME); + final List messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages/anothertopic", null, List.class).block(); + assertEquals(10, messages.size()); + + for (int i = 0; i < NUM_MESSAGES; i++) { + assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME))); + } + }, 2000); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java index 16e772a5f..27ec4d630 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java @@ -21,15 +21,21 @@ import java.util.Map; @RestController public class SubscriberController { - private static final List messagesReceived = new ArrayList(); + private static final List messagesReceivedTestingTopic = new ArrayList(); + private static final List messagesReceivedAnotherTopic = new ArrayList(); - @GetMapping(path = "/messages") - public List getMessages() { - return messagesReceived; + @GetMapping(path = "/messages/testingtopic") + public List getMessagesReceivedTestingTopic() { + return messagesReceivedTestingTopic; + } + + @GetMapping(path = "/messages/anothertopic") + public List getMessagesReceivedAnotherTopic() { + return messagesReceivedAnotherTopic; } @Topic(name = "testingtopic") - @PostMapping(path = "/testingtopic") + @PostMapping(path = "/route1") public Mono handleMessage(@RequestBody(required = false) byte[] body, @RequestHeader Map headers) { return Mono.fromRunnable(() -> { @@ -39,7 +45,25 @@ public class SubscriberController { String message = envelope.getData() == null ? "" : envelope.getData(); System.out.println("Subscriber got message: " + message); - messagesReceived.add(envelope.getData()); + messagesReceivedTestingTopic.add(envelope.getData()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "anothertopic") + @PostMapping(path = "/route2") + public Mono handleMessageAnotherTopic(@RequestBody(required = false) byte[] body, + @RequestHeader Map headers) { + return Mono.fromRunnable(() -> { + try { + // Dapr's event is compliant to CloudEvent. + CloudEvent envelope = CloudEvent.deserialize(body); + + String message = envelope.getData() == null ? "" : envelope.getData(); + System.out.println("Subscriber got message: " + message); + messagesReceivedAnotherTopic.add(envelope.getData()); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java index b3cfe62aa..ac4a3bfc4 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java @@ -31,20 +31,20 @@ public class HelloWorldClientIT extends BaseIT { String key = "mykey"; { - DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope + DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest .newBuilder() .setStoreName(STATE_STORE_NAME) .setKey(key) .build(); - DaprProtos.GetStateResponseEnvelope response = client.getState(req); - String value = response.getData().getValue().toStringUtf8(); + DaprProtos.GetStateResponse response = client.getState(req); + String value = response.getData().toStringUtf8(); System.out.println("Got: " + value); Assert.assertEquals("Hello World", value); } // Then, delete it. { - DaprProtos.DeleteStateEnvelope req = DaprProtos.DeleteStateEnvelope + DaprProtos.DeleteStateRequest req = DaprProtos.DeleteStateRequest .newBuilder() .setStoreName(STATE_STORE_NAME) .setKey(key) @@ -54,13 +54,13 @@ public class HelloWorldClientIT extends BaseIT { } { - DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope + DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest .newBuilder() .setStoreName(STATE_STORE_NAME) .setKey(key) .build(); - DaprProtos.GetStateResponseEnvelope response = client.getState(req); - String value = response.getData().getValue().toStringUtf8(); + DaprProtos.GetStateResponse response = client.getState(req); + String value = response.getData().toStringUtf8(); System.out.println("Got: " + value); Assert.assertEquals("", value); } diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java index e63113e08..2da973e47 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java @@ -9,8 +9,8 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprGrpc.DaprBlockingStub; -import io.dapr.v1.DaprProtos.SaveStateEnvelope; -import io.dapr.v1.DaprProtos.StateRequest; +import io.dapr.v1.DaprProtos.SaveStateRequest; +import io.dapr.v1.CommonProtos.StateItem; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -37,14 +37,14 @@ public class HelloWorldGrpcStateService { // First, write key-value pair. String value = "Hello World"; - StateRequest req = StateRequest + StateItem req = StateItem .newBuilder() .setKey(key) - .setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build()) + .setValue(ByteString.copyFromUtf8(value)) .build(); - SaveStateEnvelope state = SaveStateEnvelope.newBuilder() + SaveStateRequest state = SaveStateRequest.newBuilder() .setStoreName("statestore") - .addRequests(req) + .addStates(req) .build(); client.saveState(state); System.out.println("Saved!"); diff --git a/sdk/src/main/java/io/dapr/Topic.java b/sdk/src/main/java/io/dapr/Topic.java index 167005238..95bd0c29a 100644 --- a/sdk/src/main/java/io/dapr/Topic.java +++ b/sdk/src/main/java/io/dapr/Topic.java @@ -18,8 +18,16 @@ public @interface Topic { /** * Name of topic to be subscribed to. - * * @return Topic's name. */ String name(); + + /** + * Metadata in the form of a json object. + * { + * "mykey": "myvalue" + * } + * @return metadata object + */ + String metadata() default "{}"; } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 137ba9424..6305faf2e 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import static io.dapr.client.domain.StateOptions.RetryPolicy; + /** * An adapter for the GRPC Client. * @@ -66,6 +68,39 @@ public class DaprClientGrpc implements DaprClient { this.stateSerializer = stateSerializer; } + private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { + switch (options.getConsistency()) { + case EVENTUAL: + return CommonProtos.StateOptions.StateConsistency.CONSISTENCY_EVENTUAL; + case STRONG: + return CommonProtos.StateOptions.StateConsistency.CONSISTENCY_STRONG; + default: + throw new IllegalArgumentException("Missing Consistency mapping to gRPC Consistency enum"); + } + } + + private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(StateOptions options) { + switch (options.getConcurrency()) { + case FIRST_WRITE: + return CommonProtos.StateOptions.StateConcurrency.CONCURRENCY_FIRST_WRITE; + case LAST_WRITE: + return CommonProtos.StateOptions.StateConcurrency.CONCURRENCY_LAST_WRITE; + default: + throw new IllegalArgumentException("Missing StateConcurrency mapping to gRPC Concurrency enum"); + } + } + + private CommonProtos.StateRetryPolicy.RetryPattern getGrpcStateRetryPolicy(RetryPolicy policy) { + switch (policy.getPattern()) { + case LINEAR: + return CommonProtos.StateRetryPolicy.RetryPattern.RETRY_LINEAR; + case EXPONENTIAL: + return CommonProtos.StateRetryPolicy.RetryPattern.RETRY_EXPONENTIAL; + default: + throw new IllegalArgumentException("Missing RetryPattern mapping to gRPC retry pattern enum"); + } + } + /** * {@inheritDoc} */ @@ -80,12 +115,9 @@ public class DaprClientGrpc implements DaprClient { @Override public Mono publishEvent(String topic, Object event, Map metadata) { try { - byte[] byteEvent = objectSerializer.serialize(event); - Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteEvent)).build(); // TODO: handle metadata. - - DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.newBuilder() - .setTopic(topic).setData(data).build(); + DaprProtos.PublishEventRequest envelope = DaprProtos.PublishEventRequest.newBuilder() + .setTopic(topic).setData(ByteString.copyFrom(objectSerializer.serialize(event))).build(); return Mono.fromCallable(() -> { ListenableFuture futureEmpty = client.publishEvent(envelope); @@ -188,16 +220,15 @@ public class DaprClientGrpc implements DaprClient { public Mono invokeBinding(String name, Object request, Map metadata) { try { byte[] byteRequest = objectSerializer.serialize(request); - DaprProtos.InvokeBindingEnvelope.Builder builder = DaprProtos.InvokeBindingEnvelope.newBuilder() + DaprProtos.InvokeBindingRequest.Builder builder = DaprProtos.InvokeBindingRequest.newBuilder() .setName(name); if (byteRequest != null) { - Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build(); - builder.setData(data); + builder.setData(ByteString.copyFrom(byteRequest)); } if (metadata != null) { builder.putAllMetadata(metadata); } - DaprProtos.InvokeBindingEnvelope envelope = builder.build(); + DaprProtos.InvokeBindingRequest envelope = builder.build(); return Mono.fromCallable(() -> { ListenableFuture futureEmpty = client.invokeBinding(envelope); futureEmpty.get(); @@ -237,17 +268,17 @@ public class DaprClientGrpc implements DaprClient { if ((key == null) || (key.trim().isEmpty())) { throw new IllegalArgumentException("Key cannot be null or empty."); } - DaprProtos.GetStateEnvelope.Builder builder = DaprProtos.GetStateEnvelope.newBuilder() + DaprProtos.GetStateRequest.Builder builder = DaprProtos.GetStateRequest.newBuilder() .setStoreName(stateStoreName) .setKey(key); if (options != null && options.getConsistency() != null) { - builder.setConsistency(options.getConsistency().getValue()); + builder.setConsistency(getGrpcStateConsistency(options)); } - DaprProtos.GetStateEnvelope envelope = builder.build(); + DaprProtos.GetStateRequest envelope = builder.build(); return Mono.fromCallable(() -> { - ListenableFuture futureResponse = client.getState(envelope); - DaprProtos.GetStateResponseEnvelope response = null; + ListenableFuture futureResponse = client.getState(envelope); + DaprProtos.GetStateResponse response = null; try { response = futureResponse.get(); } catch (NullPointerException npe) { @@ -261,11 +292,11 @@ public class DaprClientGrpc implements DaprClient { } private State buildStateKeyValue( - DaprProtos.GetStateResponseEnvelope response, + DaprProtos.GetStateResponse response, String requestedKey, StateOptions stateOptions, Class clazz) throws IOException { - ByteString payload = response.getData().getValue(); + ByteString payload = response.getData(); byte[] data = payload == null ? null : payload.toByteArray(); T value = stateSerializer.deserialize(data, clazz); String etag = response.getEtag(); @@ -282,14 +313,14 @@ public class DaprClientGrpc implements DaprClient { if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) { throw new IllegalArgumentException("State store name cannot be null or empty."); } - DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder(); + DaprProtos.SaveStateRequest.Builder builder = DaprProtos.SaveStateRequest.newBuilder(); builder.setStoreName(stateStoreName); for (State state : states) { - builder.addRequests(buildStateRequest(state).build()); + builder.addStates(buildStateRequest(state).build()); } - DaprProtos.SaveStateEnvelope envelope = builder.build(); + DaprProtos.SaveStateRequest request = builder.build(); - return Mono.fromCallable(() -> client.saveState(envelope)).flatMap(f -> { + return Mono.fromCallable(() -> client.saveState(request)).flatMap(f -> { try { f.get(); } catch (Exception ex) { @@ -302,10 +333,10 @@ public class DaprClientGrpc implements DaprClient { } } - private DaprProtos.StateRequest.Builder buildStateRequest(State state) throws IOException { + private CommonProtos.StateItem.Builder buildStateRequest(State state) throws IOException { byte[] bytes = stateSerializer.serialize(state.getValue()); - Any data = Any.newBuilder().setValue(ByteString.copyFrom(bytes)).build(); - DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder(); + ByteString data = ByteString.copyFrom(bytes); + CommonProtos.StateItem.Builder stateBuilder = CommonProtos.StateItem.newBuilder(); if (state.getEtag() != null) { stateBuilder.setEtag(state.getEtag()); } @@ -313,13 +344,13 @@ public class DaprClientGrpc implements DaprClient { stateBuilder.setValue(data); } stateBuilder.setKey(state.getKey()); - DaprProtos.StateOptions.Builder optionBuilder = null; + CommonProtos.StateOptions.Builder optionBuilder = null; if (state.getOptions() != null) { StateOptions options = state.getOptions(); - DaprProtos.RetryPolicy.Builder retryPolicyBuilder = null; + CommonProtos.StateRetryPolicy.Builder retryPolicyBuilder = null; if (options.getRetryPolicy() != null) { - retryPolicyBuilder = DaprProtos.RetryPolicy.newBuilder(); - StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy(); + retryPolicyBuilder = CommonProtos.StateRetryPolicy.newBuilder(); + RetryPolicy retryPolicy = options.getRetryPolicy(); if (options.getRetryPolicy().getInterval() != null) { Duration.Builder durationBuilder = Duration.newBuilder() .setNanos(retryPolicy.getInterval().getNano()) @@ -330,16 +361,16 @@ public class DaprClientGrpc implements DaprClient { retryPolicyBuilder.setThreshold(retryPolicy.getThreshold()); } if (retryPolicy.getPattern() != null) { - retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue()); + retryPolicyBuilder.setPattern(getGrpcStateRetryPolicy(retryPolicy)); } } - optionBuilder = DaprProtos.StateOptions.newBuilder(); + optionBuilder = CommonProtos.StateOptions.newBuilder(); if (options.getConcurrency() != null) { - optionBuilder.setConcurrency(options.getConcurrency().getValue()); + optionBuilder.setConcurrency(getGrpcStateConcurrency(options)); } if (options.getConsistency() != null) { - optionBuilder.setConsistency(options.getConsistency().getValue()); + optionBuilder.setConsistency(getGrpcStateConsistency(options)); } if (retryPolicyBuilder != null) { optionBuilder.setRetryPolicy(retryPolicyBuilder.build()); @@ -389,13 +420,13 @@ public class DaprClientGrpc implements DaprClient { throw new IllegalArgumentException("Key cannot be null or empty."); } - DaprProtos.StateOptions.Builder optionBuilder = null; + CommonProtos.StateOptions.Builder optionBuilder = null; if (options != null) { - optionBuilder = DaprProtos.StateOptions.newBuilder(); - DaprProtos.RetryPolicy.Builder retryPolicyBuilder = null; + optionBuilder = CommonProtos.StateOptions.newBuilder(); + CommonProtos.StateRetryPolicy.Builder retryPolicyBuilder = null; if (options.getRetryPolicy() != null) { - retryPolicyBuilder = DaprProtos.RetryPolicy.newBuilder(); - StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy(); + retryPolicyBuilder = CommonProtos.StateRetryPolicy.newBuilder(); + RetryPolicy retryPolicy = options.getRetryPolicy(); if (options.getRetryPolicy().getInterval() != null) { Duration.Builder durationBuilder = Duration.newBuilder() .setNanos(retryPolicy.getInterval().getNano()) @@ -406,22 +437,22 @@ public class DaprClientGrpc implements DaprClient { retryPolicyBuilder.setThreshold(retryPolicy.getThreshold()); } if (retryPolicy.getPattern() != null) { - retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue()); + retryPolicyBuilder.setPattern(getGrpcStateRetryPolicy(retryPolicy)); } } - optionBuilder = DaprProtos.StateOptions.newBuilder(); + optionBuilder = CommonProtos.StateOptions.newBuilder(); if (options.getConcurrency() != null) { - optionBuilder.setConcurrency(options.getConcurrency().getValue()); + optionBuilder.setConcurrency(getGrpcStateConcurrency(options)); } if (options.getConsistency() != null) { - optionBuilder.setConsistency(options.getConsistency().getValue()); + optionBuilder.setConsistency(getGrpcStateConsistency(options)); } if (retryPolicyBuilder != null) { optionBuilder.setRetryPolicy(retryPolicyBuilder.build()); } } - DaprProtos.DeleteStateEnvelope.Builder builder = DaprProtos.DeleteStateEnvelope.newBuilder() + DaprProtos.DeleteStateRequest.Builder builder = DaprProtos.DeleteStateRequest.newBuilder() .setStoreName(stateStoreName) .setKey(key); if (etag != null) { @@ -432,8 +463,8 @@ public class DaprClientGrpc implements DaprClient { builder.setOptions(optionBuilder.build()); } - DaprProtos.DeleteStateEnvelope envelope = builder.build(); - return Mono.fromCallable(() -> client.deleteState(envelope)).flatMap(f -> { + DaprProtos.DeleteStateRequest request = builder.build(); + return Mono.fromCallable(() -> client.deleteState(request)).flatMap(f -> { try { f.get(); } catch (Exception ex) { @@ -499,16 +530,16 @@ public class DaprClientGrpc implements DaprClient { return Mono.error(e); } - DaprProtos.GetSecretEnvelope.Builder envelopeBuilder = DaprProtos.GetSecretEnvelope.newBuilder() + DaprProtos.GetSecretRequest.Builder requestBuilder = DaprProtos.GetSecretRequest.newBuilder() .setStoreName(secretStoreName) .setKey(secretName); if (metadata != null) { - envelopeBuilder.putAllMetadata(metadata); + requestBuilder.putAllMetadata(metadata); } return Mono.fromCallable(() -> { - DaprProtos.GetSecretEnvelope envelope = envelopeBuilder.build(); - ListenableFuture future = client.getSecret(envelope); + DaprProtos.GetSecretRequest request = requestBuilder.build(); + ListenableFuture future = client.getSecret(request); return future.get(); }).map(future -> future.getDataMap()); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 978a08a66..f9960405c 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -55,7 +55,7 @@ public class DaprClientGrpcTest { @Test(expected = RuntimeException.class) public void publishEventExceptionThrownTest() { - when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class))) + when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenThrow(RuntimeException.class); Mono result = adapter.publishEvent("topic", "object"); result.block(); @@ -67,7 +67,7 @@ public class DaprClientGrpcTest { RuntimeException ex = new RuntimeException("An Exception"); MockCallback callback = new MockCallback(ex); addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class))) + when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenReturn(settableFuture); Mono result = adapter.publishEvent("topic", "object"); settableFuture.setException(ex); @@ -79,7 +79,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class))) + when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenReturn(settableFuture); Mono result = adapter.publishEvent("topic", "object"); settableFuture.set(Empty.newBuilder().build()); @@ -92,7 +92,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class))) + when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenAnswer(c -> { settableFuture.set(Empty.newBuilder().build()); return settableFuture; @@ -107,7 +107,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class))) + when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenReturn(settableFuture); MyObject event = new MyObject(1, "Event"); Mono result = adapter.publishEvent("topic", event); @@ -132,7 +132,7 @@ public class DaprClientGrpcTest { new MockCallback(ex); addCallback(settableFuture, callback, directExecutor()); settableFuture.setException(ex); - when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class))) + when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) .thenReturn(settableFuture); Mono result = adapter.invokeBinding("BindingName", "request"); result.block(); @@ -143,7 +143,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class))) + when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) .thenReturn(settableFuture); Mono result = adapter.invokeBinding("BindingName", "request"); settableFuture.set(Empty.newBuilder().build()); @@ -156,7 +156,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class))) + when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) .thenReturn(settableFuture); MyObject event = new MyObject(1, "Event"); Mono result = adapter.invokeBinding("BindingName", event); @@ -170,7 +170,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class))) + when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) .thenAnswer(c -> { settableFuture.set(Empty.newBuilder().build()); return settableFuture; @@ -488,7 +488,7 @@ public class DaprClientGrpcTest { @Test(expected = RuntimeException.class) public void getStateExceptionThrownTest() { - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class))).thenThrow(RuntimeException.class); + when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))).thenThrow(RuntimeException.class); State key = buildStateKey(null, "Key1", "ETag1", null); Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); result.block(); @@ -496,12 +496,12 @@ public class DaprClientGrpcTest { @Test(expected = RuntimeException.class) public void getStateCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); + SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = + MockCallback callback = new MockCallback<>(ex); addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class))) + when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) .thenReturn(settableFuture); State key = buildStateKey(null, "Key1", "ETag1", null); Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); @@ -515,11 +515,11 @@ public class DaprClientGrpcTest { String key = "key1"; String expectedValue = "Expected state"; State expectedState = buildStateKey(expectedValue, key, etag, null); - DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); + DaprProtos.GetStateResponse responseEnvelope = buildGetStateResponse(expectedValue, etag); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class))) + when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) .thenReturn(settableFuture); State keyRequest = buildStateKey(null, key, etag, null); Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class); @@ -533,11 +533,11 @@ public class DaprClientGrpcTest { String key = "key1"; String expectedValue = "Expected state"; State expectedState = buildStateKey(expectedValue, key, etag, null); - DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); + DaprProtos.GetStateResponse responseEnvelope = buildGetStateResponse(expectedValue, etag); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class))) + when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) .thenAnswer(c -> { settableFuture.set(responseEnvelope); return settableFuture; @@ -556,15 +556,15 @@ public class DaprClientGrpcTest { StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); State expectedState = buildStateKey(expectedValue, key, etag, options); - DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder() - .setData(getAny(expectedValue)) + DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder() + .setData(getBytes(expectedValue)) .setEtag(etag) .build(); State keyRequest = buildStateKey(null, key, etag, options); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class))) + when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) .thenReturn(settableFuture); Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); settableFuture.set(responseEnvelope); @@ -579,15 +579,15 @@ public class DaprClientGrpcTest { StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE, new StateOptions.RetryPolicy(Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR)); State expectedState = buildStateKey(expectedValue, key, etag, options); - DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder() - .setData(getAny(expectedValue)) + DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder() + .setData(getBytes(expectedValue)) .setEtag(etag) .build(); State keyRequest = buildStateKey(null, key, etag, options); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class))) + when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) .thenReturn(settableFuture); Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); settableFuture.set(responseEnvelope); @@ -596,7 +596,7 @@ public class DaprClientGrpcTest { @Test(expected = RuntimeException.class) public void deleteStateExceptionThrowTest() { - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))).thenThrow(RuntimeException.class); + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))).thenThrow(RuntimeException.class); State key = buildStateKey(null, "Key1", "ETag1", null); Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); result.block(); @@ -609,7 +609,7 @@ public class DaprClientGrpcTest { MockCallback callback = new MockCallback(ex); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State key = buildStateKey(null, "Key1", "ETag1", null); Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); @@ -624,7 +624,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, null); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -643,7 +643,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -662,7 +662,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenAnswer(c -> { settableFuture.set(Empty.newBuilder().build()); return settableFuture; @@ -683,7 +683,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -702,7 +702,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -721,7 +721,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -740,7 +740,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -759,7 +759,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -778,7 +778,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), @@ -793,7 +793,7 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenThrow(RuntimeException.class); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenThrow(RuntimeException.class); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); result.block(); } @@ -807,7 +807,7 @@ public class DaprClientGrpcTest { RuntimeException ex = new RuntimeException("An Exception"); MockCallback callback = new MockCallback<>(ex); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); settableFuture.setException(ex); result.block(); @@ -821,7 +821,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -836,7 +836,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); @@ -853,7 +853,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenAnswer(c -> { + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenAnswer(c -> { settableFuture.set(Empty.newBuilder().build()); return settableFuture; }); @@ -872,7 +872,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); @@ -889,7 +889,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null, Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); @@ -906,7 +906,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null, null, null); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); @@ -923,7 +923,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null, 1, StateOptions.RetryPolicy.Pattern.LINEAR); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); @@ -940,7 +940,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); @@ -957,7 +957,7 @@ public class DaprClientGrpcTest { SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), 1, null); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); @@ -998,10 +998,10 @@ public class DaprClientGrpcTest { String key2 = "key2"; String expectedValue2 = "Expected state 2"; State expectedState1 = buildStateKey(expectedValue1, key1, etag, null); - Map> futuresMap = new HashMap<>(); + Map> futuresMap = new HashMap<>(); futuresMap.put(key1, buildFutureGetStateEnvelop(expectedValue1, etag)); futuresMap.put(key2, buildFutureGetStateEnvelop(expectedValue2, etag)); - when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key1)))).thenReturn(futuresMap.get(key1)); + when(client.getState(argThat(new GetStateRequestKeyMatcher(key1)))).thenReturn(futuresMap.get(key1)); State keyRequest1 = buildStateKey(null, key1, etag, null); Mono> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class); assertEquals(expectedState1, resultGet1.block()); @@ -1011,7 +1011,7 @@ public class DaprClientGrpcTest { SettableFuture settableFutureDelete = SettableFuture.create(); MockCallback callbackDelete = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFutureDelete, callbackDelete, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))) + when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) .thenReturn(settableFutureDelete); Mono resultDelete = adapter.deleteState(STATE_STORE_NAME, keyRequest2.getKey(), keyRequest2.getEtag(), keyRequest2.getOptions()); @@ -1019,7 +1019,7 @@ public class DaprClientGrpcTest { resultDelete.block(); assertTrue(callbackDelete.wasCalled); futuresMap.replace(key2, null); - when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key2)))).thenReturn(futuresMap.get(key2)); + when(client.getState(argThat(new GetStateRequestKeyMatcher(key2)))).thenReturn(futuresMap.get(key2)); State state2 = resultGet2.block(); assertNull(state2); @@ -1029,15 +1029,15 @@ public class DaprClientGrpcTest { public void getSecrets() { String expectedKey = "attributeKey"; String expectedValue = "Expected secret value"; - DaprProtos.GetSecretResponseEnvelope responseEnvelope = buildGetSecretResponseEnvelope(expectedKey, expectedValue); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); + DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(expectedKey, expectedValue); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); settableFuture.set(responseEnvelope); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class))) + when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0); + io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); assertEquals("key", req.getKey()); assertEquals(SECRET_STORE_NAME, req.getStoreName()); assertEquals(0, req.getMetadataCount()); @@ -1052,15 +1052,15 @@ public class DaprClientGrpcTest { @Test public void getSecretsEmptyResponse() { - DaprProtos.GetSecretResponseEnvelope responseEnvelope = buildGetSecretResponseEnvelope(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); + DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); settableFuture.set(responseEnvelope); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class))) + when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0); + io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); assertEquals("key", req.getKey()); assertEquals(SECRET_STORE_NAME, req.getStoreName()); assertEquals(0, req.getMetadataCount()); @@ -1074,14 +1074,14 @@ public class DaprClientGrpcTest { @Test public void getSecretsException() { - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(new RuntimeException()); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(new RuntimeException()); addCallback(settableFuture, callback, directExecutor()); settableFuture.setException(new RuntimeException()); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class))) + when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0); + io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); assertEquals("key", req.getKey()); assertEquals(SECRET_STORE_NAME, req.getStoreName()); assertEquals(0, req.getMetadataCount()); @@ -1097,15 +1097,15 @@ public class DaprClientGrpcTest { public void getSecretsWithMetadata() { String expectedKey = "attributeKey"; String expectedValue = "Expected secret value"; - DaprProtos.GetSecretResponseEnvelope responseEnvelope = buildGetSecretResponseEnvelope(expectedKey, expectedValue); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); + DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(expectedKey, expectedValue); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); settableFuture.set(responseEnvelope); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class))) + when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0); + io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); assertEquals("key", req.getKey()); assertEquals(SECRET_STORE_NAME, req.getStoreName()); assertEquals("metavalue", req.getMetadataMap().get("metakey")); @@ -1121,31 +1121,100 @@ public class DaprClientGrpcTest { assertEquals(expectedValue, result.get(expectedKey)); } - private SettableFuture buildFutureGetStateEnvelop(T value, String etag) throws IOException { - DaprProtos.GetStateResponseEnvelope envelope = buildGetStateResponseEnvelope(value, etag); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(envelope); + /* If this test is failing, it means that a new value was added to StateOptions.Consistency + * enum, without creating a mapping to one of the proto defined gRPC enums + */ + @Test + public void stateOptionsConsistencyValuesHaveValidGrpcEnumMappings() { + String key = "key1"; + String etag = "ETag1"; + String value = "State value"; + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + settableFuture.set(Empty.newBuilder().build()); + for (StateOptions.Consistency consistency : StateOptions.Consistency.values()) { + StateOptions options = buildStateOptions(consistency, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + result.block(); + } + + assertTrue(callback.wasCalled); + } + + /* If this test is failing, it means that a new value was added to StateOptions.Concurrency + * enum, without creating a mapping to one of the proto defined gRPC enums + */ + @Test + public void stateOptionsConcurrencyValuesHaveValidGrpcEnumMappings() { + String key = "key1"; + String etag = "ETag1"; + String value = "State value"; + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + settableFuture.set(Empty.newBuilder().build()); + for (StateOptions.Concurrency concurrency : StateOptions.Concurrency.values()) { + StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL, concurrency, + Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + result.block(); + } + + assertTrue(callback.wasCalled); + } + + /* If this test is failing, it means that a new value was added to StateOptions.RetryPolicy.Pattern + * enum, without creating a mapping to one of the proto defined gRPC enums + */ + @Test + public void stateOptionsRetryPatternValuesHaveValidGrpcEnumMappings() { + String key = "key1"; + String etag = "ETag1"; + String value = "State value"; + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + settableFuture.set(Empty.newBuilder().build()); + for (StateOptions.RetryPolicy.Pattern retryPattern : StateOptions.RetryPolicy.Pattern.values()) { + StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL, + StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), null, retryPattern); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + result.block(); + } + + assertTrue(callback.wasCalled); + } + + private SettableFuture buildFutureGetStateEnvelop(T value, String etag) throws IOException { + DaprProtos.GetStateResponse envelope = buildGetStateResponse(value, etag); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(envelope); addCallback(settableFuture, callback, directExecutor()); settableFuture.set(envelope); return settableFuture; } - private DaprProtos.GetStateResponseEnvelope buildGetStateResponseEnvelope(T value, String etag) throws IOException { - return DaprProtos.GetStateResponseEnvelope.newBuilder() - .setData(getAny(value)) + private DaprProtos.GetStateResponse buildGetStateResponse(T value, String etag) throws IOException { + return DaprProtos.GetStateResponse.newBuilder() + .setData(getBytes(value)) .setEtag(etag) .build(); } - private DaprProtos.GetSecretResponseEnvelope buildGetSecretResponseEnvelope(String key, String value) { - return DaprProtos.GetSecretResponseEnvelope.newBuilder() + private DaprProtos.GetSecretResponse buildGetSecretResponse(String key, String value) { + return DaprProtos.GetSecretResponse.newBuilder() .putAllData(Collections.singletonMap(key, value)) .build(); } - private DaprProtos.GetSecretResponseEnvelope buildGetSecretResponseEnvelope() { - return DaprProtos.GetSecretResponseEnvelope.newBuilder().build(); + private DaprProtos.GetSecretResponse buildGetSecretResponse() { + return DaprProtos.GetSecretResponse.newBuilder().build(); } private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency, @@ -1168,6 +1237,11 @@ public class DaprClientGrpcTest { return Any.newBuilder().setValue(ByteString.copyFrom(byteValue)).build(); } + private ByteString getBytes(T value) throws IOException { + byte[] byteValue = serializer.serialize(value); + return ByteString.copyFrom(byteValue); + } + private final class MockCallback implements FutureCallback { private T value = null; private Throwable failure = null; @@ -1248,16 +1322,16 @@ public class DaprClientGrpcTest { } } - private static class GetStateEnvelopeKeyMatcher implements ArgumentMatcher { + private static class GetStateRequestKeyMatcher implements ArgumentMatcher { private final String propValue; - GetStateEnvelopeKeyMatcher(String propValue) { + GetStateRequestKeyMatcher(String propValue) { this.propValue = propValue; } @Override - public boolean matches(DaprProtos.GetStateEnvelope argument) { + public boolean matches(DaprProtos.GetStateRequest argument) { if (argument == null) { return false; }