SDK update for breaking changes in dapr core. (#287)

* Update proto files for 0.8.0 release (#283)

* Bump dapr core to master

* Update integration tests for 0.8.0 proto changes

* Added unit tests to ensure StateOptions enums always map to their equivalent gRPC enums

* Remove some uneed comments/imports

* Update pub/sub subscription to match new route/metdata format (#278)

* Automatically initialize actor on first invocation (#284)

* Update integration tests to publish to more than one pubsub topic/route

* Bump dapr CLI version

* Remove uneeded interface and exception

Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Charlie Stanley 2020-05-28 18:42:01 -07:00 committed by GitHub
parent e0f3f3caaa
commit 8362c6347b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 479 additions and 239 deletions

View File

@ -22,10 +22,10 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: 13.0.x
DAPR_RUNTIME_VER: 0.6.0
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/f84566fb2bf5a599252ab9d6bd82fc78faf94dba/install/install.sh
DAPR_CLI_REF: 2e607cb6de0bd69770b244020eecd668fcded19b
DAPR_REF: 7d6e111a181d9d3fdd7537e5c7f6c02764064846
DAPR_RUNTIME_VER: 0.7.1
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/98f8818260d941c2662908f686395f8480dabd64/install/install.sh
DAPR_CLI_REF: 98f8818260d941c2662908f686395f8480dabd64
DAPR_REF: 610b92568b1add897ba3e6938a711c0821833966
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}

View File

@ -6,9 +6,9 @@
package io.dapr.examples.invoke.grpc;
import com.google.protobuf.Any;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.dapr.v1.DaprClientGrpc;
import io.dapr.v1.DaprClientProtos;
import io.dapr.v1.DaprAppCallbackProtos;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
@ -40,7 +40,7 @@ public class HelloWorldService {
/**
* Server mode: class that encapsulates all server-side logic for Grpc.
*/
private static class GrpcHelloWorldDaprService extends DaprClientGrpc.DaprClientImplBase {
private static class GrpcHelloWorldDaprService extends AppCallbackGrpc.AppCallbackImplBase {
/**
* Format to output date and time.

View File

@ -7,7 +7,6 @@ package io.dapr.examples.pubsub.http;
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import io.dapr.serializer.DefaultObjectSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

View File

@ -17,7 +17,7 @@
<grpc.version>1.25.0</grpc.version>
<protobuf.version>3.11.0</protobuf.version>
<protoc.version>3.10.0</protoc.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/89aab21ee86d2f65066bbe530809ee73ac75e921/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/610b92568b1add897ba3e6938a711c0821833966/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

View File

@ -58,8 +58,13 @@ class ActorManager<T extends AbstractActor> {
* @return Asynchronous void response.
*/
Mono<Void> activateActor(ActorId actorId) {
return Mono.fromSupplier(() -> this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId))
.flatMap(actor -> actor.onActivateInternal().then(this.onActivatedActor(actorId, actor)));
return Mono.fromSupplier(() -> {
if (this.activeActors.containsKey(actorId)) {
return null;
}
return this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId);
}).flatMap(actor -> actor.onActivateInternal().then(this.onActivatedActor(actorId, actor)));
}
/**

View File

@ -198,18 +198,6 @@ public class ActorRuntime {
this.config.addRegisteredActorType(actorTypeInfo.getName());
}
/**
* Activates an actor for an actor type with given actor id.
*
* @param actorTypeName Actor type name to activate the actor for.
* @param actorId Actor id for the actor to be activated.
* @return Async void task.
*/
public Mono<Void> activate(String actorTypeName, String actorId) {
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(new ActorId(actorId)));
}
/**
* Deactivates an actor for an actor type with given actor id.
*
@ -233,8 +221,10 @@ public class ActorRuntime {
* @return Response for the actor method.
*/
public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMethodName, byte[] payload) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.invokeMethod(new ActorId(actorId), actorMethodName, payload));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeMethod(id, actorMethodName, payload));
}
/**
@ -247,8 +237,10 @@ public class ActorRuntime {
* @return Async void task.
*/
public Mono<Void> invokeReminder(String actorTypeName, String actorId, String reminderName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.invokeReminder(new ActorId(actorId), reminderName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeReminder(new ActorId(actorId), reminderName, params));
}
/**
@ -260,8 +252,10 @@ public class ActorRuntime {
* @return Async void task.
*/
public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.invokeTimer(new ActorId(actorId), timerName));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName));
}
/**

View File

@ -25,18 +25,60 @@ public class ActorRuntimeTest {
public interface MyActor {
String say();
int count();
}
@ActorType(name = ACTOR_NAME)
public static class MyActorImpl extends AbstractActor implements MyActor {
private int count = 0;
private Boolean activated;
public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
}
public Mono<Void> onActivate() {
return Mono.fromRunnable(() -> {
if (this.activated != null) {
throw new IllegalStateException("already activated once");
}
this.activated = true;
});
}
public Mono<Void> onDeactivate() {
return Mono.fromRunnable(() -> {
if (this.activated == null) {
throw new IllegalStateException("never activated");
}
if (this.activated == false) {
throw new IllegalStateException("already deactivated");
}
if (this.count == 0) {
throw new IllegalStateException("test expects a call before deactivate");
}
this.activated = false;
});
}
public String say() {
if (!this.activated) {
throw new IllegalStateException("not activated");
}
this.count++;
return "Nothing to say.";
}
public int count() {
return this.count;
}
}
private static final ActorObjectSerializer ACTOR_STATE_SERIALIZER = new ActorObjectSerializer();
@ -71,32 +113,17 @@ public class ActorRuntimeTest {
Assert.assertTrue(new String(this.runtime.serializeConfig()).contains(ACTOR_NAME));
}
@Test
public void activateActor() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class);
this.runtime.activate(ACTOR_NAME, actorId).block();
}
@Test
public void invokeActor() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class);
this.runtime.activate(ACTOR_NAME, actorId).block();
byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block();
String message = ACTOR_STATE_SERIALIZER.deserialize(response, String.class);
Assert.assertEquals("Nothing to say.", message);
}
@Test
public void activateThendeactivateActor() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class);
this.runtime.activate(ACTOR_NAME, actorId).block();
this.runtime.deactivate(ACTOR_NAME, actorId).block();
}
@Test
public void deactivateActor() throws Exception {
String actorId = UUID.randomUUID().toString();
@ -104,30 +131,16 @@ public class ActorRuntimeTest {
this.runtime.deactivate(ACTOR_NAME, actorId).block();
}
@Test
public void lazyActivate() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class);
this.runtime.activate(ACTOR_NAME, actorId).block();
this.runtime.invoke(ACTOR_NAME, actorId, "say", null)
.doOnError(e -> Assert.assertTrue(e.getMessage().contains("Could not find actor")))
.doOnSuccess(s -> Assert.fail())
.onErrorReturn("".getBytes())
.block();
}
@Test
public void lazyDeactivate() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class);
this.runtime.activate(ACTOR_NAME, actorId).block();
Mono<Void> deacticateCall = this.runtime.deactivate(ACTOR_NAME, actorId);
Mono<Void> deactivateCall = this.runtime.deactivate(ACTOR_NAME, actorId);
this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block();
deacticateCall.block();
deactivateCall.block();
this.runtime.invoke(ACTOR_NAME, actorId, "say", null)
.doOnError(e -> Assert.assertTrue(e.getMessage().contains("Could not find actor")))
@ -143,9 +156,15 @@ public class ActorRuntimeTest {
Mono<byte[]> invokeCall = this.runtime.invoke(ACTOR_NAME, actorId, "say", null);
this.runtime.activate(ACTOR_NAME, actorId).block();
byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block();
int count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class);
Assert.assertEquals(0, count);
invokeCall.block();
response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block();
count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class);
Assert.assertEquals(1, count);
}
}

View File

@ -88,7 +88,7 @@
<goal>wget</goal>
</goals>
<configuration>
<url>${dapr.proto.baseurl}/dapr/v1/dapr.proto</url>
<url>${dapr.proto.baseurl}/runtime/v1/dapr.proto</url>
<outputFileName>dapr.proto</outputFileName>
<!-- default target location, just to demonstrate the parameter -->
<outputDirectory>${protobuf.input.directory}</outputDirectory>
@ -102,8 +102,8 @@
<goal>wget</goal>
</goals>
<configuration>
<url>${dapr.proto.baseurl}/daprclient/v1/daprclient.proto</url>
<outputFileName>daprclient.proto</outputFileName>
<url>${dapr.proto.baseurl}/runtime/v1/appcallback.proto</url>
<outputFileName>appcallback.proto</outputFileName>
<!-- default target location, just to demonstrate the parameter -->
<outputDirectory>${protobuf.input.directory}</outputDirectory>
</configuration>

View File

@ -5,12 +5,21 @@
package io.dapr.springboot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Topic;
import io.dapr.client.ObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* Handles Dapr annotations in Springboot Controllers.
@ -18,6 +27,8 @@ import java.lang.reflect.Method;
@Component
public class DaprBeanPostProcessor implements BeanPostProcessor {
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* {@inheritDoc}
*/
@ -56,9 +67,23 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
continue;
}
String route = topic.name();
PostMapping mapping = method.getAnnotation(PostMapping.class);
if (mapping != null && mapping.path() != null && mapping.path().length >= 1) {
route = mapping.path()[0];
}
String topicName = topic.name();
if ((topicName != null) && (topicName.length() > 0)) {
DaprRuntime.getInstance().addSubscribedTopic(topicName);
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
DaprRuntime.getInstance().addSubscribedTopic(topicName, route, metadata);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
}
}
}
}

View File

@ -56,18 +56,6 @@ public class DaprController {
return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}
/**
* Handles API to activate an actor.
* @param type Actor type.
* @param id Actor Id.
* @return Void.
*/
@PostMapping(path = "/actors/{type}/{id}")
public Mono<Void> activateActor(@PathVariable("type") String type,
@PathVariable("id") String id) {
return ActorRuntime.getInstance().activate(type, id);
}
/**
* Handles API to deactivate an actor.
* @param type Actor type.

View File

@ -5,7 +5,10 @@
package io.dapr.springboot;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -23,6 +26,11 @@ class DaprRuntime {
*/
private final Set<String> subscribedTopics = new HashSet<>();
/**
* List of subscriptions.
*/
private final List<DaprTopicSubscription> subscriptions = new ArrayList<>();
/**
* Private constructor to make this singleton.
*/
@ -50,14 +58,17 @@ class DaprRuntime {
* Adds a topic to the list of subscribed topics.
*
* @param topicName Name of the topic being subscribed to.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String topicName) {
public synchronized void addSubscribedTopic(String topicName, String route, Map<String,String> metadata) {
if (!this.subscribedTopics.contains(topicName)) {
this.subscribedTopics.add(topicName);
this.subscriptions.add(new DaprTopicSubscription(topicName, route, metadata));
}
}
public synchronized String[] listSubscribedTopics() {
return this.subscribedTopics.toArray(new String[0]);
public synchronized DaprTopicSubscription[] listSubscribedTopics() {
return this.subscriptions.toArray(new DaprTopicSubscription[0]);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.springboot;
import java.util.Collections;
import java.util.Map;
/**
* Class to represent a subscription topic along with its metadata.
*/
public class DaprTopicSubscription {
private final String topic;
private final String route;
private final Map<String, String> metadata;
/**
* Create a subscription topic.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality.
*/
public DaprTopicSubscription(String topic, String route, Map<String, String> metadata) {
this.topic = topic;
this.route = route;
this.metadata = Collections.unmodifiableMap(metadata);
}
public String getTopic() {
return topic;
}
public String getRoute() {
return route;
}
public Map<String, String> getMetadata() {
return metadata;
}
}

View File

@ -31,6 +31,7 @@ public class PubSubIT extends BaseIT {
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
private static final String ANOTHER_TOPIC_NAME = "anothertopic";
/**
* Parameters for this test.
@ -62,12 +63,21 @@ public class PubSubIT extends BaseIT {
daprRun.switchToHTTP();
}
// Send a batch of messages on one topic
DaprClient client = new DaprClientBuilder().build();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME);
//Publishing messages
client.publishEvent(TOPIC_NAME, message).block();
System.out.println("Published message: " + message);
System.out.println(String.format("Published message: '%s' to topic '%s'", message, TOPIC_NAME));
}
// Send a batch of different messages on the other.
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME);
//Publishing messages
client.publishEvent(ANOTHER_TOPIC_NAME, message).block();
System.out.println(String.format("Published message: '%s' to topic '%s'", message, ANOTHER_TOPIC_NAME));
}
//Publishing a single byte: Example of non-string based content published
@ -80,12 +90,12 @@ public class PubSubIT extends BaseIT {
Thread.sleep(3000);
callWithRetry(() -> {
System.out.println("Checking results ...");
final List<String> messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages", null, List.class).block();
System.out.println("Checking results for topic " + TOPIC_NAME);
final List<String> messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages/testingtopic", null, List.class).block();
assertEquals(11, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.contains(String.format("This is message #%d", i)));
assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME)));
}
boolean foundByte = false;
@ -97,6 +107,16 @@ public class PubSubIT extends BaseIT {
assertTrue(foundByte);
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME);
final List<String> messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages/anothertopic", null, List.class).block();
assertEquals(10, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME)));
}
}, 2000);
}
}

View File

@ -21,15 +21,21 @@ import java.util.Map;
@RestController
public class SubscriberController {
private static final List<String> messagesReceived = new ArrayList();
private static final List<String> messagesReceivedTestingTopic = new ArrayList();
private static final List<String> messagesReceivedAnotherTopic = new ArrayList();
@GetMapping(path = "/messages")
public List<String> getMessages() {
return messagesReceived;
@GetMapping(path = "/messages/testingtopic")
public List<String> getMessagesReceivedTestingTopic() {
return messagesReceivedTestingTopic;
}
@GetMapping(path = "/messages/anothertopic")
public List<String> getMessagesReceivedAnotherTopic() {
return messagesReceivedAnotherTopic;
}
@Topic(name = "testingtopic")
@PostMapping(path = "/testingtopic")
@PostMapping(path = "/route1")
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
return Mono.fromRunnable(() -> {
@ -39,7 +45,25 @@ public class SubscriberController {
String message = envelope.getData() == null ? "" : envelope.getData();
System.out.println("Subscriber got message: " + message);
messagesReceived.add(envelope.getData());
messagesReceivedTestingTopic.add(envelope.getData());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "anothertopic")
@PostMapping(path = "/route2")
public Mono<Void> handleMessageAnotherTopic(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
return Mono.fromRunnable(() -> {
try {
// Dapr's event is compliant to CloudEvent.
CloudEvent envelope = CloudEvent.deserialize(body);
String message = envelope.getData() == null ? "" : envelope.getData();
System.out.println("Subscriber got message: " + message);
messagesReceivedAnotherTopic.add(envelope.getData());
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -31,20 +31,20 @@ public class HelloWorldClientIT extends BaseIT {
String key = "mykey";
{
DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope
DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest
.newBuilder()
.setStoreName(STATE_STORE_NAME)
.setKey(key)
.build();
DaprProtos.GetStateResponseEnvelope response = client.getState(req);
String value = response.getData().getValue().toStringUtf8();
DaprProtos.GetStateResponse response = client.getState(req);
String value = response.getData().toStringUtf8();
System.out.println("Got: " + value);
Assert.assertEquals("Hello World", value);
}
// Then, delete it.
{
DaprProtos.DeleteStateEnvelope req = DaprProtos.DeleteStateEnvelope
DaprProtos.DeleteStateRequest req = DaprProtos.DeleteStateRequest
.newBuilder()
.setStoreName(STATE_STORE_NAME)
.setKey(key)
@ -54,13 +54,13 @@ public class HelloWorldClientIT extends BaseIT {
}
{
DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope
DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest
.newBuilder()
.setStoreName(STATE_STORE_NAME)
.setKey(key)
.build();
DaprProtos.GetStateResponseEnvelope response = client.getState(req);
String value = response.getData().getValue().toStringUtf8();
DaprProtos.GetStateResponse response = client.getState(req);
String value = response.getData().toStringUtf8();
System.out.println("Got: " + value);
Assert.assertEquals("", value);
}

View File

@ -9,8 +9,8 @@ import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprGrpc.DaprBlockingStub;
import io.dapr.v1.DaprProtos.SaveStateEnvelope;
import io.dapr.v1.DaprProtos.StateRequest;
import io.dapr.v1.DaprProtos.SaveStateRequest;
import io.dapr.v1.CommonProtos.StateItem;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@ -37,14 +37,14 @@ public class HelloWorldGrpcStateService {
// First, write key-value pair.
String value = "Hello World";
StateRequest req = StateRequest
StateItem req = StateItem
.newBuilder()
.setKey(key)
.setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build())
.setValue(ByteString.copyFromUtf8(value))
.build();
SaveStateEnvelope state = SaveStateEnvelope.newBuilder()
SaveStateRequest state = SaveStateRequest.newBuilder()
.setStoreName("statestore")
.addRequests(req)
.addStates(req)
.build();
client.saveState(state);
System.out.println("Saved!");

View File

@ -18,8 +18,16 @@ public @interface Topic {
/**
* Name of topic to be subscribed to.
*
* @return Topic's name.
*/
String name();
/**
* Metadata in the form of a json object.
* {
* "mykey": "myvalue"
* }
* @return metadata object
*/
String metadata() default "{}";
}

View File

@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static io.dapr.client.domain.StateOptions.RetryPolicy;
/**
* An adapter for the GRPC Client.
*
@ -66,6 +68,39 @@ public class DaprClientGrpc implements DaprClient {
this.stateSerializer = stateSerializer;
}
private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
switch (options.getConsistency()) {
case EVENTUAL:
return CommonProtos.StateOptions.StateConsistency.CONSISTENCY_EVENTUAL;
case STRONG:
return CommonProtos.StateOptions.StateConsistency.CONSISTENCY_STRONG;
default:
throw new IllegalArgumentException("Missing Consistency mapping to gRPC Consistency enum");
}
}
private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(StateOptions options) {
switch (options.getConcurrency()) {
case FIRST_WRITE:
return CommonProtos.StateOptions.StateConcurrency.CONCURRENCY_FIRST_WRITE;
case LAST_WRITE:
return CommonProtos.StateOptions.StateConcurrency.CONCURRENCY_LAST_WRITE;
default:
throw new IllegalArgumentException("Missing StateConcurrency mapping to gRPC Concurrency enum");
}
}
private CommonProtos.StateRetryPolicy.RetryPattern getGrpcStateRetryPolicy(RetryPolicy policy) {
switch (policy.getPattern()) {
case LINEAR:
return CommonProtos.StateRetryPolicy.RetryPattern.RETRY_LINEAR;
case EXPONENTIAL:
return CommonProtos.StateRetryPolicy.RetryPattern.RETRY_EXPONENTIAL;
default:
throw new IllegalArgumentException("Missing RetryPattern mapping to gRPC retry pattern enum");
}
}
/**
* {@inheritDoc}
*/
@ -80,12 +115,9 @@ public class DaprClientGrpc implements DaprClient {
@Override
public Mono<Void> publishEvent(String topic, Object event, Map<String, String> metadata) {
try {
byte[] byteEvent = objectSerializer.serialize(event);
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteEvent)).build();
// TODO: handle metadata.
DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.newBuilder()
.setTopic(topic).setData(data).build();
DaprProtos.PublishEventRequest envelope = DaprProtos.PublishEventRequest.newBuilder()
.setTopic(topic).setData(ByteString.copyFrom(objectSerializer.serialize(event))).build();
return Mono.fromCallable(() -> {
ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);
@ -188,16 +220,15 @@ public class DaprClientGrpc implements DaprClient {
public Mono<Void> invokeBinding(String name, Object request, Map<String, String> metadata) {
try {
byte[] byteRequest = objectSerializer.serialize(request);
DaprProtos.InvokeBindingEnvelope.Builder builder = DaprProtos.InvokeBindingEnvelope.newBuilder()
DaprProtos.InvokeBindingRequest.Builder builder = DaprProtos.InvokeBindingRequest.newBuilder()
.setName(name);
if (byteRequest != null) {
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build();
builder.setData(data);
builder.setData(ByteString.copyFrom(byteRequest));
}
if (metadata != null) {
builder.putAllMetadata(metadata);
}
DaprProtos.InvokeBindingEnvelope envelope = builder.build();
DaprProtos.InvokeBindingRequest envelope = builder.build();
return Mono.fromCallable(() -> {
ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope);
futureEmpty.get();
@ -237,17 +268,17 @@ public class DaprClientGrpc implements DaprClient {
if ((key == null) || (key.trim().isEmpty())) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
DaprProtos.GetStateEnvelope.Builder builder = DaprProtos.GetStateEnvelope.newBuilder()
DaprProtos.GetStateRequest.Builder builder = DaprProtos.GetStateRequest.newBuilder()
.setStoreName(stateStoreName)
.setKey(key);
if (options != null && options.getConsistency() != null) {
builder.setConsistency(options.getConsistency().getValue());
builder.setConsistency(getGrpcStateConsistency(options));
}
DaprProtos.GetStateEnvelope envelope = builder.build();
DaprProtos.GetStateRequest envelope = builder.build();
return Mono.fromCallable(() -> {
ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope);
DaprProtos.GetStateResponseEnvelope response = null;
ListenableFuture<DaprProtos.GetStateResponse> futureResponse = client.getState(envelope);
DaprProtos.GetStateResponse response = null;
try {
response = futureResponse.get();
} catch (NullPointerException npe) {
@ -261,11 +292,11 @@ public class DaprClientGrpc implements DaprClient {
}
private <T> State<T> buildStateKeyValue(
DaprProtos.GetStateResponseEnvelope response,
DaprProtos.GetStateResponse response,
String requestedKey,
StateOptions stateOptions,
Class<T> clazz) throws IOException {
ByteString payload = response.getData().getValue();
ByteString payload = response.getData();
byte[] data = payload == null ? null : payload.toByteArray();
T value = stateSerializer.deserialize(data, clazz);
String etag = response.getEtag();
@ -282,14 +313,14 @@ public class DaprClientGrpc implements DaprClient {
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder();
DaprProtos.SaveStateRequest.Builder builder = DaprProtos.SaveStateRequest.newBuilder();
builder.setStoreName(stateStoreName);
for (State state : states) {
builder.addRequests(buildStateRequest(state).build());
builder.addStates(buildStateRequest(state).build());
}
DaprProtos.SaveStateEnvelope envelope = builder.build();
DaprProtos.SaveStateRequest request = builder.build();
return Mono.fromCallable(() -> client.saveState(envelope)).flatMap(f -> {
return Mono.fromCallable(() -> client.saveState(request)).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
@ -302,10 +333,10 @@ public class DaprClientGrpc implements DaprClient {
}
}
private <T> DaprProtos.StateRequest.Builder buildStateRequest(State<T> state) throws IOException {
private <T> CommonProtos.StateItem.Builder buildStateRequest(State<T> state) throws IOException {
byte[] bytes = stateSerializer.serialize(state.getValue());
Any data = Any.newBuilder().setValue(ByteString.copyFrom(bytes)).build();
DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder();
ByteString data = ByteString.copyFrom(bytes);
CommonProtos.StateItem.Builder stateBuilder = CommonProtos.StateItem.newBuilder();
if (state.getEtag() != null) {
stateBuilder.setEtag(state.getEtag());
}
@ -313,13 +344,13 @@ public class DaprClientGrpc implements DaprClient {
stateBuilder.setValue(data);
}
stateBuilder.setKey(state.getKey());
DaprProtos.StateOptions.Builder optionBuilder = null;
CommonProtos.StateOptions.Builder optionBuilder = null;
if (state.getOptions() != null) {
StateOptions options = state.getOptions();
DaprProtos.RetryPolicy.Builder retryPolicyBuilder = null;
CommonProtos.StateRetryPolicy.Builder retryPolicyBuilder = null;
if (options.getRetryPolicy() != null) {
retryPolicyBuilder = DaprProtos.RetryPolicy.newBuilder();
StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy();
retryPolicyBuilder = CommonProtos.StateRetryPolicy.newBuilder();
RetryPolicy retryPolicy = options.getRetryPolicy();
if (options.getRetryPolicy().getInterval() != null) {
Duration.Builder durationBuilder = Duration.newBuilder()
.setNanos(retryPolicy.getInterval().getNano())
@ -330,16 +361,16 @@ public class DaprClientGrpc implements DaprClient {
retryPolicyBuilder.setThreshold(retryPolicy.getThreshold());
}
if (retryPolicy.getPattern() != null) {
retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue());
retryPolicyBuilder.setPattern(getGrpcStateRetryPolicy(retryPolicy));
}
}
optionBuilder = DaprProtos.StateOptions.newBuilder();
optionBuilder = CommonProtos.StateOptions.newBuilder();
if (options.getConcurrency() != null) {
optionBuilder.setConcurrency(options.getConcurrency().getValue());
optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
}
if (options.getConsistency() != null) {
optionBuilder.setConsistency(options.getConsistency().getValue());
optionBuilder.setConsistency(getGrpcStateConsistency(options));
}
if (retryPolicyBuilder != null) {
optionBuilder.setRetryPolicy(retryPolicyBuilder.build());
@ -389,13 +420,13 @@ public class DaprClientGrpc implements DaprClient {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
DaprProtos.StateOptions.Builder optionBuilder = null;
CommonProtos.StateOptions.Builder optionBuilder = null;
if (options != null) {
optionBuilder = DaprProtos.StateOptions.newBuilder();
DaprProtos.RetryPolicy.Builder retryPolicyBuilder = null;
optionBuilder = CommonProtos.StateOptions.newBuilder();
CommonProtos.StateRetryPolicy.Builder retryPolicyBuilder = null;
if (options.getRetryPolicy() != null) {
retryPolicyBuilder = DaprProtos.RetryPolicy.newBuilder();
StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy();
retryPolicyBuilder = CommonProtos.StateRetryPolicy.newBuilder();
RetryPolicy retryPolicy = options.getRetryPolicy();
if (options.getRetryPolicy().getInterval() != null) {
Duration.Builder durationBuilder = Duration.newBuilder()
.setNanos(retryPolicy.getInterval().getNano())
@ -406,22 +437,22 @@ public class DaprClientGrpc implements DaprClient {
retryPolicyBuilder.setThreshold(retryPolicy.getThreshold());
}
if (retryPolicy.getPattern() != null) {
retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue());
retryPolicyBuilder.setPattern(getGrpcStateRetryPolicy(retryPolicy));
}
}
optionBuilder = DaprProtos.StateOptions.newBuilder();
optionBuilder = CommonProtos.StateOptions.newBuilder();
if (options.getConcurrency() != null) {
optionBuilder.setConcurrency(options.getConcurrency().getValue());
optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
}
if (options.getConsistency() != null) {
optionBuilder.setConsistency(options.getConsistency().getValue());
optionBuilder.setConsistency(getGrpcStateConsistency(options));
}
if (retryPolicyBuilder != null) {
optionBuilder.setRetryPolicy(retryPolicyBuilder.build());
}
}
DaprProtos.DeleteStateEnvelope.Builder builder = DaprProtos.DeleteStateEnvelope.newBuilder()
DaprProtos.DeleteStateRequest.Builder builder = DaprProtos.DeleteStateRequest.newBuilder()
.setStoreName(stateStoreName)
.setKey(key);
if (etag != null) {
@ -432,8 +463,8 @@ public class DaprClientGrpc implements DaprClient {
builder.setOptions(optionBuilder.build());
}
DaprProtos.DeleteStateEnvelope envelope = builder.build();
return Mono.fromCallable(() -> client.deleteState(envelope)).flatMap(f -> {
DaprProtos.DeleteStateRequest request = builder.build();
return Mono.fromCallable(() -> client.deleteState(request)).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
@ -499,16 +530,16 @@ public class DaprClientGrpc implements DaprClient {
return Mono.error(e);
}
DaprProtos.GetSecretEnvelope.Builder envelopeBuilder = DaprProtos.GetSecretEnvelope.newBuilder()
DaprProtos.GetSecretRequest.Builder requestBuilder = DaprProtos.GetSecretRequest.newBuilder()
.setStoreName(secretStoreName)
.setKey(secretName);
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
requestBuilder.putAllMetadata(metadata);
}
return Mono.fromCallable(() -> {
DaprProtos.GetSecretEnvelope envelope = envelopeBuilder.build();
ListenableFuture<DaprProtos.GetSecretResponseEnvelope> future = client.getSecret(envelope);
DaprProtos.GetSecretRequest request = requestBuilder.build();
ListenableFuture<DaprProtos.GetSecretResponse> future = client.getSecret(request);
return future.get();
}).map(future -> future.getDataMap());
}

View File

@ -55,7 +55,7 @@ public class DaprClientGrpcTest {
@Test(expected = RuntimeException.class)
public void publishEventExceptionThrownTest() {
when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class)))
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenThrow(RuntimeException.class);
Mono<Void> result = adapter.publishEvent("topic", "object");
result.block();
@ -67,7 +67,7 @@ public class DaprClientGrpcTest {
RuntimeException ex = new RuntimeException("An Exception");
MockCallback<Empty> callback = new MockCallback<Empty>(ex);
addCallback(settableFuture, callback, directExecutor());
when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class)))
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.publishEvent("topic", "object");
settableFuture.setException(ex);
@ -79,7 +79,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class)))
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.publishEvent("topic", "object");
settableFuture.set(Empty.newBuilder().build());
@ -92,7 +92,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class)))
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
@ -107,7 +107,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class)))
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenReturn(settableFuture);
MyObject event = new MyObject(1, "Event");
Mono<Void> result = adapter.publishEvent("topic", event);
@ -132,7 +132,7 @@ public class DaprClientGrpcTest {
new MockCallback<Empty>(ex);
addCallback(settableFuture, callback, directExecutor());
settableFuture.setException(ex);
when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class)))
when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.invokeBinding("BindingName", "request");
result.block();
@ -143,7 +143,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class)))
when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.invokeBinding("BindingName", "request");
settableFuture.set(Empty.newBuilder().build());
@ -156,7 +156,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class)))
when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class)))
.thenReturn(settableFuture);
MyObject event = new MyObject(1, "Event");
Mono<Void> result = adapter.invokeBinding("BindingName", event);
@ -170,7 +170,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class)))
when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class)))
.thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
@ -488,7 +488,7 @@ public class DaprClientGrpcTest {
@Test(expected = RuntimeException.class)
public void getStateExceptionThrownTest() {
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class))).thenThrow(RuntimeException.class);
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))).thenThrow(RuntimeException.class);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, key, String.class);
result.block();
@ -496,12 +496,12 @@ public class DaprClientGrpcTest {
@Test(expected = RuntimeException.class)
public void getStateCallbackExceptionThrownTest() {
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
SettableFuture<DaprProtos.GetStateResponse> settableFuture = SettableFuture.create();
RuntimeException ex = new RuntimeException("An Exception");
MockCallback<DaprProtos.GetStateResponseEnvelope> callback =
MockCallback<DaprProtos.GetStateResponse> callback =
new MockCallback<>(ex);
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class)))
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class)))
.thenReturn(settableFuture);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, key, String.class);
@ -515,11 +515,11 @@ public class DaprClientGrpcTest {
String key = "key1";
String expectedValue = "Expected state";
State<String> expectedState = buildStateKey(expectedValue, key, etag, null);
DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
DaprProtos.GetStateResponse responseEnvelope = buildGetStateResponse(expectedValue, etag);
SettableFuture<DaprProtos.GetStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class)))
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class)))
.thenReturn(settableFuture);
State<String> keyRequest = buildStateKey(null, key, etag, null);
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class);
@ -533,11 +533,11 @@ public class DaprClientGrpcTest {
String key = "key1";
String expectedValue = "Expected state";
State<String> expectedState = buildStateKey(expectedValue, key, etag, null);
DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
DaprProtos.GetStateResponse responseEnvelope = buildGetStateResponse(expectedValue, etag);
SettableFuture<DaprProtos.GetStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class)))
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class)))
.thenAnswer(c -> {
settableFuture.set(responseEnvelope);
return settableFuture;
@ -556,15 +556,15 @@ public class DaprClientGrpcTest {
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
State<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder()
.setData(getAny(expectedValue))
DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder()
.setData(getBytes(expectedValue))
.setEtag(etag)
.build();
State<MyObject> keyRequest = buildStateKey(null, key, etag, options);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
SettableFuture<DaprProtos.GetStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class)))
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class)))
.thenReturn(settableFuture);
Mono<State<MyObject>> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class);
settableFuture.set(responseEnvelope);
@ -579,15 +579,15 @@ public class DaprClientGrpcTest {
StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
new StateOptions.RetryPolicy(Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR));
State<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder()
.setData(getAny(expectedValue))
DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder()
.setData(getBytes(expectedValue))
.setEtag(etag)
.build();
State<MyObject> keyRequest = buildStateKey(null, key, etag, options);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
SettableFuture<DaprProtos.GetStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateEnvelope.class)))
when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class)))
.thenReturn(settableFuture);
Mono<State<MyObject>> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class);
settableFuture.set(responseEnvelope);
@ -596,7 +596,7 @@ public class DaprClientGrpcTest {
@Test(expected = RuntimeException.class)
public void deleteStateExceptionThrowTest() {
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class))).thenThrow(RuntimeException.class);
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))).thenThrow(RuntimeException.class);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions());
result.block();
@ -609,7 +609,7 @@ public class DaprClientGrpcTest {
MockCallback<Empty> callback =
new MockCallback<Empty>(ex);
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions());
@ -624,7 +624,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, null);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -643,7 +643,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -662,7 +662,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
@ -683,7 +683,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -702,7 +702,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -721,7 +721,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -740,7 +740,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -759,7 +759,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -778,7 +778,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
@ -793,7 +793,7 @@ public class DaprClientGrpcTest {
String key = "key1";
String etag = "ETag1";
String value = "State value";
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenThrow(RuntimeException.class);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenThrow(RuntimeException.class);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null);
result.block();
}
@ -807,7 +807,7 @@ public class DaprClientGrpcTest {
RuntimeException ex = new RuntimeException("An Exception");
MockCallback<Empty> callback = new MockCallback<>(ex);
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null);
settableFuture.setException(ex);
result.block();
@ -821,7 +821,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -836,7 +836,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
@ -853,7 +853,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenAnswer(c -> {
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
});
@ -872,7 +872,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
@ -889,7 +889,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
@ -906,7 +906,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, null, null);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
@ -923,7 +923,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
@ -940,7 +940,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
@ -957,7 +957,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, null);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
@ -998,10 +998,10 @@ public class DaprClientGrpcTest {
String key2 = "key2";
String expectedValue2 = "Expected state 2";
State<String> expectedState1 = buildStateKey(expectedValue1, key1, etag, null);
Map<String, SettableFuture<DaprProtos.GetStateResponseEnvelope>> futuresMap = new HashMap<>();
Map<String, SettableFuture<DaprProtos.GetStateResponse>> futuresMap = new HashMap<>();
futuresMap.put(key1, buildFutureGetStateEnvelop(expectedValue1, etag));
futuresMap.put(key2, buildFutureGetStateEnvelop(expectedValue2, etag));
when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key1)))).thenReturn(futuresMap.get(key1));
when(client.getState(argThat(new GetStateRequestKeyMatcher(key1)))).thenReturn(futuresMap.get(key1));
State<String> keyRequest1 = buildStateKey(null, key1, etag, null);
Mono<State<String>> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class);
assertEquals(expectedState1, resultGet1.block());
@ -1011,7 +1011,7 @@ public class DaprClientGrpcTest {
SettableFuture<Empty> settableFutureDelete = SettableFuture.create();
MockCallback<Empty> callbackDelete = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFutureDelete, callbackDelete, directExecutor());
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateEnvelope.class)))
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
.thenReturn(settableFutureDelete);
Mono<Void> resultDelete = adapter.deleteState(STATE_STORE_NAME, keyRequest2.getKey(), keyRequest2.getEtag(),
keyRequest2.getOptions());
@ -1019,7 +1019,7 @@ public class DaprClientGrpcTest {
resultDelete.block();
assertTrue(callbackDelete.wasCalled);
futuresMap.replace(key2, null);
when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key2)))).thenReturn(futuresMap.get(key2));
when(client.getState(argThat(new GetStateRequestKeyMatcher(key2)))).thenReturn(futuresMap.get(key2));
State<String> state2 = resultGet2.block();
assertNull(state2);
@ -1029,15 +1029,15 @@ public class DaprClientGrpcTest {
public void getSecrets() {
String expectedKey = "attributeKey";
String expectedValue = "Expected secret value";
DaprProtos.GetSecretResponseEnvelope responseEnvelope = buildGetSecretResponseEnvelope(expectedKey, expectedValue);
SettableFuture<DaprProtos.GetSecretResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(expectedKey, expectedValue);
SettableFuture<DaprProtos.GetSecretResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
settableFuture.set(responseEnvelope);
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class)))
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class)))
.thenAnswer(context -> {
io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0);
io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0);
assertEquals("key", req.getKey());
assertEquals(SECRET_STORE_NAME, req.getStoreName());
assertEquals(0, req.getMetadataCount());
@ -1052,15 +1052,15 @@ public class DaprClientGrpcTest {
@Test
public void getSecretsEmptyResponse() {
DaprProtos.GetSecretResponseEnvelope responseEnvelope = buildGetSecretResponseEnvelope();
SettableFuture<DaprProtos.GetSecretResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse();
SettableFuture<DaprProtos.GetSecretResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
settableFuture.set(responseEnvelope);
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class)))
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class)))
.thenAnswer(context -> {
io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0);
io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0);
assertEquals("key", req.getKey());
assertEquals(SECRET_STORE_NAME, req.getStoreName());
assertEquals(0, req.getMetadataCount());
@ -1074,14 +1074,14 @@ public class DaprClientGrpcTest {
@Test
public void getSecretsException() {
SettableFuture<DaprProtos.GetSecretResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponseEnvelope> callback = new MockCallback<>(new RuntimeException());
SettableFuture<DaprProtos.GetSecretResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponse> callback = new MockCallback<>(new RuntimeException());
addCallback(settableFuture, callback, directExecutor());
settableFuture.setException(new RuntimeException());
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class)))
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class)))
.thenAnswer(context -> {
io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0);
io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0);
assertEquals("key", req.getKey());
assertEquals(SECRET_STORE_NAME, req.getStoreName());
assertEquals(0, req.getMetadataCount());
@ -1097,15 +1097,15 @@ public class DaprClientGrpcTest {
public void getSecretsWithMetadata() {
String expectedKey = "attributeKey";
String expectedValue = "Expected secret value";
DaprProtos.GetSecretResponseEnvelope responseEnvelope = buildGetSecretResponseEnvelope(expectedKey, expectedValue);
SettableFuture<DaprProtos.GetSecretResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(expectedKey, expectedValue);
SettableFuture<DaprProtos.GetSecretResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetSecretResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
settableFuture.set(responseEnvelope);
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretEnvelope.class)))
when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class)))
.thenAnswer(context -> {
io.dapr.v1.DaprProtos.GetSecretEnvelope req = context.getArgument(0);
io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0);
assertEquals("key", req.getKey());
assertEquals(SECRET_STORE_NAME, req.getStoreName());
assertEquals("metavalue", req.getMetadataMap().get("metakey"));
@ -1121,31 +1121,100 @@ public class DaprClientGrpcTest {
assertEquals(expectedValue, result.get(expectedKey));
}
private <T> SettableFuture<DaprProtos.GetStateResponseEnvelope> buildFutureGetStateEnvelop(T value, String etag) throws IOException {
DaprProtos.GetStateResponseEnvelope envelope = buildGetStateResponseEnvelope(value, etag);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(envelope);
/* If this test is failing, it means that a new value was added to StateOptions.Consistency
* enum, without creating a mapping to one of the proto defined gRPC enums
*/
@Test
public void stateOptionsConsistencyValuesHaveValidGrpcEnumMappings() {
String key = "key1";
String etag = "ETag1";
String value = "State value";
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
settableFuture.set(Empty.newBuilder().build());
for (StateOptions.Consistency consistency : StateOptions.Consistency.values()) {
StateOptions options = buildStateOptions(consistency, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
result.block();
}
assertTrue(callback.wasCalled);
}
/* If this test is failing, it means that a new value was added to StateOptions.Concurrency
* enum, without creating a mapping to one of the proto defined gRPC enums
*/
@Test
public void stateOptionsConcurrencyValuesHaveValidGrpcEnumMappings() {
String key = "key1";
String etag = "ETag1";
String value = "State value";
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
settableFuture.set(Empty.newBuilder().build());
for (StateOptions.Concurrency concurrency : StateOptions.Concurrency.values()) {
StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL, concurrency,
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
result.block();
}
assertTrue(callback.wasCalled);
}
/* If this test is failing, it means that a new value was added to StateOptions.RetryPolicy.Pattern
* enum, without creating a mapping to one of the proto defined gRPC enums
*/
@Test
public void stateOptionsRetryPatternValuesHaveValidGrpcEnumMappings() {
String key = "key1";
String etag = "ETag1";
String value = "State value";
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
settableFuture.set(Empty.newBuilder().build());
for (StateOptions.RetryPolicy.Pattern retryPattern : StateOptions.RetryPolicy.Pattern.values()) {
StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL,
StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), null, retryPattern);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
result.block();
}
assertTrue(callback.wasCalled);
}
private <T> SettableFuture<DaprProtos.GetStateResponse> buildFutureGetStateEnvelop(T value, String etag) throws IOException {
DaprProtos.GetStateResponse envelope = buildGetStateResponse(value, etag);
SettableFuture<DaprProtos.GetStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponse> callback = new MockCallback<>(envelope);
addCallback(settableFuture, callback, directExecutor());
settableFuture.set(envelope);
return settableFuture;
}
private <T> DaprProtos.GetStateResponseEnvelope buildGetStateResponseEnvelope(T value, String etag) throws IOException {
return DaprProtos.GetStateResponseEnvelope.newBuilder()
.setData(getAny(value))
private <T> DaprProtos.GetStateResponse buildGetStateResponse(T value, String etag) throws IOException {
return DaprProtos.GetStateResponse.newBuilder()
.setData(getBytes(value))
.setEtag(etag)
.build();
}
private DaprProtos.GetSecretResponseEnvelope buildGetSecretResponseEnvelope(String key, String value) {
return DaprProtos.GetSecretResponseEnvelope.newBuilder()
private DaprProtos.GetSecretResponse buildGetSecretResponse(String key, String value) {
return DaprProtos.GetSecretResponse.newBuilder()
.putAllData(Collections.singletonMap(key, value))
.build();
}
private DaprProtos.GetSecretResponseEnvelope buildGetSecretResponseEnvelope() {
return DaprProtos.GetSecretResponseEnvelope.newBuilder().build();
private DaprProtos.GetSecretResponse buildGetSecretResponse() {
return DaprProtos.GetSecretResponse.newBuilder().build();
}
private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency,
@ -1168,6 +1237,11 @@ public class DaprClientGrpcTest {
return Any.newBuilder().setValue(ByteString.copyFrom(byteValue)).build();
}
private <T> ByteString getBytes(T value) throws IOException {
byte[] byteValue = serializer.serialize(value);
return ByteString.copyFrom(byteValue);
}
private final class MockCallback<T> implements FutureCallback<T> {
private T value = null;
private Throwable failure = null;
@ -1248,16 +1322,16 @@ public class DaprClientGrpcTest {
}
}
private static class GetStateEnvelopeKeyMatcher implements ArgumentMatcher<DaprProtos.GetStateEnvelope> {
private static class GetStateRequestKeyMatcher implements ArgumentMatcher<DaprProtos.GetStateRequest> {
private final String propValue;
GetStateEnvelopeKeyMatcher(String propValue) {
GetStateRequestKeyMatcher(String propValue) {
this.propValue = propValue;
}
@Override
public boolean matches(DaprProtos.GetStateEnvelope argument) {
public boolean matches(DaprProtos.GetStateRequest argument) {
if (argument == null) {
return false;
}