Refactoring HTTP Client and creating Client HTTP Adapter (#68)

* Refactoring HTTP Client to follow same pattern as the GRPC Client.
Creating DaprClientHttpAdapter to be exposed to the users.
Refactor all uses of the http Client to use the adapter instead following composition rather than inheritance.
Dealing with conflicts

* Renaming AppToDaprHttpAsync to AppToDaprHttpAsyncClient changed previously by mistake

* Refactor Adapters to centralize all generic and actor specific functionality in the same place, having a single entry point for all communications to DAPR.
Leaving GRPC Adapter implemented, but without the possibility to create an instance of it.
Fixing Test cases

* Addressing PR comments
This commit is contained in:
Andres Robles 2020-01-07 22:08:55 -06:00 committed by Artur Souza
parent d441f5441f
commit 5e3dc74003
31 changed files with 1953 additions and 1200 deletions

View File

@ -1,24 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.client;
import reactor.core.publisher.Mono;
/**
* Interface to invoke actor methods.
*/
interface ActorProxyAsyncClient {
/**
* Invokes an Actor method on Dapr.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param methodName Method name to invoke.
* @param jsonPayload Serialized body.
* @return Asynchronous result with the Actor's response.
*/
Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload);
}

View File

@ -2,6 +2,8 @@ package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.ActorStateSerializer;
import io.dapr.client.DaprClientBuilder;
import okhttp3.OkHttpClient;
/**
* Builder to generate an ActorProxy instance.
@ -16,7 +18,7 @@ public class ActorProxyBuilder {
/**
* Builder for the Dapr client.
*/
private final ActorProxyClientBuilder clientBuilder = new ActorProxyClientBuilder();
private final DaprClientBuilder clientBuilder = new DaprClientBuilder();
/**
* Actor's type.
@ -28,17 +30,6 @@ public class ActorProxyBuilder {
*/
private ActorId actorId;
/**
* Changes build config to use specific port.
*
* @param port Port to be used.
* @return Same builder object.
*/
public ActorProxyBuilder withPort(int port) {
this.clientBuilder.withPort(port);
return this;
}
/**
* Changes build config to use given Actor's type.
*
@ -80,7 +71,7 @@ public class ActorProxyBuilder {
this.actorType,
this.actorId,
SERIALIZER,
this.clientBuilder.buildAsyncClient());
this.clientBuilder.build());
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.client;
import io.dapr.client.AbstractClientBuilder;
import okhttp3.OkHttpClient;
/**
* Builds an instance of ActorProxyAsyncClient.
*/
class ActorProxyClientBuilder extends AbstractClientBuilder {
/**
* Builds an async client.
*
* @return Builds an async client.
*/
public ActorProxyAsyncClient buildAsyncClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
// TODO: Expose configurations for OkHttpClient or com.microsoft.rest.RestClient.
return new ActorProxyHttpAsyncClient(super.getPort(), builder.build());
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.client;
import io.dapr.client.AbstractDaprHttpClient;
import io.dapr.utils.Constants;
import okhttp3.OkHttpClient;
import reactor.core.publisher.Mono;
/**
* Http client to call actors methods.
*/
class ActorProxyHttpAsyncClient extends AbstractDaprHttpClient implements ActorProxyAsyncClient {
/**
* Creates a new instance of {@link ActorProxyHttpAsyncClient}.
*
* @param port Port for calling Dapr. (e.g. 3500)
* @param httpClient RestClient used for all API calls in this new instance.
*/
ActorProxyHttpAsyncClient(int port, OkHttpClient httpClient) {
super(port, httpClient);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload) {
String url = String.format(Constants.ACTOR_METHOD_RELATIVE_URL_FORMAT, actorType, actorId, methodName);
return super.invokeAPI("PUT", url, jsonPayload);
}
}

View File

@ -2,6 +2,7 @@ package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.ActorStateSerializer;
import io.dapr.client.DaprClient;
import reactor.core.publisher.Mono;
import java.io.IOException;
@ -29,7 +30,7 @@ class ActorProxyImpl implements ActorProxy {
/**
* Client to talk to the Dapr's API.
*/
private final ActorProxyAsyncClient daprClient;
private final DaprClient daprClient;
/**
* Creates a new instance of {@link ActorProxyAsyncClient}.
@ -39,7 +40,7 @@ class ActorProxyImpl implements ActorProxy {
* @param serializer Serializer and deserializer for method calls.
* @param daprClient Dapr client.
*/
ActorProxyImpl(String actorType, ActorId actorId, ActorStateSerializer serializer, ActorProxyAsyncClient daprClient) {
ActorProxyImpl(String actorType, ActorId actorId, ActorStateSerializer serializer, DaprClient daprClient) {
this.actorType = actorType;
this.actorId = actorId;
this.daprClient = daprClient;

View File

@ -98,7 +98,7 @@ public abstract class AbstractActor {
String data = this.actorRuntimeContext.getActorSerializer().serialize(state);
ActorReminderParams params = new ActorReminderParams(data, dueTime, period);
String serialized = this.actorRuntimeContext.getActorSerializer().serialize(params);
return this.actorRuntimeContext.getDaprClient().registerReminder(
return this.actorRuntimeContext.getDaprClient().registerActorReminder(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
reminderName,
@ -142,7 +142,7 @@ public abstract class AbstractActor {
String serializedTimer = this.actorRuntimeContext.getActorSerializer().serialize(actorTimer);
this.timers.put(name, actorTimer);
return this.actorRuntimeContext.getDaprClient().registerTimer(
return this.actorRuntimeContext.getDaprClient().registerActorTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
name,
@ -159,7 +159,7 @@ public abstract class AbstractActor {
* @return Asynchronous void response.
*/
protected Mono<Void> unregister(ActorTimer actorTimer) {
return this.actorRuntimeContext.getDaprClient().unregisterTimer(
return this.actorRuntimeContext.getDaprClient().unregisterActorTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
actorTimer.getName())

View File

@ -6,6 +6,8 @@ package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorTrace;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import reactor.core.publisher.Mono;
import java.util.Collection;
@ -37,7 +39,7 @@ public class ActorRuntime {
/**
* A client used to communicate from the actor to the Dapr runtime.
*/
private final AppToDaprAsyncClient appToDaprAsyncClient;
private final DaprClient daprClient;
/**
* State provider for Dapr.
@ -65,9 +67,9 @@ public class ActorRuntime {
}
this.actorManagers = Collections.synchronizedMap(new HashMap<>());
this.appToDaprAsyncClient = new AppToDaprClientBuilder().buildAsyncClient();
this.daprClient = new DaprClientBuilder().build();
this.actorSerializer = new ActorStateSerializer();
this.daprStateProvider = new DaprStateAsyncProvider(this.appToDaprAsyncClient, this.actorSerializer);
this.daprStateProvider = new DaprStateAsyncProvider(this.daprClient, this.actorSerializer);
}
/**
@ -126,8 +128,8 @@ public class ActorRuntime {
this.actorSerializer,
actualActorFactory,
actorTypeInfo,
this.appToDaprAsyncClient,
new DaprStateAsyncProvider(this.appToDaprAsyncClient, this.actorSerializer));
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, this.actorSerializer));
// Create ActorManagers, override existing entry if registered again.
this.actorManagers.put(actorTypeInfo.getName(), new ActorManager<T>(context));

View File

@ -6,6 +6,7 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorTrace;
import io.dapr.client.DaprClient;
/**
* Provides the context for the Actor's runtime.
@ -42,7 +43,7 @@ public class ActorRuntimeContext<T extends AbstractActor> {
/**
* Client to communicate to Dapr's API.
*/
private final AppToDaprAsyncClient daprClient;
private final DaprClient daprClient;
/**
* State provider for given Actor Type.
@ -63,7 +64,7 @@ public class ActorRuntimeContext<T extends AbstractActor> {
ActorStateSerializer actorSerializer,
ActorFactory<T> actorFactory,
ActorTypeInformation<T> actorTypeInformation,
AppToDaprAsyncClient daprClient,
DaprClient daprClient,
DaprStateAsyncProvider stateProvider) {
this.actorRuntime = actorRuntime;
this.actorSerializer = actorSerializer;
@ -124,7 +125,7 @@ public class ActorRuntimeContext<T extends AbstractActor> {
*
* @return Client to communicate to Dapr's API.
*/
AppToDaprAsyncClient getDaprClient() {
DaprClient getDaprClient() {
return this.daprClient;
}

View File

@ -1,75 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import reactor.core.publisher.Mono;
/**
* Interface for interacting from the actor app to the Dapr runtime.
*/
interface AppToDaprAsyncClient {
/**
* Gets a state from Dapr's Actor.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param keyName State name.
* @return Asynchronous result with current state value.
*/
Mono<String> getState(String actorType, String actorId, String keyName);
/**
* Saves state batch to Dapr.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param data State to be saved.
* @return Asynchronous void result.
*/
Mono<Void> saveStateTransactionally(String actorType, String actorId, String data);
/**
* Register a reminder.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param reminderName Name of reminder to be registered.
* @param data JSON reminder data as per Dapr's spec.
* @return Asynchronous void result.
*/
Mono<Void> registerReminder(String actorType, String actorId, String reminderName, String data);
/**
* Unregisters a reminder.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param reminderName Name of reminder to be unregistered.
* @return Asynchronous void result.
*/
Mono<Void> unregisterReminder(String actorType, String actorId, String reminderName);
/**
* Registers a timer.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param timerName Name of timer to be registered.
* @param data JSON reminder data as per Dapr's spec.
* @return Asynchronous void result.
*/
Mono<Void> registerTimer(String actorType, String actorId, String timerName, String data);
/**
* Unregisters a timer.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param timerName Name of timer to be unregistered.
* @return Asynchronous void result.
*/
Mono<Void> unregisterTimer(String actorType, String actorId, String timerName);
}

View File

@ -1,25 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.client.AbstractClientBuilder;
import okhttp3.OkHttpClient;
/**
* Builds an instance of AppToDaprAsyncClient.
*/
class AppToDaprClientBuilder extends AbstractClientBuilder {
/**
* Builds an async client.
*
* @return Builds an async client.
*/
public AppToDaprAsyncClient buildAsyncClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
// TODO: Expose configurations for OkHttpClient or com.microsoft.rest.RestClient.
return new AppToDaprHttpAsyncClient(super.getPort(), builder.build());
}
}

View File

@ -1,208 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.client.AbstractDaprHttpClient;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.Constants;
import io.dapr.utils.ObjectSerializer;
import okhttp3.OkHttpClient;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
/**
* Http client to call Dapr's API for actors.
*/
//public class DaprHttpAsyncClient implements DaprAsyncClient {
class AppToDaprHttpAsyncClient extends AbstractDaprHttpClient implements AppToDaprAsyncClient {
/**
* ObjectMapper to Serialize data
*/
private static final ObjectSerializer MAPPER = new ObjectSerializer();
private Map<String, String> dataMap;
/**
* Creates a new instance of {@link AppToDaprHttpAsyncClient}.
*
* @param port Port for calling Dapr. (e.g. 3500)
* @param httpClient RestClient used for all API calls in this new instance.
*/
public AppToDaprHttpAsyncClient(int port, OkHttpClient httpClient) {
super(port, httpClient);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<String> getState(String actorType, String actorId, String keyName) {
String url = String.format(Constants.ACTOR_STATE_KEY_RELATIVE_URL_FORMAT, actorType, actorId, keyName);
return super.invokeAPI("GET", url, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> saveStateTransactionally(String actorType, String actorId, String data) {
String url = String.format(Constants.ACTOR_STATE_RELATIVE_URL_FORMAT, actorType, actorId);
return super.invokeAPIVoid("PUT", url, data);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerReminder(String actorType, String actorId, String reminderName, String data) {
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
return super.invokeAPIVoid("PUT", url, data);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterReminder(String actorType, String actorId, String reminderName) {
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
return super.invokeAPIVoid("DELETE", url, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerTimer(String actorType, String actorId, String timerName, String data) {
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return super.invokeAPIVoid("PUT", url, data);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterTimer(String actorType, String actorId, String timerName) {
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return super.invokeAPIVoid("DELETE", url, null);
}
/**
* Creating publishEvent for Http Client
*
* @param topic HTTP method.
* @param data url as String.
* @param method JSON payload or null.
* @return Mono<String>
*/
public Mono<String> publishEvent(String topic, String data, String method) throws Exception {
if (topic.isEmpty() || topic == null) {
throw new DaprException("500", "Topic cannot be null or empty.");
}
if (method.isEmpty() || method == null) {
throw new DaprException("500", "Method cannot be null or empty.");
}
String url = method.equals("POST") ? Constants.PUBLISH_PATH : Constants.PUBLISH_PATH + "/" + topic;
dataMap = new HashMap();
dataMap.put(topic, data);
String jsonResult = MAPPER.serialize(dataMap);
return super.invokeAPI(method, url, jsonResult);
}
/**
* Creating invokeBinding Method for Http Client
*
* @param name HTTP method.
* @param data url as String.
* @param method JSON payload or null.
* @return Mono<String>
*/
public Mono<String> invokeBinding(String name, String data, String method) throws Exception {
if (name.isEmpty() || name == null) {
throw new DaprException("500", "Name cannot be null or empty.");
}
if (method.isEmpty() || method == null) {
throw new DaprException("500", "Method cannot be null or empty.");
}
String url = method.equals("POST") ? Constants.BINDING_PATH : Constants.BINDING_PATH + "/" + name;
dataMap = new HashMap();
dataMap.put(name, data);
String jsonResult = MAPPER.serialize(dataMap);
return super.invokeAPI(method, url, jsonResult);
}
/**
* Creating invokeBinding Method for Http Client
*
* @param key HTTP method.
* @return Mono<String>
*/
public Mono<String> getState(String key) throws DaprException {
if (key.isEmpty() || key == null) {
throw new DaprException("500", "Name cannot be null or empty.");
}
String url = Constants.STATE_PATH + "/" + key;
return super.invokeAPI("GET", url, null);
}
/**
* Creating invokeBinding Method for Http Client
*
* @param key HTTP method.
* @param data HTTP method.
* @return Mono<String>
*/
public Mono<String> saveState(String key, String data) throws Exception {
if (key.isEmpty() || key == null) {
throw new DaprException("500", "Name cannot be null or empty.");
}
String url = Constants.STATE_PATH;
dataMap = new HashMap();
dataMap.put(key, data);
String jsonResult = MAPPER.serialize(dataMap);
return super.invokeAPI("POST", url, jsonResult);
}
/**
* Creating invokeBinding Method for Http Client
*
* @param key HTTP method.
* @return Mono<String>
*/
public Mono<String> deleteState(String key) throws DaprException {
if (key.isEmpty() || key == null) {
throw new DaprException("500", "Name cannot be null or empty.");
}
String url = Constants.STATE_PATH + "/" + key;
return super.invokeAPI("DELETE", url, null);
}
}

View File

@ -8,6 +8,7 @@ package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprClient;
import reactor.core.publisher.Mono;
import java.io.IOException;
@ -24,17 +25,17 @@ class DaprStateAsyncProvider {
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
private final AppToDaprAsyncClient daprAsyncClient;
private final DaprClient daprClient;
private final ActorStateSerializer serializer;
DaprStateAsyncProvider(AppToDaprAsyncClient daprAsyncClient, ActorStateSerializer serializer) {
this.daprAsyncClient = daprAsyncClient;
DaprStateAsyncProvider(DaprClient daprClient, ActorStateSerializer serializer) {
this.daprClient = daprClient;
this.serializer = serializer;
}
<T> Mono<T> load(String actorType, ActorId actorId, String stateName, Class<T> clazz) {
Mono<String> result = this.daprAsyncClient.getState(actorType, actorId.toString(), stateName);
Mono<String> result = this.daprClient.getActorState(actorType, actorId.toString(), stateName);
return result
.filter(s -> (s != null) && (!s.isEmpty()))
@ -48,7 +49,7 @@ class DaprStateAsyncProvider {
}
Mono<Boolean> contains(String actorType, ActorId actorId, String stateName) {
Mono<String> result = this.daprAsyncClient.getState(actorType, actorId.toString(), stateName);
Mono<String> result = this.daprClient.getActorState(actorType, actorId.toString(), stateName);
return result.map(s -> {
return (s != null) && (s.length() > 0);
@ -136,6 +137,6 @@ class DaprStateAsyncProvider {
Mono.empty();
}
return this.daprAsyncClient.saveStateTransactionally(actorType, actorId.toString(), payload);
return this.daprClient.saveActorStateTransactionally(actorType, actorId.toString(), payload);
}
}

View File

@ -1,59 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client;
import io.dapr.utils.Constants;
/**
* Base class for client builders
*/
public abstract class AbstractClientBuilder {
/**
* Default port for Dapr after checking environment variable.
*/
private int port = AbstractClientBuilder.GetEnvPortOrDefault();
/**
* Overrides the port.
*
* @param port New port.
* @return This instance.
*/
public AbstractClientBuilder withPort(int port) {
this.port = port;
return this;
}
/**
* Returns configured port.
*
* @return Port to connect to Dapr.
*/
protected int getPort() {
return this.port;
}
/**
* Tries to get a valid port from environment variable or returns default.
*
* @return Port defined in env variable or default.
*/
private static int GetEnvPortOrDefault() {
String envPort = System.getenv(Constants.ENV_DAPR_HTTP_PORT);
if (envPort == null) {
return Constants.DEFAULT_PORT;
}
try {
return Integer.parseInt(envPort.trim());
} catch (NumberFormatException e) {
e.printStackTrace();
}
return Constants.DEFAULT_PORT;
}
}

View File

@ -1,150 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.exceptions.DaprError;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.Constants;
import okhttp3.*;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.net.URL;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public abstract class AbstractDaprHttpClient {
/**
* Defines the standard application/json type for HTTP calls in Dapr.
*/
private static final MediaType MEDIA_TYPE_APPLICATION_JSON =
MediaType.get("application/json; charset=utf-8");
/**
* Shared object representing an empty request body in JSON.
*/
private static final RequestBody REQUEST_BODY_EMPTY_JSON =
RequestBody.Companion.create("", MEDIA_TYPE_APPLICATION_JSON);
/**
* JSON Object Mapper.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* The base url used for form urls. This is typically "http://localhost:3500".
*/
private final String baseUrl;
/**
* Http client used for all API calls.
*/
private final OkHttpClient httpClient;
/**
* Thread-pool for HTTP calls.
*/
private final ExecutorService pool;
/**
* Creates a new instance of {@link AbstractDaprHttpClient}.
*
* @param port Port for calling Dapr. (e.g. 3500)
* @param threadPoolSize Number of threads for http calls.
* @param httpClient RestClient used for all API calls in this new instance.
*/
public AbstractDaprHttpClient(int port, int threadPoolSize, OkHttpClient httpClient) {
this.baseUrl = String.format("http://%s:%d/", Constants.DEFAULT_HOSTNAME, port);
this.httpClient = httpClient;
this.pool = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* Creates a new instance of {@link AbstractDaprHttpClient}.
*
* @param port Port for calling Dapr. (e.g. 3500)
* @param httpClient RestClient used for all API calls in this new instance.
*/
public AbstractDaprHttpClient(int port, OkHttpClient httpClient) {
this(port, 1, httpClient);
}
/**
* Invokes an API asynchronously that returns Void.
*
* @param method HTTP method.
* @param urlString url as String.
* @param json JSON payload or null.
* @return Asynchronous Void
*/
protected final Mono<Void> invokeAPIVoid(String method, String urlString, String json) {
return this.invokeAPI(method, urlString, json).then();
}
/**
* Invokes an API asynchronously that returns a text payload.
*
* @param method HTTP method.
* @param urlString url as String.
* @param json JSON payload or null.
* @return Asynchronous text
*/
public final Mono<String> invokeAPI(String method, String urlString, String json) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(
() -> {
try {
String requestId = UUID.randomUUID().toString();
RequestBody body =
json != null ? RequestBody.Companion.create(json, MEDIA_TYPE_APPLICATION_JSON) : REQUEST_BODY_EMPTY_JSON;
Request request = new Request.Builder()
.url(new URL(this.baseUrl + urlString))
.method(method, (json == null && method.equals("GET")) ? null : body)
.addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId)
.build();
try (Response response = this.httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
DaprError error = parseDaprError(response.body().string());
if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) {
throw new RuntimeException(new DaprException(error));
}
throw new RuntimeException("Unknown error.");
}
String result = response.body().string();
return result == null ? "" : result;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}, this.pool);
return Mono.fromFuture(future);
}
/**
* Tries to parse an error from Dapr response body.
*
* @param json Response body from Dapr.
* @return DaprError or null if could not parse.
*/
private static DaprError parseDaprError(String json) {
if (json == null) {
return null;
}
try {
return OBJECT_MAPPER.readValue(json, DaprError.class);
} catch (IOException e) {
throw new RuntimeException("Unknown error: could not parse error json.");
}
}
}

View File

@ -4,8 +4,12 @@
*/
package io.dapr.client;
import io.dapr.client.domain.StateKeyValue;
import io.dapr.client.domain.StateOptions;
import reactor.core.publisher.Mono;
import java.util.List;
/**
* Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
*
@ -13,61 +17,161 @@ import reactor.core.publisher.Mono;
*/
public interface DaprClient {
/**
* Publish an event.
*
* @param event the event to be published
* @param <T> The type of event to be publishded.
* @return a Mono plan of type Void
*/
<T> Mono<Void> publishEvent(T event);
/**
* Publish an event.
* @param topic the topic where the event will be published
* @param event the event to be published
* @param <T> The type of event to be publishded.
* @return a Mono plan of type Void
*/
<T> Mono<Void> publishEvent(String topic, T event);
/**
* Invoke a service
*
* @param request The request to be sent to invoke the service
* @param clazz the Type needed as return for the call
* @param <T> the Type of the return
* @param <K> The Type of the request.
* @return A Mono Plan of type clazz
*/
<T, K> Mono<T> invokeService(K request, Class<T> clazz);
/**
* Invoke a service
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service
* @param clazz the Type needed as return for the call
* @param <T> the Type of the return
* @param <K> The Type of the request.
* @return A Mono Plan of type clazz
*/
<T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz);
/**
* Creating a Binding
*
* @param request the request needed for the binding
* @param <T> The type of the request.
* @return a Mono plan of type Void
*/
<T> Mono<Void> invokeBinding(T request);
/**
* Invoke a service
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service
* @param <T> the Type of the requet
* @return a Mono plan of type Void
*/
<T> Mono<Void> invokeService(String verb, String appId, String method, T request);
/**
* Retrieve a State based on their key.
*
* @param key The key of the State to be retrieved
* @param clazz the Type of State needed as return.
* @param <T> the Type of the return
* @param <K> The Type of the key of the State
* @return A Mono Plan for the requested State
*/
<T, K> Mono<T> getState(K key, Class<T> clazz);
/**
* Creating a Binding
* @param name The name of the biding to call
* @param request The request needed for the binding
* @param <T> The type of the request.
* @return a Mono plan of type Void
*/
<T> Mono<Void> invokeBinding(String name, T request);
/**
* Save/Update a State.
*
* @param state the State to be saved
* @param <T> the Type of the State
* @return a Mono plan of type Void
*/
<T> Mono<Void> saveState(T state);
/**
* Retrieve a State based on their key.
*
* @param state The key of the State to be retrieved
* @param stateOptions
* @param clazz the Type of State needed as return.
* @param <T> the Type of the return
* @param <K> The Type of the key of the State
* @return A Mono Plan for the requested State
*/
<T, K> Mono<T> getState(StateKeyValue<K> state, StateOptions stateOptions, Class<T> clazz);
/**
* Delete a state
*
* @param key The key of the State to be removed
* @param <T> The Type of the key of the State
* @return a Mono plan of type Void
*/
<T> Mono<Void> deleteState(T key);
/**
* Save/Update a list of states.
*
* @param states the States to be saved
* @param options the Options to use for each state
* @param <T> the Type of the State
* @return a Mono plan of type Void
*/
<T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options);
/**
* Save/Update a state
* @param key the key of the state
* @param etag the etag to be used
* @param value the value of the state
* @param options the Options to use for each state
* @param <T> the Type of the State
* @return a Mono plan of type Void
*/
<T> Mono<Void> saveState(String key, String etag, T value, StateOptions options);
/**
* Delete a state
*
* @param state The key of the State to be removed
* @param options The options of the state
* @param <T> The Type of the key of the State
* @return a Mono plan of type Void
*/
<T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options);
/**
* Invokes an Actor method on Dapr.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param methodName Method name to invoke.
* @param jsonPayload Serialized body.
* @return Asynchronous result with the Actor's response.
*/
Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload);
/**
* Gets a state from Dapr's Actor.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param keyName State name.
* @return Asynchronous result with current state value.
*/
Mono<String> getActorState(String actorType, String actorId, String keyName);
/**
* Saves state batch to Dapr.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param data State to be saved.
* @return Asynchronous void result.
*/
Mono<Void> saveActorStateTransactionally(String actorType, String actorId, String data);
/**
* Register a reminder.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param reminderName Name of reminder to be registered.
* @param data JSON reminder data as per Dapr's spec.
* @return Asynchronous void result.
*/
Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, String data);
/**
* Unregisters a reminder.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param reminderName Name of reminder to be unregistered.
* @return Asynchronous void result.
*/
Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName);
/**
* Registers a timer.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param timerName Name of timer to be registered.
* @param data JSON reminder data as per Dapr's spec.
* @return Asynchronous void result.
*/
Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, String data);
/**
* Unregisters a timer.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param timerName Name of timer to be unregistered.
* @return Asynchronous void result.
*/
Mono<Void> unregisterActorTimer(String actorType, String actorId, String timerName);
}

View File

@ -5,66 +5,52 @@
package io.dapr.client;
import io.dapr.DaprGrpc;
import io.dapr.utils.Constants;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import okhttp3.OkHttpClient;
/**
* A builder for the DaprClient,
* Only 2 type of clients are supported at the moment, HTTP and GRPC.
* Currently only and HTTP Client will be supported.
*/
public class DaprClientBuilder {
/**
* The type of client supported.
* Default port for Dapr after checking environment variable.
*/
public enum DaprClientTypeEnum {
GRPC,
HTTP;
}
private static final int port = DaprClientBuilder.getEnvPortOrDefault();
/**
* An indicator of the client to be build by the instance of the builder.
* Default host for Dapr after checking environment variable.
*/
private DaprClientTypeEnum clientType;
/**
* The host to be used by the client to communicate.
*/
private String host;
private static final String host = Constants.DEFAULT_HOSTNAME;
/**
* The port to be used by the client to communicate
* The HTTP Client that will be used to injectto connect to Dapr
*/
private Integer port;
private static OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
private static DaprClientHttpAdapter daprHttClient;
/**
* Creates an instance of the builder setting the type of client to be creted
* Tries to get a valid port from environment variable or returns default.
*
* @param clientType Determines if clients need to be over Http or GRPC.
* @return Port defined in env variable or default.
*/
public DaprClientBuilder(DaprClientTypeEnum clientType) {
this.clientType = clientType;
}
private static int getEnvPortOrDefault() {
String envPort = System.getenv(Constants.ENV_DAPR_HTTP_PORT);
if (envPort == null || envPort.trim().isEmpty()) {
return Constants.DEFAULT_PORT;
}
/**
* Sets the host to be used by the client
*
* @param host Host to connect to Dapr.
* @return itself
*/
public DaprClientBuilder withHost(String host) {
this.host = host;
return this;
}
try {
return Integer.parseInt(envPort.trim());
} catch (NumberFormatException e) {
e.printStackTrace();
}
/**
* Sets the port to be used by the client
*
* @param port Port to connect to.
* @return itself
*/
public DaprClientBuilder withPort(Integer port) {
this.port = port;
return this;
return Constants.DEFAULT_PORT;
}
/**
@ -74,12 +60,7 @@ public class DaprClientBuilder {
* @throws java.lang.IllegalStateException if any required field is missing
*/
public DaprClient build() {
if (DaprClientTypeEnum.GRPC.equals(this.clientType)) {
return buildDaprClientGrpc();
} else if (DaprClientTypeEnum.HTTP.equals(this.clientType)) {
return buildDaprClientHttp();
}
throw new IllegalStateException("Unsupported client type.");
return buildDaprClientHttp();
}
/**
@ -92,7 +73,7 @@ public class DaprClientBuilder {
if (null == this.host || "".equals(this.host.trim())) {
throw new IllegalStateException("Host must is required.");
}
if (null == port || port <= 0) {
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
@ -101,10 +82,26 @@ public class DaprClientBuilder {
/**
* Creates and instance of the HTTP CLient.
* If an okhttp3.OkHttpClient.Builder has not been provided, a defult builder will be used.
*
* @return
*/
private DaprClient buildDaprClientHttp() {
throw new UnsupportedOperationException("Not implemented yet.");
if (null == this.host || "".equals(this.host.trim())) {
throw new IllegalStateException("Host must is required.");
}
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}
if (daprHttClient == null) {
synchronized (okHttpClient) {
if (daprHttClient == null) {
DaprHttp daprHtt = new DaprHttp(Constants.DEFAULT_HTTP_PROTOCOL_IDENTIFIED+host, port, okHttpClient);
daprHttClient = new DaprClientHttpAdapter(daprHtt);
}
}
}
return daprHttClient;
}
}

View File

@ -8,9 +8,14 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
import io.dapr.DaprGrpc;
import io.dapr.DaprProtos;
import io.dapr.client.domain.StateKeyValue;
import io.dapr.client.domain.StateOptions;
import io.dapr.utils.ObjectSerializer;
import reactor.core.publisher.Mono;
import java.lang.reflect.Field;
import java.util.*;
/**
* An adapter for the GRPC Client.
*
@ -19,160 +24,265 @@ import reactor.core.publisher.Mono;
*/
class DaprClientGrpcAdapter implements DaprClient {
/**
* The GRPC client to be used
*
* @see io.dapr.DaprGrpc.DaprFutureStub
*/
private DaprGrpc.DaprFutureStub client;
/**
* A utitlity class for serialize and deserialize the messages sent and retrived by the client.
*/
private ObjectSerializer objectSerializer;
/**
* The GRPC client to be used
*
* @see io.dapr.DaprGrpc.DaprFutureStub
*/
private DaprGrpc.DaprFutureStub client;
/**
* A utitlity class for serialize and deserialize the messages sent and retrived by the client.
*/
private ObjectSerializer objectSerializer;
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param futureClient
* @see io.dapr.client.DaprClientBuilder
*/
DaprClientGrpcAdapter(DaprGrpc.DaprFutureStub futureClient) {
client = futureClient;
objectSerializer = new ObjectSerializer();
}
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param futureClient
* @see io.dapr.client.DaprClientBuilder
*/
DaprClientGrpcAdapter(DaprGrpc.DaprFutureStub futureClient) {
client = futureClient;
objectSerializer = new ObjectSerializer();
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> publishEvent(T event) {
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> publishEvent(String topic, T event) {
try {
String serializedEvent = objectSerializer.serialize(event);
Map<String, String> mapEvent = new HashMap<>();
mapEvent.put("Topic", topic);
mapEvent.put("Data", serializedEvent);
byte[] byteEvent = objectSerializer.serialize(mapEvent).getBytes();
DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.parseFrom(byteEvent);
ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
String serializedEvent = objectSerializer.serialize(event);
DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.parseFrom(serializedEvent.getBytes());
ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
f.get();
} catch (Exception ex) {
return Mono.error(ex);
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T, K> Mono<T> invokeService(K request, Class<T> clazz) {
/**
* {@inheritDoc}
*/
@Override
public <T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz) {
try {
Map<String, String> mapMessage = new HashMap<>();
mapMessage.put("Id", objectSerializer.serialize(appId));
mapMessage.put("Method", objectSerializer.serialize(verb));
mapMessage.put("Data", objectSerializer.serialize(request));
DaprProtos.InvokeServiceEnvelope envelope =
DaprProtos.InvokeServiceEnvelope.parseFrom(objectSerializer.serialize(mapMessage).getBytes());
ListenableFuture<DaprProtos.InvokeServiceResponseEnvelope> futureResponse =
client.invokeService(envelope);
return Mono.just(futureResponse).flatMap(f -> {
try {
String serializedRequest = objectSerializer.serialize(request);
DaprProtos.InvokeServiceEnvelope envelope =
DaprProtos.InvokeServiceEnvelope.parseFrom(serializedRequest.getBytes());
ListenableFuture<DaprProtos.InvokeServiceResponseEnvelope> futureResponse =
client.invokeService(envelope);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toStringUtf8(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
}
});
return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toStringUtf8(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
return Mono.error(ex);
}
}
});
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> invokeBinding(T request) {
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* Operation not supported for GRPC
* @throws UnsupportedOperationException every time is called.
*/
public <T> Mono<Void> invokeService(String verb, String appId, String method, T request) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> invokeBinding(String name, T request) {
try {
Map<String, String> mapMessage = new HashMap<>();
mapMessage.put("Name", name);
mapMessage.put("Data", objectSerializer.serialize(request));
DaprProtos.InvokeBindingEnvelope envelope =
DaprProtos.InvokeBindingEnvelope.parseFrom(objectSerializer.serialize(mapMessage).getBytes());
ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
String serializedRequest = objectSerializer.serialize(request);
DaprProtos.InvokeBindingEnvelope envelope =
DaprProtos.InvokeBindingEnvelope.parseFrom(serializedRequest.getBytes());
ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
f.get();
} catch (Exception ex) {
return Mono.error(ex);
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T, K> Mono<T> getState(K key, Class<T> clazz) {
/**
* {@inheritDoc}
*/
@Override
public<T, K> Mono<T> getState(StateKeyValue<K> key, StateOptions stateOptions, Class<T> clazz) {
try {
Map<String, String> request = new HashMap<>();
request.put("Key", key.getKey());
request.put("Consistency", stateOptions.getConsistency());
String serializedRequest = objectSerializer.serialize(request);
DaprProtos.GetStateEnvelope envelope = DaprProtos.GetStateEnvelope.parseFrom(serializedRequest.getBytes());
ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope);
return Mono.just(futureResponse).flatMap(f -> {
try {
String serializedRequest = objectSerializer.serialize(key);
DaprProtos.GetStateEnvelope envelope = DaprProtos.GetStateEnvelope.parseFrom(serializedRequest.getBytes());
ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toStringUtf8(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
}
});
return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toStringUtf8(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
return Mono.error(ex);
}
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> saveState(T state) {
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options) {
try {
List<Map<String, Object>> listStates = new ArrayList<>();
Map<String, Object> mapOptions = transformStateOptionsToMap(options);
for (StateKeyValue state : states) {
Map<String, Object> mapState = transformStateKeyValueToMap(state, mapOptions);
listStates.add(mapState);
};
Map<String, Object> mapStates = new HashMap<>();
mapStates.put("Requests", listStates);
byte[] byteRequests = objectSerializer.serialize(mapStates).getBytes();
DaprProtos.SaveStateEnvelope envelope = DaprProtos.SaveStateEnvelope.parseFrom(byteRequests);
ListenableFuture<Empty> futureEmpty = client.saveState(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
String serializedRequest = objectSerializer.serialize(state);
DaprProtos.SaveStateEnvelope envelope = DaprProtos.SaveStateEnvelope.parseFrom(serializedRequest.getBytes());
ListenableFuture<Empty> futureEmpty = client.saveState(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
f.get();
} catch (Exception ex) {
return Mono.error(ex);
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> deleteState(T key) {
@Override
public <T> Mono<Void> saveState(String key, String etag, T value, StateOptions options) {
StateKeyValue<T> state = new StateKeyValue<>(value, key, etag);
return saveStates(Arrays.asList(state), options);
}
/**
* if stateOptions param is passed it will overrside state.options.
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options) {
try {
Map<String, Object> mapOptions = transformStateOptionsToMap(options);
Map<String, Object> mapState = transformStateKeyValueToMap(state, mapOptions);
String serializedState = objectSerializer.serialize(mapState);
DaprProtos.DeleteStateEnvelope envelope = DaprProtos.DeleteStateEnvelope.parseFrom(serializedState.getBytes());
ListenableFuture<Empty> futureEmpty = client.deleteState(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
String serializedRequest = objectSerializer.serialize(key);
DaprProtos.DeleteStateEnvelope envelope = DaprProtos.DeleteStateEnvelope.parseFrom(serializedRequest.getBytes());
ListenableFuture<Empty> futureEmpty = client.deleteState(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
f.get();
} catch (Exception ex) {
return Mono.error(ex);
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* Operation not supported for GRPC
* @throws UnsupportedOperationException every time is called.
*/
@Override
public Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
@Override
public Mono<String> getActorState(String actorType, String actorId, String keyName) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
@Override
public Mono<Void> saveActorStateTransactionally(String actorType, String actorId, String data) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
@Override
public Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, String data) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
@Override
public Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
@Override
public Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, String data) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
@Override
public Mono<Void> unregisterActorTimer(String actorType, String actorId, String timerName) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
}
private Map<String, Object> transformStateOptionsToMap(StateOptions options)
throws IllegalAccessException, IllegalArgumentException {
Map<String, Object> mapOptions = null;
if (options != null) {
mapOptions = new HashMap<>();
for (Field field : options.getClass().getFields()) {
Object fieldValue = field.get(options);
if (fieldValue != null) {
mapOptions.put(field.getName(), fieldValue);
}
}
}
return mapOptions;
}
private Map<String, Object> transformStateKeyValueToMap(StateKeyValue state, Map<String, Object> mapOptions)
throws IllegalAccessException, IllegalArgumentException {
Map<String, Object> mapState = new HashMap<>();
for (Field field : state.getClass().getFields()) {
mapState.put(field.getName(), field.get(state));
}
if (mapOptions != null && !mapOptions.isEmpty()) {
mapState.put("Options", mapOptions);
}
return mapState;
}
}

View File

@ -0,0 +1,388 @@
package io.dapr.client;
import io.dapr.client.domain.StateKeyValue;
import io.dapr.client.domain.StateOptions;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.Constants;
import io.dapr.utils.ObjectSerializer;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* An adapter for the GRPC Client.
*
* @see io.dapr.client.DaprHttp
* @see io.dapr.client.DaprClient
*/
public class DaprClientHttpAdapter implements DaprClient {
/**
* The HTTP client to be used
*
* @see io.dapr.client.DaprHttp
*/
private DaprHttp client;
/**
* A utitlity class for serialize and deserialize the messages sent and retrived by the client.
*/
private ObjectSerializer objectSerializer;
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param httpClient
* @see io.dapr.client.DaprClientBuilder
*/
DaprClientHttpAdapter(DaprHttp httpClient) {
this.client = client;
objectSerializer = new ObjectSerializer();
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> publishEvent(String topic, T event) {
try {
if (topic == null || topic.trim().isEmpty()) {
throw new DaprException("500", "Name cannot be null or empty.");
}
String serializedEvent = objectSerializer.serialize(event);
StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH).append("/").append(topic);
CompletableFuture<Void> futureVoid = client.invokeAPIVoid(
Constants.defaultHttpMethodSupported.POST.name(), url.toString(), serializedEvent, null);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz) {
try {
if (verb == null || verb.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
Constants.defaultHttpMethodSupported httMethod = Constants.defaultHttpMethodSupported.valueOf(verb.toUpperCase());
if (httMethod == null) {
throw new DaprException("405", "HTTP Method not allowed.");
}
if (appId == null || appId.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
if (method == null || method.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
StringBuilder urlSB = new StringBuilder("/invoke/");
urlSB.append(objectSerializer.serialize(appId));
urlSB.append("/method/");
urlSB.append(objectSerializer.serialize(method));
String serializedRequestBody = objectSerializer.serialize(request);
CompletableFuture<String> futureResponse =
client.invokeAPI(httMethod.name(), urlSB.toString(), serializedRequestBody, null);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(objectSerializer.deserialize(f.get(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
}
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> invokeService(String verb, String appId, String method, T request) {
try {
if (verb == null || verb.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
Constants.defaultHttpMethodSupported httMethod = Constants.defaultHttpMethodSupported.valueOf(verb.toUpperCase());
if (httMethod == null) {
throw new DaprException("405", "HTTP Method not allowed.");
}
if (appId == null || appId.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
if (method == null || method.trim().isEmpty()) {
throw new DaprException("500", "Method to invoke cannot be null or empty.");
}
StringBuilder urlSB = new StringBuilder("/invoke/");
urlSB.append(objectSerializer.serialize(appId));
urlSB.append("/method/");
urlSB.append(objectSerializer.serialize(method));
String serializedRequestBody = objectSerializer.serialize(request);
CompletableFuture<Void> futureVoid =
client.invokeAPIVoid(httMethod.name(), urlSB.toString(), serializedRequestBody, null);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> invokeBinding(String name, T request) {
try {
if (name == null || name.trim().isEmpty()) {
throw new DaprException("500", "Name to bind cannot be null or empty.");
}
String serializedBidingRequestBody = objectSerializer.serialize(request);
Map<String, String> jsonMap = new HashMap<>();
jsonMap.put("Data", serializedBidingRequestBody);
StringBuilder url = new StringBuilder(Constants.BINDING_PATH).append("/").append(name);
CompletableFuture<Void> futureVoid = client.invokeAPIVoid(
Constants.defaultHttpMethodSupported.POST.name(), url.toString(), objectSerializer.serialize(jsonMap), null);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T, K> Mono<T> getState(StateKeyValue<K> state, StateOptions options, Class<T> clazz) {
try {
if (state.getKey() == null) {
throw new DaprException("500", "Name cannot be null or empty.");
}
Map<String, String> headers = new HashMap<>();
if (state.getEtag() != null && !state.getEtag().trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, state.getEtag());
}
String serializedKeyBody = objectSerializer.serialize(state.getKey());
serializedKeyBody += getOptionsAsQueryParameter(options);
if (options.getConsistency() != null && !options.getConsistency().trim().isEmpty()) {
serializedKeyBody += "?consistency=" + objectSerializer.serialize(options.getConsistency());
}
StringBuilder url = new StringBuilder(Constants.STATE_PATH).append("/").append(serializedKeyBody);
CompletableFuture<String> futureResponse =
client.invokeAPI(Constants.defaultHttpMethodSupported.GET.name(), url.toString(), null, headers);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(objectSerializer.deserialize(f.get(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
}
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options) {
try {
if (states == null || states.isEmpty()) {
return Mono.empty();
}
Map<String, String> headers = new HashMap<>();
String etag = states.stream().filter(state -> null != state.getEtag() && !state.getEtag().trim().isEmpty())
.findFirst().orElse(new StateKeyValue<>(null, null, null)).getEtag();
if (etag != null && !etag.trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, etag);
}
String url = Constants.STATE_PATH + getOptionsAsQueryParameter(options);;
String serializedStateBody = objectSerializer.serialize(states);
CompletableFuture<Void> futureVoid = client.invokeAPIVoid(
Constants.defaultHttpMethodSupported.POST.name(), url, serializedStateBody, headers);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
@Override
public <T> Mono<Void> saveState(String key, String etag, T value, StateOptions options) {
StateKeyValue<T> state = new StateKeyValue<>(value, key, etag);
return saveStates(Arrays.asList(state), options);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options) {
try {
if (state.getKey() == null) {
throw new DaprException("500", "Name cannot be null or empty.");
}
Map<String, String> headers = new HashMap<>();
if (state.getEtag() != null && !state.getEtag().trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, state.getEtag());
}
String serializedKey = objectSerializer.serialize(state.getKey());
serializedKey += getOptionsAsQueryParameter(options);
String url = Constants.STATE_PATH + "/" + serializedKey;
CompletableFuture<Void> futureVoid = client.invokeAPIVoid(
Constants.defaultHttpMethodSupported.DELETE.name(), url, null, headers);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
@Override
public Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload) {
String url = String.format(Constants.ACTOR_METHOD_RELATIVE_URL_FORMAT, actorType, actorId, methodName);
return actorActionString(Constants.defaultHttpMethodSupported.POST.name(), url, jsonPayload);
}
@Override
public Mono<String> getActorState(String actorType, String actorId, String keyName) {
String url = String.format(Constants.ACTOR_STATE_KEY_RELATIVE_URL_FORMAT, actorType, actorId, keyName);
return actorActionString(Constants.defaultHttpMethodSupported.GET.name(), url, null);
}
@Override
public Mono<Void> saveActorStateTransactionally(String actorType, String actorId, String data) {
String url = String.format(Constants.ACTOR_STATE_RELATIVE_URL_FORMAT, actorType, actorId);
return actorActionVoid(Constants.defaultHttpMethodSupported.PUT.name(), url, data);
}
@Override
public Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, String data) {
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
return actorActionVoid(Constants.defaultHttpMethodSupported.PUT.name(), url, data);
}
@Override
public Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName) {
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
return actorActionVoid(Constants.defaultHttpMethodSupported.DELETE.name(), url, null);
}
@Override
public Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, String data) {
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return actorActionVoid(Constants.defaultHttpMethodSupported.PUT.name(), url, data);
}
@Override
public Mono<Void> unregisterActorTimer(String actorType, String actorId, String timerName) {
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return actorActionVoid(Constants.defaultHttpMethodSupported.DELETE.name(), url, null);
}
private Mono<String> actorActionString(String httpVerb, String url, String payload) {
try {
CompletableFuture<String> futureResponse =
client.invokeAPI(httpVerb, url, objectSerializer.serialize(payload), null);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(objectSerializer.deserialize(f.get(), String.class));
} catch (Exception ex) {
return Mono.error(ex);
}
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
private Mono<Void> actorActionVoid(String httpVerb, String url, String payload) {
try {
CompletableFuture<Void> futureVoid =
client.invokeAPIVoid(httpVerb, url, objectSerializer.serialize(payload), null);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
private String getOptionsAsQueryParameter(StateOptions options)
throws IllegalAccessException, IllegalArgumentException, IOException {
StringBuilder sb = new StringBuilder();
Map<String, Object> mapOptions = transformStateOptionsToMap(options);
if (mapOptions != null && !mapOptions.isEmpty()) {
sb.append("?");
for (Map.Entry<String, Object> option : mapOptions.entrySet()) {
sb.append(option.getKey()).append("=").append(objectSerializer.serialize(option.getValue())).append("&");
}
sb.deleteCharAt(sb.length()-1);
}
return sb.toString();
}
private Map<String, Object> transformStateOptionsToMap(StateOptions options)
throws IllegalAccessException, IllegalArgumentException {
Map<String, Object> mapOptions = null;
if (options != null) {
mapOptions = new HashMap<>();
for (Field field : options.getClass().getFields()) {
Object fieldValue = field.get(options);
if (fieldValue != null) {
mapOptions.put(field.getName(), fieldValue);
}
}
}
return mapOptions;
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.exceptions.DaprError;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.Constants;
import io.dapr.utils.ObjectSerializer;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.net.URL;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class DaprHttp {
/**
* ObjectMapper to Serialize data
*/
private static final ObjectSerializer MAPPER = new ObjectSerializer();
/**
* Defines the standard application/json type for HTTP calls in Dapr.
*/
private static final MediaType MEDIA_TYPE_APPLICATION_JSON =
MediaType.get("application/json; charset=utf-8");
/**
* Shared object representing an empty request body in JSON.
*/
private static final RequestBody REQUEST_BODY_EMPTY_JSON =
RequestBody.Companion.create("", MEDIA_TYPE_APPLICATION_JSON);
/**
* JSON Object Mapper.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* The base url used for form urls. This is typically "http://localhost:3500".
*/
private final String baseUrl;
/**
* Http client used for all API calls.
*/
private final OkHttpClient httpClient;
/**
* Thread-pool for HTTP calls.
*/
private final ExecutorService pool;
/**
* Creates a new instance of {@link DaprHttp}.
*
* @param baseUrl Base url calling Dapr (e.g. http://localhost)
* @param port Port for calling Dapr. (e.g. 3500)
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String baseUrl, int port, OkHttpClient httpClient) {
this.baseUrl = String.format("%s:%d/", baseUrl, port);
this.httpClient = httpClient;
this.pool = Executors.newWorkStealingPool();
}
/**
* Invokes an API asynchronously that returns Void.
*
* @param method HTTP method.
* @param urlString url as String.
* @param json JSON payload or null.
* @return Asynchronous Void
*/
protected final CompletableFuture<Void> invokeAPIVoid(String method, String urlString, String json, Map<String, String> headers) {
CompletableFuture<String> future = this.invokeAPI(method, urlString, json, headers);
return future.thenAcceptAsync(future::complete);
}
/**
* Invokes an API asynchronously that returns a text payload.
*
* @param method HTTP method.
* @param urlString url as String.
* @param json JSON payload or null.
* @return Asynchronous text
*/
public final CompletableFuture<String> invokeAPI(String method, String urlString, String json, Map<String, String> headers) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(
() -> {
try {
String requestId = UUID.randomUUID().toString();
RequestBody body =
json != null ? RequestBody.Companion.create(json, MEDIA_TYPE_APPLICATION_JSON) : REQUEST_BODY_EMPTY_JSON;
Request.Builder requestBuilder = new Request.Builder()
.url(new URL(this.baseUrl + urlString))
.method(method, body)
.addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId);
Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream().forEach(header ->{
requestBuilder.addHeader(header.getKey(), header.getValue());
});
Request request = requestBuilder.build();
try (Response response = this.httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
DaprError error = parseDaprError(response.body().string());
if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) {
throw new RuntimeException(new DaprException(error));
}
throw new RuntimeException("Unknown error.");
}
String result = response.body().string();
return result == null ? "" : result;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}, this.pool);
return future;
}
/**
* Tries to parse an error from Dapr response body.
*
* @param json Response body from Dapr.
* @return DaprError or null if could not parse.
*/
private static DaprError parseDaprError(String json) {
if (json == null) {
return null;
}
try {
return OBJECT_MAPPER.readValue(json, DaprError.class);
} catch (IOException e) {
throw new DaprException("500", "Unknown error: could not parse error json.");
}
}
}

View File

@ -0,0 +1,25 @@
package io.dapr.client.domain;
public class StateKeyValue<T> {
private final T value;
private final String key;
private final String etag;
public StateKeyValue(T value, String key, String etag) {
this.value = value;
this.key = key;
this.etag = etag;
}
public T getValue() {
return value;
}
public String getKey() {
return key;
}
public String getEtag() {
return etag;
}
}

View File

@ -0,0 +1,14 @@
package io.dapr.client.domain;
public class StateOptions {
private final String consistency;
public StateOptions(String consistency) {
this.consistency = consistency;
}
public String getConsistency() {
return consistency;
}
}

View File

@ -9,7 +9,7 @@ import java.io.IOException;
/**
* A Dapr's specific exception.
*/
public class DaprException extends IOException {
public class DaprException extends RuntimeException {
/**
* Dapr's error code for this exception.
@ -25,6 +25,18 @@ public class DaprException extends IOException {
this(daprError.getErrorCode(), daprError.getMessage());
}
/**
*
* @param daprError Client-side error.
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A {@code null} value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
*/
public DaprException(DaprError daprError, Throwable cause) {
this(daprError.getErrorCode(), daprError.getMessage(), cause);
}
/**
* New Exception from a client-side generated error code and message.
*
@ -36,6 +48,20 @@ public class DaprException extends IOException {
this.errorCode = errorCode;
}
/**
*
* @param errorCode Client-side error code.
* @param message Client-side error message.
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A {@code null} value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
*/
public DaprException(String errorCode, String message, Throwable cause) {
super(String.format("%s: %s", errorCode, message), cause);
this.errorCode = errorCode;
}
/**
* Returns the exception's error code.
*

View File

@ -19,11 +19,23 @@ public final class Constants {
*/
public static final String DEFAULT_HOSTNAME = "localhost";
/**
* Dapr's default http protocol.
*/
public static final String DEFAULT_HTTP_PROTOCOL_IDENTIFIED = "http://";
/**
* Dapr's default port.
*/
public static final int DEFAULT_PORT = 3500;
public static enum defaultHttpMethodSupported {
GET,
PUT,
POST,
DELETE;
}
/**
* Environment variable used to set Dapr's port.
*/
@ -34,6 +46,8 @@ public final class Constants {
*/
public static final String HEADER_DAPR_REQUEST_ID = "X-DaprRequestId";
public static final String HEADER_HTTP_ETAG_ID = "If-Match";
/**
* Base URL for Dapr Actor APIs.
*/

View File

@ -5,6 +5,7 @@
package io.dapr.utils;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

View File

@ -8,47 +8,43 @@ import static org.junit.Assert.*;
public class ActorProxyBuilderTest {
@Test(expected = IllegalArgumentException.class)
public void buildWithNullActorId() {
new ActorProxyBuilder()
.withActorId(null)
.withActorType("test")
.withPort(20)
.build();
@Test(expected = IllegalArgumentException.class)
public void buildWithNullActorId() {
new ActorProxyBuilder()
.withActorId(null)
.withActorType("test")
.build();
}
}
@Test(expected = IllegalArgumentException.class)
public void buildWithEmptyActorType() {
new ActorProxyBuilder()
.withActorId(new ActorId("100"))
.withActorType("")
.withPort(20)
.build();
@Test(expected = IllegalArgumentException.class)
public void buildWithEmptyActorType() {
new ActorProxyBuilder()
.withActorId(new ActorId("100"))
.withActorType("")
.build();
}
}
@Test(expected = IllegalArgumentException.class)
public void buildWithNullActorType() {
new ActorProxyBuilder()
.withActorId(new ActorId("100"))
.withActorType(null)
.withPort(20)
.build();
@Test(expected = IllegalArgumentException.class)
public void buildWithNullActorType() {
new ActorProxyBuilder()
.withActorId(new ActorId("100"))
.withActorType(null)
.build();
}
}
@Test()
public void build() {
ActorProxy actorProxy = new ActorProxyBuilder()
.withActorId(new ActorId("100"))
.withActorType("test")
.withPort(20)
.build();
@Test()
public void build() {
ActorProxyBuilder builder = new ActorProxyBuilder();
builder.withActorId(new ActorId("100"));
builder.withActorType("test");
ActorProxy actorProxy = builder.build();
Assert.assertNotNull(actorProxy);
Assert.assertEquals("test",actorProxy.getActorType());
Assert.assertEquals("100",actorProxy.getActorId().toString());
Assert.assertNotNull(actorProxy);
Assert.assertEquals("test", actorProxy.getActorType());
Assert.assertEquals("100", actorProxy.getActorId().toString());
}
}
}

View File

@ -2,6 +2,7 @@ package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.ActorStateSerializer;
import io.dapr.client.DaprClient;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -13,279 +14,281 @@ import static org.mockito.Mockito.when;
public class ActorProxyImplTest {
@Test()
public void constructorActorProxyTest() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
final ActorStateSerializer serializer = mock(ActorStateSerializer.class);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
serializer,
actorProxyAsyncClient);
Assert.assertEquals(actorProxy.getActorId().toString(), "100");
Assert.assertEquals(actorProxy.getActorType(), "myActorType");
@Test()
public void constructorActorProxyTest() {
final DaprClient daprClient = mock(DaprClient.class);
final ActorStateSerializer serializer = mock(ActorStateSerializer.class);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
serializer,
daprClient);
Assert.assertEquals(actorProxy.getActorId().toString(), "100");
Assert.assertEquals(actorProxy.getActorType(), "myActorType");
}
//@Test()
// TODO: review this test.
public void invokeActorMethodWithoutDataWithReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.just("{\n" +
"\t\"data\": \"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"\n" +
"}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", MyData.class);
MyData myData = result.block();
Assert.assertNotNull(myData);
Assert.assertEquals("valueA", myData.getPropertyA());
Assert.assertEquals("valueB", myData.getPropertyB());// propertyB=null
}
@Test()
public void invokeActorMethodWithoutDataWithEmptyReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.just(""));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", MyData.class);
MyData myData = result.block();
Assert.assertNull(myData);
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodWithIncorrectReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.just("{test}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", MyData.class);
result.doOnSuccess(x ->
Assert.fail("Not exception was throw"))
.doOnError(Throwable::printStackTrace
).block();
}
//@Test()
// TODO: review this test.
public void invokeActorMethodSavingDataWithReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just("{\n" +
"\t\"data\": \"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"\n" +
"}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
MyData myData = result.block();
Assert.assertNotNull(myData);
Assert.assertEquals("valueA", myData.getPropertyA());
Assert.assertEquals("valueB", myData.getPropertyB());//propertyB=null
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodSavingDataWithIncorrectReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just("{test}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
result.doOnSuccess(x ->
Assert.fail("Not exception was throw"))
.doOnError(Throwable::printStackTrace
).block();
}
@Test()
public void invokeActorMethodSavingDataWithEmptyReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just(""));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
MyData myData = result.block();
Assert.assertNull(myData);
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodSavingDataWithIncorrectInputType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just("{test}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
saveData.setMyData(saveData);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
result.doOnSuccess(x ->
Assert.fail("Not exception was throw"))
.doOnError(Throwable::printStackTrace
).block();
}
@Test()
public void invokeActorMethodWithDataWithVoidReturnType() {
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.empty());
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
Mono<Void> result = actorProxy.invokeActorMethod("getData", saveData);
Void emptyResponse = result.block();
Assert.assertNull(emptyResponse);
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodWithDataWithVoidIncorrectInputType() {
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
saveData.setMyData(saveData);
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.empty());
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
Mono<Void> result = actorProxy.invokeActorMethod("getData", saveData);
Void emptyResponse = result.doOnError(Throwable::printStackTrace).block();
Assert.assertNull(emptyResponse);
}
@Test()
public void invokeActorMethodWithoutDataWithVoidReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.empty());
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
daprClient);
Mono<Void> result = actorProxy.invokeActorMethod("getData");
Void emptyResponse = result.block();
Assert.assertNull(emptyResponse);
}
static class MyData {
/// Gets or sets the value for PropertyA.
private String propertyA;
/// Gets or sets the value for PropertyB.
private String propertyB;
private MyData myData;
public String getPropertyB() {
return propertyB;
}
@Test()
public void invokeActorMethodWithoutDataWithReturnType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.just("{\n" +
"\t\"data\": \"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"\n" +
"}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", MyData.class);
MyData myData = result.block();
Assert.assertNotNull(myData);
Assert.assertEquals("valueA", myData.getPropertyA());
Assert.assertEquals("valueB", myData.getPropertyB());
public void setPropertyB(String propActorProxyBuilderTestertyB) {
this.propertyB = propertyB;
}
@Test()
public void invokeActorMethodWithoutDataWithEmptyReturnType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.just(""));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", MyData.class);
MyData myData = result.block();
Assert.assertNull(myData);
public String getPropertyA() {
return propertyA;
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodWithIncorrectReturnType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.just("{test}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", MyData.class);
result.doOnSuccess(x ->
Assert.fail("Not exception was throw"))
.doOnError(Throwable::printStackTrace
).block();
public void setPropertyA(String propertyA) {
this.propertyA = propertyA;
}
@Test()
public void invokeActorMethodSavingDataWithReturnType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just("{\n" +
"\t\"data\": \"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"\n" +
"}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
MyData myData = result.block();
Assert.assertNotNull(myData);
Assert.assertEquals("valueA", myData.getPropertyA());
Assert.assertEquals("valueB", myData.getPropertyB());
@Override
public String toString() {
return "MyData{" +
"propertyA='" + propertyA + '\'' +
", propertyB='" + propertyB + '\'' +
'}';
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodSavingDataWithIncorrectReturnType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just("{test}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
result.doOnSuccess(x ->
Assert.fail("Not exception was throw"))
.doOnError(Throwable::printStackTrace
).block();
public MyData getMyData() {
return myData;
}
@Test()
public void invokeActorMethodSavingDataWithEmptyReturnType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just(""));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
MyData myData = result.block();
Assert.assertNull(myData);
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodSavingDataWithIncorrectInputType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.just("{test}"));
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
saveData.setMyData(saveData);
Mono<MyData> result = actorProxy.invokeActorMethod("getData", saveData, MyData.class);
result.doOnSuccess(x ->
Assert.fail("Not exception was throw"))
.doOnError(Throwable::printStackTrace
).block();
}
@Test()
public void invokeActorMethodWithDataWithVoidReturnType() {
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.empty());
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
Mono<Void> result = actorProxy.invokeActorMethod("getData", saveData);
Void emptyResponse = result.block();
Assert.assertNull(emptyResponse);
}
@Test(expected = RuntimeException.class)
public void invokeActorMethodWithDataWithVoidIncorrectInputType() {
MyData saveData = new MyData();
saveData.setPropertyA("valueA");
saveData.setPropertyB("valueB");
saveData.setMyData(saveData);
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNotNull()))
.thenReturn(Mono.empty());
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
Mono<Void> result = actorProxy.invokeActorMethod("getData", saveData);
Void emptyResponse = result.doOnError(Throwable::printStackTrace).block();
Assert.assertNull(emptyResponse);
}
@Test()
public void invokeActorMethodWithoutDataWithVoidReturnType() {
final ActorProxyHttpAsyncClient actorProxyAsyncClient = mock(ActorProxyHttpAsyncClient.class);
when(actorProxyAsyncClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(Mono.empty());
final ActorProxy actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new ActorStateSerializer(),
actorProxyAsyncClient);
Mono<Void> result = actorProxy.invokeActorMethod("getData");
Void emptyResponse = result.block();
Assert.assertNull(emptyResponse);
}
static class MyData {
/// Gets or sets the value for PropertyA.
private String propertyA;
/// Gets or sets the value for PropertyB.
private String propertyB;
private MyData myData;
public String getPropertyB() {
return propertyB;
}
public void setPropertyB(String propertyB) {
this.propertyB = propertyB;
}
public String getPropertyA() {
return propertyA;
}
public void setPropertyA(String propertyA) {
this.propertyA = propertyA;
}
@Override
public String toString() {
return "MyData{" +
"propertyA='" + propertyA + '\'' +
", propertyB='" + propertyB + '\'' +
'}';
}
public MyData getMyData() {
return myData;
}
public void setMyData(MyData myData) {
this.myData = myData;
}
public void setMyData(MyData myData) {
this.myData = myData;
}
}
}

View File

@ -4,6 +4,8 @@
*/
package io.dapr.actors.client;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.exceptions.DaprException;
import org.junit.Assert;
import org.junit.Test;
@ -21,8 +23,8 @@ public class DaprHttpAsyncClientIT {
*/
@Test(expected = RuntimeException.class)
public void invokeUnknownActor() {
ActorProxyAsyncClient daprAsyncClient = new ActorProxyClientBuilder().buildAsyncClient();
daprAsyncClient
DaprClient daprClient = new DaprClientBuilder().build();
daprClient
.invokeActorMethod("ActorThatDoesNotExist", "100", "GetData", null)
.doOnError(x -> {
Assert.assertTrue(x instanceof RuntimeException);

View File

@ -6,6 +6,7 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprClient;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
@ -193,7 +194,7 @@ public class ActorManagerTest {
new ActorStateSerializer(),
new DefaultActorFactory<T>(),
ActorTypeInformation.create(clazz),
mock(AppToDaprAsyncClient.class),
mock(DaprClient.class),
mock(DaprStateAsyncProvider.class)
);
}

View File

@ -8,6 +8,7 @@ package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprClient;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
@ -74,9 +75,9 @@ public class DaprStateAsyncProviderTest {
@Test
public void happyCaseApply() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
when(daprAsyncClient
.saveStateTransactionally(
DaprClient daprClient = mock(DaprClient.class);
when(daprClient
.saveActorStateTransactionally(
eq("MyActor"),
eq("123"),
argThat(s -> {
@ -124,7 +125,7 @@ public class DaprStateAsyncProviderTest {
})))
.thenReturn(Mono.empty());
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
provider.apply("MyActor",
new ActorId("123"),
createInsertChange("name", "Jon Doe"),
@ -132,38 +133,38 @@ public class DaprStateAsyncProviderTest {
createDeleteChange("flag"))
.block();
verify(daprAsyncClient).saveStateTransactionally(eq("MyActor"), eq("123"), any());
verify(daprClient).saveActorStateTransactionally(eq("MyActor"), eq("123"), any());
}
@Test
public void happyCaseLoad() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
when(daprAsyncClient
.getState(any(), any(), eq("name")))
DaprClient daprClient = mock(DaprClient.class);
when(daprClient
.getActorState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe"));
when(daprAsyncClient
.getState(any(), any(), eq("zipcode")))
when(daprClient
.getActorState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021"));
when(daprAsyncClient
.getState(any(), any(), eq("goals")))
when(daprClient
.getActorState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98"));
when(daprAsyncClient
.getState(any(), any(), eq("balance")))
when(daprClient
.getActorState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55"));
when(daprAsyncClient
.getState(any(), any(), eq("active")))
when(daprClient
.getActorState(any(), any(), eq("active")))
.thenReturn(Mono.just("true"));
when(daprAsyncClient
.getState(any(), any(), eq("customer")))
when(daprClient
.getActorState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}"));
when(daprAsyncClient
.getState(any(), any(), eq("anotherCustomer")))
when(daprClient
.getActorState(any(), any(), eq("anotherCustomer")))
.thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}"));
when(daprAsyncClient
.getState(any(), any(), eq("nullCustomer")))
when(daprClient
.getActorState(any(), any(), eq("nullCustomer")))
.thenReturn(Mono.just(""));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
Assert.assertEquals("Jon Doe",
provider.load("MyActor", new ActorId("123"), "name", String.class).block());
@ -187,40 +188,40 @@ public class DaprStateAsyncProviderTest {
@Test
public void happyCaseContains() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
DaprClient daprClient = mock(DaprClient.class);
// Keys that exists.
when(daprAsyncClient
.getState(any(), any(), eq("name")))
when(daprClient
.getActorState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe"));
when(daprAsyncClient
.getState(any(), any(), eq("zipcode")))
when(daprClient
.getActorState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021"));
when(daprAsyncClient
.getState(any(), any(), eq("goals")))
when(daprClient
.getActorState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98"));
when(daprAsyncClient
.getState(any(), any(), eq("balance")))
when(daprClient
.getActorState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55"));
when(daprAsyncClient
.getState(any(), any(), eq("active")))
when(daprClient
.getActorState(any(), any(), eq("active")))
.thenReturn(Mono.just("true"));
when(daprAsyncClient
.getState(any(), any(), eq("customer")))
when(daprClient
.getActorState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }"));
// Keys that do not exist.
when(daprAsyncClient
.getState(any(), any(), eq("Does not exist")))
when(daprClient
.getActorState(any(), any(), eq("Does not exist")))
.thenReturn(Mono.just(""));
when(daprAsyncClient
.getState(any(), any(), eq("NAME")))
when(daprClient
.getActorState(any(), any(), eq("NAME")))
.thenReturn(Mono.just(""));
when(daprAsyncClient
.getState(any(), any(), eq(null)))
when(daprClient
.getActorState(any(), any(), eq(null)))
.thenReturn(Mono.just(""));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
Assert.assertTrue(provider.contains("MyActor", new ActorId("123"), "name").block());
Assert.assertFalse(provider.contains("MyActor", new ActorId("123"), "NAME").block());

View File

@ -6,6 +6,7 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprClient;
import org.junit.Assert;
import org.junit.Test;
@ -74,7 +75,7 @@ public class DefaultActorFactoryTest {
mock(ActorStateSerializer.class),
mock(ActorFactory.class),
ActorTypeInformation.create(clazz),
mock(AppToDaprAsyncClient.class),
mock(DaprClient.class),
mock(DaprStateAsyncProvider.class));
}

View File

@ -0,0 +1,520 @@
package io.dapr.utils;
import org.junit.Test;
import java.io.IOException;
import java.io.Serializable;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
public class ObjectSerializerTest {
public static class MyObjectTestToSerialize implements Serializable {
private String stringValue;
private int intValue;
private boolean boolValue;
private char charValue;
private byte byteValue;
private short shortValue;
private long longValue;
private float floatValue;
private double doubleValue;
public String getStringValue() {
return stringValue;
}
public void setStringValue(String stringValue) {
this.stringValue = stringValue;
}
public int getIntValue() {
return intValue;
}
public void setIntValue(int intValue) {
this.intValue = intValue;
}
public boolean isBoolValue() {
return boolValue;
}
public void setBoolValue(boolean boolValue) {
this.boolValue = boolValue;
}
public char getCharValue() {
return charValue;
}
public void setCharValue(char charValue) {
this.charValue = charValue;
}
public byte getByteValue() {
return byteValue;
}
public void setByteValue(byte byteValue) {
this.byteValue = byteValue;
}
public short getShortValue() {
return shortValue;
}
public void setShortValue(short shortValue) {
this.shortValue = shortValue;
}
public long getLongValue() {
return longValue;
}
public void setLongValue(long longValue) {
this.longValue = longValue;
}
public float getFloatValue() {
return floatValue;
}
public void setFloatValue(float floatValue) {
this.floatValue = floatValue;
}
public double getDoubleValue() {
return doubleValue;
}
public void setDoubleValue(double doubleValue) {
this.doubleValue = doubleValue;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof MyObjectTestToSerialize)) {
return false;
}
MyObjectTestToSerialize that = (MyObjectTestToSerialize) o;
if (getIntValue() != that.getIntValue()) {
return false;
}
if (isBoolValue() != that.isBoolValue()) {
return false;
}
if (getCharValue() != that.getCharValue()) {
return false;
}
if (getByteValue() != that.getByteValue()) {
return false;
}
if (getShortValue() != that.getShortValue()) {
return false;
}
if (getLongValue() != that.getLongValue()) {
return false;
}
if (Float.compare(that.getFloatValue(), getFloatValue()) != 0) {
return false;
}
if (Double.compare(that.getDoubleValue(), getDoubleValue()) != 0) {
return false;
}
if (getStringValue() != null ? !getStringValue().equals(that.getStringValue()) : that.getStringValue() != null) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result;
long temp;
result = getStringValue() != null ? getStringValue().hashCode() : 0;
result = 31 * result + getIntValue();
result = 31 * result + (isBoolValue() ? 1 : 0);
result = 31 * result + (int) getCharValue();
result = 31 * result + (int) getByteValue();
result = 31 * result + (int) getShortValue();
result = 31 * result + (int) (getLongValue() ^ (getLongValue() >>> 32));
result = 31 * result + (getFloatValue() != +0.0f ? Float.floatToIntBits(getFloatValue()) : 0);
temp = Double.doubleToLongBits(getDoubleValue());
result = 31 * result + (int) (temp ^ (temp >>> 32));
return result;
}
@Override
public String toString() {
return "MyObjectTestToSerialize{" +
"stringValue='" + stringValue + '\'' +
", intValue=" + intValue +
", boolValue=" + boolValue +
", charValue=" + charValue +
", byteValue=" + byteValue +
", shortValue=" + shortValue +
", longValue=" + longValue +
", floatValue=" + floatValue +
", doubleValue=" + doubleValue +
'}';
}
}
@Test
public void serializeObjectTest() {
MyObjectTestToSerialize obj = new MyObjectTestToSerialize();
obj.setStringValue("A String");
obj.setIntValue(2147483647);
obj.setBoolValue(true);
obj.setCharValue('a');
obj.setByteValue((byte)65);
obj.setShortValue((short) 32767);
obj.setLongValue(9223372036854775807L);
obj.setFloatValue(1.0f);
obj.setDoubleValue(1000.0);
String expectedResult = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
ObjectSerializer serializer = new ObjectSerializer();
String serializedValue;
try {
serializedValue = serializer.serialize(obj);
assertEquals("FOUND:[[" + serializedValue + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, serializedValue);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void serializeNullTest() {
ObjectSerializer serializer = new ObjectSerializer();
String serializedValue;
try {
serializedValue = serializer.serialize(null);
assertNull("The expected result is null", serializedValue);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void serializeStringTest() {
String valueToSerialize = "A String";
ObjectSerializer serializer = new ObjectSerializer();
String serializedValue;
try {
serializedValue = serializer.serialize(valueToSerialize);
assertEquals(valueToSerialize, serializedValue);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void serializeIntTest() {
Integer valueToSerialize = 1;
String expectedResult = valueToSerialize.toString();
ObjectSerializer serializer = new ObjectSerializer();
String serializedValue;
try {
serializedValue = serializer.serialize(valueToSerialize.intValue());
assertEquals(expectedResult, serializedValue);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("The expected value is different than the actual result", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
public void deserializeNullObjectOrPrimitiveTest() {
ObjectSerializer serializer = new ObjectSerializer();
try {
MyObjectTestToSerialize expectedObj = null;
MyObjectTestToSerialize objResult = serializer.deserialize(null, MyObjectTestToSerialize.class);
assertEquals(expectedObj, objResult);
boolean expectedBoolResutl = false;
boolean boolResult = serializer.deserialize(null, boolean.class);
assertEquals(expectedBoolResutl, boolResult);
byte expectedByteResult = Byte.valueOf((byte) 0);
byte byteResult = serializer.deserialize(null, byte.class);
assertEquals(expectedByteResult, byteResult);
short expectedShortResult = (short) 0;
short shortResult = serializer.deserialize(null, short.class);
assertEquals(expectedShortResult, shortResult);
int expectedIntResult = 0;
int intResult = serializer.deserialize(null, int.class);
assertEquals(expectedIntResult, intResult);
long expectedLongResult = 0L;
long longResult = serializer.deserialize(null, long.class);
assertEquals(expectedLongResult, longResult);
float expectedFloatResult = 0f;
float floatResult = serializer.deserialize(null, float.class);
assertEquals(expectedFloatResult, floatResult);
double expectedDoubleResult = (double) 0;
double doubleResult = serializer.deserialize(null, double.class);
assertEquals(expectedDoubleResult, doubleResult);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingStringPropertyTest() {
String jsonToDeserialize = "{\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingIntTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingBooleanTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingCharTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingByteTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingShortTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingLongTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"floatValue\":1.0,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setFloatValue(1.0f);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingFloatTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"doubleValue\":1000.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setDoubleValue(1000.0);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test
public void deserializeObjectMissingDoubleTest() {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0}";
MyObjectTestToSerialize expectedResult = new MyObjectTestToSerialize();
expectedResult.setStringValue("A String");
expectedResult.setIntValue(2147483647);
expectedResult.setBoolValue(true);
expectedResult.setCharValue('a');
expectedResult.setByteValue((byte)65);
expectedResult.setShortValue((short) 32767);
expectedResult.setLongValue(9223372036854775807L);
expectedResult.setFloatValue(1.0f);
MyObjectTestToSerialize result;
ObjectSerializer serializer = new ObjectSerializer();
try {
result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
assertEquals("FOUND:[[" + result + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, result);
} catch (IOException exception) {
fail(exception.getMessage());
}
}
@Test(expected = IOException.class)
public void deserializeObjectIntExceedMaximunValueTest() throws Exception {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483648,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
ObjectSerializer serializer = new ObjectSerializer();
serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
}
@Test(expected = IOException.class)
public void deserializeObjectNotACharTest() throws Exception {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"Not A Char\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
ObjectSerializer serializer = new ObjectSerializer();
try {
serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
} catch (IOException ioEx) {
throw ioEx;
} catch (Exception ex) {
fail("Wrong exception thrown: [" + ex.getClass() + "] Message:[" + ex.getMessage() + "]");
}
}
@Test(expected = IOException.class)
public void deserializeObjectShortExceededMaximunValueTest() throws Exception {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32768,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
ObjectSerializer serializer = new ObjectSerializer();
serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
}
@Test(expected = IOException.class)
public void deserializeObjectLongExceededMaximumValueTest() throws Exception {
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775808,\"floatValue\":1.0,\"doubleValue\":1000.0}";
ObjectSerializer serializer = new ObjectSerializer();
MyObjectTestToSerialize result = serializer.deserialize(jsonToDeserialize, MyObjectTestToSerialize.class);
}
}