mirror of https://github.com/dapr/java-sdk.git
ActorProxy + some refactoring (#62)
* ActorManager * More work done. * Adding example for actor runtime service. * Implements ActorStateManager + fixes + javadocs. * Fix OrderManager example in order to process http 201 Change the implement the calls to DAPR using the class AbstractDaprClient because before the change the class always return Mono.Empty Implementation of the ActorAsyncProxy Change the name of the Actor Dapr Http Async Client * Update code with the changes proposed by Artur in the code review * Changes to support ActorProxy + Fixes. Co-authored-by: Juan Jose Herrera <35985447+JuanJose-Herrera@users.noreply.github.com>
This commit is contained in:
parent
64bb4bfb3e
commit
0ef261554a
|
@ -1,16 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.actors.http;
|
||||
|
||||
/**
|
||||
* Client that will use Actor.
|
||||
*/
|
||||
public class ActorClient {
|
||||
// TODO.
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.actors.http;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.actors.client.ActorProxy;
|
||||
import io.dapr.actors.client.ActorProxyBuilder;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Client for Actor runtime.
|
||||
* 1. Build and install jars:
|
||||
* mvn clean install
|
||||
* 2. Run the client:
|
||||
* dapr run --app-id demoactorclient --port 3006 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorClient
|
||||
*/
|
||||
public class DemoActorClient {
|
||||
|
||||
private static final int NUM_ACTORS = 3;
|
||||
|
||||
private static final int NUM_MESSAGES_PER_ACTOR = 10;
|
||||
|
||||
private static final String METHOD_NAME = "say";
|
||||
|
||||
private static final ExecutorService POOL = Executors.newFixedThreadPool(NUM_ACTORS);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ActorProxyBuilder builder = new ActorProxyBuilder();
|
||||
|
||||
List<CompletableFuture<Void>> futures = new ArrayList<>(NUM_ACTORS);
|
||||
|
||||
for (int i = 0; i < NUM_ACTORS; i++) {
|
||||
ActorProxy actor = builder.withActorType("DemoActor").withActorId(ActorId.createRandom()).build();
|
||||
futures.add(callActorNTimes(actor));
|
||||
}
|
||||
|
||||
futures.forEach(CompletableFuture::join);
|
||||
POOL.shutdown();
|
||||
POOL.awaitTermination(1, TimeUnit.MINUTES);
|
||||
|
||||
System.out.println("Done.");
|
||||
}
|
||||
|
||||
private static final CompletableFuture<Void> callActorNTimes(ActorProxy actor) {
|
||||
return CompletableFuture.runAsync(() -> {
|
||||
for (int i = 0; i < NUM_MESSAGES_PER_ACTOR; i++) {
|
||||
String result = actor.invokeActorMethod(METHOD_NAME,
|
||||
String.format("Actor %s said message #%d", actor.getActorId().toString(), i)).block();
|
||||
System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result));
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}, POOL);
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ import io.dapr.actors.ActorId;
|
|||
import io.dapr.actors.runtime.AbstractActor;
|
||||
import io.dapr.actors.runtime.Actor;
|
||||
import io.dapr.actors.runtime.ActorRuntimeContext;
|
||||
import io.dapr.actors.runtime.ActorType;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
|
@ -18,6 +19,7 @@ import java.util.TimeZone;
|
|||
/**
|
||||
* Implementation of the DemoActor for the server side.
|
||||
*/
|
||||
@ActorType(Name = "DemoActor")
|
||||
public class DemoActorImpl extends AbstractActor implements DemoActor, Actor {
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,8 +32,8 @@ import java.util.Map;
|
|||
* Service for Actor runtime.
|
||||
* 1. Build and install jars:
|
||||
* mvn clean install
|
||||
* 2. Run in server mode:
|
||||
* dapr run --app-id hellogrpc --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorService -Dexec.args="-p 3000"
|
||||
* 2. Run the server:
|
||||
* dapr run --app-id demoactorservice --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorService -Dexec.args="-p 3000"
|
||||
*/
|
||||
public class DemoActorService {
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ import java.net.http.HttpResponse;
|
|||
import java.net.http.HttpResponse.BodyHandlers;
|
||||
import java.nio.charset.Charset;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
|
@ -56,7 +58,7 @@ public class OrderManager {
|
|||
out.println("Fetching order!");
|
||||
fetch(stateUrl + "/order").thenAccept(response -> {
|
||||
int resCode = response.statusCode() == 200 ? 200 : 500;
|
||||
String body = response.statusCode() == 200 ? response.body() : "Could not get state.";
|
||||
String body = (response.statusCode() == 200) || (response.statusCode() == 201) ? response.body() : "Could not get state.";
|
||||
|
||||
try {
|
||||
e.sendResponseHeaders(resCode, body.getBytes().length);
|
||||
|
@ -89,7 +91,7 @@ public class OrderManager {
|
|||
out.printf("Writing to state: %s\n", state.toString());
|
||||
|
||||
post(stateUrl, state.toString()).thenAccept(response -> {
|
||||
int resCode = response.statusCode() == 200 ? 200 : 500;
|
||||
int resCode = (response.statusCode() == 200) || (response.statusCode() == 201) ? 201 : 500;
|
||||
String body = response.body();
|
||||
try {
|
||||
e.sendResponseHeaders(resCode, body.getBytes().length);
|
||||
|
|
|
@ -25,12 +25,20 @@ public abstract class AbstractClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns configured port.
|
||||
* @return
|
||||
*/
|
||||
protected int getPort() {
|
||||
return this.port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to get a valid port from environment variable or returns default.
|
||||
*
|
||||
* @return Port defined in env variable or default.
|
||||
*/
|
||||
protected static int GetEnvPortOrDefault() {
|
||||
private static int GetEnvPortOrDefault() {
|
||||
String envPort = System.getenv(Constants.ENV_DAPR_HTTP_PORT);
|
||||
if (envPort == null) {
|
||||
return Constants.DEFAULT_PORT;
|
||||
|
@ -44,4 +52,5 @@ public abstract class AbstractClientBuilder {
|
|||
|
||||
return Constants.DEFAULT_PORT;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,16 +5,13 @@
|
|||
package io.dapr.actors;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import okhttp3.*;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.UUID;
|
||||
import okhttp3.*;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Base for Dapr HTTP Client.
|
||||
*/
|
||||
// base class of hierarchy
|
||||
public abstract class AbstractDaprClient {
|
||||
|
||||
/**
|
||||
|
@ -109,10 +106,10 @@ public abstract class AbstractDaprClient {
|
|||
RequestBody body = json != null ? RequestBody.create(MEDIA_TYPE_APPLICATION_JSON, json) : REQUEST_BODY_EMPTY_JSON;
|
||||
|
||||
Request request = new Request.Builder()
|
||||
.url(new URL(this.baseUrl + urlString))
|
||||
.method(method, body)
|
||||
.addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId)
|
||||
.build();
|
||||
.url(new URL(this.baseUrl + urlString))
|
||||
.method(method, body)
|
||||
.addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId)
|
||||
.build();
|
||||
|
||||
this.httpClient.newCall(request).enqueue(new Callback() {
|
||||
|
||||
|
@ -181,4 +178,4 @@ public abstract class AbstractDaprClient {
|
|||
public void onSuccess(String response);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.actors.client;
|
||||
|
||||
/**
|
||||
* Request and Response object used to talk to Actors.
|
||||
*/
|
||||
public class ActorMethodEnvelope {
|
||||
|
||||
/**
|
||||
* Data serialized for input/output of Actor methods.
|
||||
*/
|
||||
private byte[] data;
|
||||
|
||||
/**
|
||||
* Gets the data serialized for input/output of Actor methods.
|
||||
* @return Data serialized for input/output of Actor methods.
|
||||
*/
|
||||
public byte[] getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the data serialized for input/output of Actor methods.
|
||||
* @param data Data serialized for input/output of Actor methods.
|
||||
*/
|
||||
public void setData(byte[] data) {
|
||||
this.data = data;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
package io.dapr.actors.client;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Proxy to communicate to a given Actor instance in Dapr.
|
||||
*/
|
||||
public interface ActorProxy {
|
||||
|
||||
/**
|
||||
* Returns the ActorId associated with the proxy object.
|
||||
*
|
||||
* @return An ActorId object.
|
||||
*/
|
||||
ActorId getActorId();
|
||||
|
||||
/**
|
||||
* Returns actor implementation type of the actor associated with the proxy object.
|
||||
*
|
||||
* @return Actor's type name.
|
||||
*/
|
||||
String getActorType();
|
||||
|
||||
/**
|
||||
* Invokes an Actor method on Dapr.
|
||||
*
|
||||
* @param methodName Method name to invoke.
|
||||
* @param clazz The type of the return class.
|
||||
* @return Asynchronous result with the Actor's response.
|
||||
*/
|
||||
<T> Mono<T> invokeActorMethod(String methodName, Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Invokes an Actor method on Dapr.
|
||||
*
|
||||
* @param methodName Method name to invoke.
|
||||
* @param data Object with the data.
|
||||
* @param clazz The type of the return class.
|
||||
* @return Asynchronous result with the Actor's response.
|
||||
*/
|
||||
<T> Mono<T> invokeActorMethod(String methodName, Object data, Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Invokes an Actor method on Dapr.
|
||||
*
|
||||
* @param methodName Method name to invoke.
|
||||
* @return Asynchronous result with the Actor's response.
|
||||
*/
|
||||
Mono<String> invokeActorMethod(String methodName);
|
||||
|
||||
/**
|
||||
* Invokes an Actor method on Dapr.
|
||||
*
|
||||
* @param methodName Method name to invoke.
|
||||
* @param data Object with the data.
|
||||
* @return Asynchronous result with the Actor's response.
|
||||
*/
|
||||
Mono<String> invokeActorMethod(String methodName, Object data);
|
||||
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
package io.dapr.actors.client;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.actors.runtime.ActorStateSerializer;
|
||||
import io.dapr.actors.utils.ObjectSerializer;
|
||||
|
||||
/**
|
||||
* Builder to generate an ActorProxy instance.
|
||||
*/
|
||||
public class ActorProxyBuilder {
|
||||
|
||||
/**
|
||||
* Serializer for content to be sent back and forth between actors.
|
||||
*/
|
||||
private static final ObjectSerializer SERIALIZER = new ActorStateSerializer();
|
||||
|
||||
/**
|
||||
* Builder for the Dapr client.
|
||||
*/
|
||||
private final ActorProxyClientBuilder clientBuilder = new ActorProxyClientBuilder();
|
||||
|
||||
/**
|
||||
* Actor's type.
|
||||
*/
|
||||
private String actorType;
|
||||
|
||||
/**
|
||||
* Actor's identifier.
|
||||
*/
|
||||
private ActorId actorId;
|
||||
|
||||
/**
|
||||
* Changes build config to use specific port.
|
||||
*
|
||||
* @param port Port to be used.
|
||||
* @return Same builder object.
|
||||
*/
|
||||
public ActorProxyBuilder withPort(int port) {
|
||||
this.clientBuilder.withPort(port);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Changes build config to use given Actor's type.
|
||||
*
|
||||
* @param actorType Actor's type.
|
||||
* @return Same builder object.
|
||||
*/
|
||||
public ActorProxyBuilder withActorType(String actorType) {
|
||||
this.actorType = actorType;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Changes build config to use given Actor's identifier.
|
||||
*
|
||||
* @param actorId Actor's identifier.
|
||||
* @return Same builder object.
|
||||
*/
|
||||
public ActorProxyBuilder withActorId(ActorId actorId) {
|
||||
this.actorId = actorId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new ActorProxy.
|
||||
*
|
||||
* @return New instance of ActorProxy.
|
||||
*/
|
||||
public ActorProxy build() {
|
||||
if ((this.actorType == null) || this.actorType.isEmpty()) {
|
||||
throw new IllegalArgumentException("Cannot instantiate an Actor without type.");
|
||||
}
|
||||
|
||||
if (this.actorId == null) {
|
||||
throw new IllegalArgumentException("Cannot instantiate an Actor without Id.");
|
||||
}
|
||||
|
||||
return new ActorProxyImpl(
|
||||
this.actorType,
|
||||
this.actorId,
|
||||
SERIALIZER,
|
||||
this.clientBuilder.buildAsyncClient());
|
||||
}
|
||||
|
||||
}
|
|
@ -12,11 +12,6 @@ import okhttp3.OkHttpClient;
|
|||
*/
|
||||
class ActorProxyClientBuilder extends AbstractClientBuilder {
|
||||
|
||||
/**
|
||||
* Default port for Dapr after checking environment variable.
|
||||
*/
|
||||
private int port = ActorProxyClientBuilder.GetEnvPortOrDefault();
|
||||
|
||||
/**
|
||||
* Builds an async client.
|
||||
*
|
||||
|
@ -25,6 +20,6 @@ class ActorProxyClientBuilder extends AbstractClientBuilder {
|
|||
public ActorProxyAsyncClient buildAsyncClient() {
|
||||
OkHttpClient.Builder builder = new OkHttpClient.Builder();
|
||||
// TODO: Expose configurations for OkHttpClient or com.microsoft.rest.RestClient.
|
||||
return new ActorProxyHttpAsyncClient(this.port, builder.build());
|
||||
return new ActorProxyHttpAsyncClient(super.getPort(), builder.build());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ class ActorProxyHttpAsyncClient extends AbstractDaprClient implements ActorProxy
|
|||
* @param port Port for calling Dapr. (e.g. 3500)
|
||||
* @param httpClient RestClient used for all API calls in this new instance.
|
||||
*/
|
||||
public ActorProxyHttpAsyncClient(int port, OkHttpClient httpClient) {
|
||||
ActorProxyHttpAsyncClient(int port, OkHttpClient httpClient) {
|
||||
super(port, httpClient);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
package io.dapr.actors.client;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.actors.utils.ObjectSerializer;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* Implements a proxy client for an Actor's instance.
|
||||
*/
|
||||
class ActorProxyImpl implements ActorProxy {
|
||||
|
||||
/**
|
||||
* EMPTY data for null response.
|
||||
*/
|
||||
private static final byte[] EMPTY_DATA = new byte[0];
|
||||
|
||||
/**
|
||||
* Actor's identifier for this Actor instance.
|
||||
*/
|
||||
private final ActorId actorId;
|
||||
|
||||
/**
|
||||
* Actor's type for this Actor instance.
|
||||
*/
|
||||
private final String actorType;
|
||||
|
||||
/**
|
||||
* Serializer/deserialzier to exchange message for Actors.
|
||||
*/
|
||||
private final ObjectSerializer serializer;
|
||||
|
||||
/**
|
||||
* Client to talk to the Dapr's API.
|
||||
*/
|
||||
private final ActorProxyAsyncClient daprClient;
|
||||
|
||||
/**
|
||||
* Creates a new instance of {@link ActorProxyAsyncClient}.
|
||||
*
|
||||
* @param actorType actor implementation type of the actor associated with the proxy object.
|
||||
* @param actorId The actorId associated with the proxy
|
||||
* @param serializer Serializer and deserializer for method calls.
|
||||
* @param daprClient Dapr client.
|
||||
*/
|
||||
ActorProxyImpl(String actorType, ActorId actorId, ObjectSerializer serializer, ActorProxyAsyncClient daprClient) {
|
||||
this.actorType = actorType;
|
||||
this.actorId = actorId;
|
||||
this.daprClient = daprClient;
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public ActorId getActorId() {
|
||||
return actorId;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public String getActorType() {
|
||||
return actorType;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public <T> Mono<T> invokeActorMethod(String methodName, Object data, Class<T> clazz) {
|
||||
try {
|
||||
Mono<String> result = this.daprClient.invokeActorMethod(
|
||||
actorType,
|
||||
actorId.toString(),
|
||||
methodName,
|
||||
this.wrap(data));
|
||||
|
||||
return result
|
||||
.filter(s -> (s != null) && (!s.isEmpty()))
|
||||
.map(s -> unwrap(s, clazz));
|
||||
} catch (IOException e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public <T> Mono<T> invokeActorMethod(String methodName, Class<T> clazz) {
|
||||
Mono<String> result = this.daprClient.invokeActorMethod(actorType, actorId.toString(), methodName, null);
|
||||
return result
|
||||
.filter(s -> (s != null) && (!s.isEmpty()))
|
||||
.map(s -> unwrap(s, clazz));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<String> invokeActorMethod(String methodName) {
|
||||
Mono<String> result = this.daprClient.invokeActorMethod(actorType, actorId.toString(), methodName, null);
|
||||
return result
|
||||
.filter(s -> (s != null) && (!s.isEmpty()))
|
||||
.map(s -> this.unwrap(s, String.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<String> invokeActorMethod(String methodName, Object data) {
|
||||
try {
|
||||
Mono<String> result = this.daprClient.invokeActorMethod(
|
||||
actorType,
|
||||
actorId.toString(),
|
||||
methodName,
|
||||
this.wrap(data));
|
||||
return result
|
||||
.filter(s -> (s != null) && (!s.isEmpty()))
|
||||
.map(s -> unwrap(s, String.class));
|
||||
} catch (IOException e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the response object from the Actor's method result.
|
||||
*
|
||||
* @param response String returned by API.
|
||||
* @param clazz Expected response class.
|
||||
* @param <T> Expected response type.
|
||||
* @return Response object, null or RuntimeException.
|
||||
*/
|
||||
private <T> T unwrap(final String response, Class<T> clazz) {
|
||||
if (response == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
ActorMethodEnvelope res = serializer.deserialize(response, ActorMethodEnvelope.class);
|
||||
if (res == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
byte[] data = res.getData();
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.serializer.deserialize(new String(data, StandardCharsets.UTF_8), clazz);
|
||||
} catch (IOException e) {
|
||||
// Wrap it to make Mono happy.
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the request to invoke an API for Actors.
|
||||
*
|
||||
* @param request Request object for the original Actor's method.
|
||||
* @param <T> Type for the original Actor's method request.
|
||||
* @return String to be sent to Dapr's API.
|
||||
* @throws IOException In case it cannot generate String.
|
||||
*/
|
||||
private <T> String wrap(final T request) throws IOException {
|
||||
if (request == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String json = this.serializer.serialize(request);
|
||||
ActorMethodEnvelope req = new ActorMethodEnvelope();
|
||||
req.setData(json == null ? EMPTY_DATA : json.getBytes());
|
||||
return serializer.serialize(req);
|
||||
}
|
||||
|
||||
}
|
|
@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonFactory;
|
|||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dapr.actors.utils.ObjectSerializer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
|
@ -15,36 +16,19 @@ import java.io.Writer;
|
|||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* Serializes and deserializes an object.
|
||||
* Serializes and deserializes special objects for Actors.
|
||||
*/
|
||||
class ActorStateSerializer {
|
||||
public class ActorStateSerializer extends ObjectSerializer {
|
||||
|
||||
/**
|
||||
* Shared Json Factory as per Jackson's documentation, used only for this class.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
private static final JsonFactory JSON_FACTORY = new JsonFactory();
|
||||
|
||||
/**
|
||||
* Shared Json serializer/deserializer as per Jackson's documentation.
|
||||
*/
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
/**
|
||||
* Serializes a given state object into byte array.
|
||||
*
|
||||
* @param state State object to be serialized.
|
||||
* @return Array of bytes[] with the serialized content.
|
||||
* @throws IOException
|
||||
*/
|
||||
<T> String serialize(T state) throws IOException {
|
||||
@Override
|
||||
public <T> String serialize(T state) throws IOException {
|
||||
if (state == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (state.getClass() == String.class) {
|
||||
return state.toString();
|
||||
}
|
||||
|
||||
if (state.getClass() == ActorTimer.class) {
|
||||
// Special serializer for this internal classes.
|
||||
return serialize((ActorTimer<?>) state);
|
||||
|
@ -55,96 +39,22 @@ class ActorStateSerializer {
|
|||
return serialize((ActorReminderParams) state);
|
||||
}
|
||||
|
||||
if (isPrimitiveOrEquivalent(state.getClass())) {
|
||||
return state.toString();
|
||||
}
|
||||
|
||||
// Not string, not primitive, so it is a complex type: we use JSON for that.
|
||||
return OBJECT_MAPPER.writeValueAsString(state);
|
||||
// Is not an special case.
|
||||
return super.serialize(state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes the byte array into the original object.
|
||||
*
|
||||
* @param value String to be parsed.
|
||||
* @param clazz Type of the object being deserialized.
|
||||
* @param <T> Generic type of the object being deserialized.
|
||||
* @return Object of type T.
|
||||
* @throws IOException
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
<T> T deserialize(String value, Class<T> clazz) throws IOException {
|
||||
if (clazz == String.class) {
|
||||
return (T) value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T deserialize(String value, Class<T> clazz) throws IOException {
|
||||
if (clazz == ActorReminderParams.class) {
|
||||
// Special serializer for this internal classes.
|
||||
return (T) deserializeActorReminder(value);
|
||||
}
|
||||
|
||||
if (isPrimitiveOrEquivalent(clazz)) {
|
||||
return parse(value, clazz);
|
||||
}
|
||||
|
||||
if (value == null) {
|
||||
return (T) null;
|
||||
}
|
||||
|
||||
// Not string, not primitive, so it is a complex type: we use JSON for that.
|
||||
return OBJECT_MAPPER.readValue(value, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the class is a primitive or equivalent.
|
||||
* @param clazz Class to be checked.
|
||||
* @return True if primitive or equivalent.
|
||||
*/
|
||||
private static boolean isPrimitiveOrEquivalent(Class<?> clazz) {
|
||||
if (clazz == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (clazz.isPrimitive() ||
|
||||
(clazz == Boolean.class) ||
|
||||
(clazz == Character.class) ||
|
||||
(clazz == Byte.class) ||
|
||||
(clazz == Short.class) ||
|
||||
(clazz == Integer.class) ||
|
||||
(clazz == Long.class) ||
|
||||
(clazz == Float.class) ||
|
||||
(clazz == Double.class) ||
|
||||
(clazz == Void.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a given String to the corresponding object defined by class.
|
||||
* @param value String to be parsed.
|
||||
* @param clazz Class of the expected result type.
|
||||
* @param <T> Result type.
|
||||
* @return Result as corresponding type.
|
||||
*/
|
||||
private static <T> T parse(String value, Class<T> clazz) {
|
||||
if (value == null) {
|
||||
if (boolean.class == clazz) return (T) Boolean.FALSE;
|
||||
if (byte.class == clazz) return (T) Byte.valueOf((byte) 0);
|
||||
if (short.class == clazz) return (T) Short.valueOf((short) 0);
|
||||
if (int.class == clazz) return (T) Integer.valueOf(0);
|
||||
if (long.class == clazz) return (T) Long.valueOf(0L);
|
||||
if (float.class == clazz) return (T) Float.valueOf(0);
|
||||
if (double.class == clazz) return (T) Double.valueOf(0);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
if ((Boolean.class == clazz) || (boolean.class == clazz)) return (T) Boolean.valueOf(value);
|
||||
if ((Byte.class == clazz) || (byte.class == clazz)) return (T) Byte.valueOf(value);
|
||||
if ((Short.class == clazz) || (short.class == clazz)) return (T) Short.valueOf(value);
|
||||
if ((Integer.class == clazz) || (int.class == clazz)) return (T) Integer.valueOf(value);
|
||||
if ((Long.class == clazz) || (long.class == clazz)) return (T) Long.valueOf(value);
|
||||
if ((Float.class == clazz) || (float.class == clazz)) return (T) Float.valueOf(value);
|
||||
if ((Double.class == clazz) || (double.class == clazz)) return (T) Double.valueOf(value);
|
||||
|
||||
return null;
|
||||
// Is not one the special cases.
|
||||
return super.deserialize(value, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -12,11 +12,6 @@ import okhttp3.OkHttpClient;
|
|||
*/
|
||||
class AppToDaprClientBuilder extends AbstractClientBuilder {
|
||||
|
||||
/**
|
||||
* Default port for Dapr after checking environment variable.
|
||||
*/
|
||||
private int port = AppToDaprClientBuilder.GetEnvPortOrDefault();
|
||||
|
||||
/**
|
||||
* Builds an async client.
|
||||
*
|
||||
|
@ -25,6 +20,6 @@ class AppToDaprClientBuilder extends AbstractClientBuilder {
|
|||
public AppToDaprAsyncClient buildAsyncClient() {
|
||||
OkHttpClient.Builder builder = new OkHttpClient.Builder();
|
||||
// TODO: Expose configurations for OkHttpClient or com.microsoft.rest.RestClient.
|
||||
return new AppToDaprHttpAsyncClient(this.port, builder.build());
|
||||
return new AppToDaprHttpAsyncClient(super.getPort(), builder.build());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
package io.dapr.actors.utils;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Serializes and deserializes an object.
|
||||
*/
|
||||
public class ObjectSerializer {
|
||||
|
||||
/**
|
||||
* Shared Json Factory as per Jackson's documentation, used only for this class.
|
||||
*/
|
||||
protected static final JsonFactory JSON_FACTORY = new JsonFactory();
|
||||
|
||||
/**
|
||||
* Shared Json serializer/deserializer as per Jackson's documentation.
|
||||
*/
|
||||
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
/**
|
||||
* Serializes a given state object into byte array.
|
||||
*
|
||||
* @param state State object to be serialized.
|
||||
* @return Array of bytes[] with the serialized content.
|
||||
* @throws IOException
|
||||
*/
|
||||
public <T> String serialize(T state) throws IOException {
|
||||
if (state == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (state.getClass() == String.class) {
|
||||
return state.toString();
|
||||
}
|
||||
|
||||
if (isPrimitiveOrEquivalent(state.getClass())) {
|
||||
return state.toString();
|
||||
}
|
||||
|
||||
// Not string, not primitive, so it is a complex type: we use JSON for that.
|
||||
return OBJECT_MAPPER.writeValueAsString(state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes the byte array into the original object.
|
||||
*
|
||||
* @param value String to be parsed.
|
||||
* @param clazz Type of the object being deserialized.
|
||||
* @param <T> Generic type of the object being deserialized.
|
||||
* @return Object of type T.
|
||||
* @throws IOException
|
||||
*/
|
||||
public <T> T deserialize(String value, Class<T> clazz) throws IOException {
|
||||
if (clazz == String.class) {
|
||||
return (T) value;
|
||||
}
|
||||
|
||||
if (isPrimitiveOrEquivalent(clazz)) {
|
||||
return parse(value, clazz);
|
||||
}
|
||||
|
||||
if (value == null) {
|
||||
return (T) null;
|
||||
}
|
||||
|
||||
// Not string, not primitive, so it is a complex type: we use JSON for that.
|
||||
return OBJECT_MAPPER.readValue(value, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the class is a primitive or equivalent.
|
||||
* @param clazz Class to be checked.
|
||||
* @return True if primitive or equivalent.
|
||||
*/
|
||||
private static boolean isPrimitiveOrEquivalent(Class<?> clazz) {
|
||||
if (clazz == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (clazz.isPrimitive() ||
|
||||
(clazz == Boolean.class) ||
|
||||
(clazz == Character.class) ||
|
||||
(clazz == Byte.class) ||
|
||||
(clazz == Short.class) ||
|
||||
(clazz == Integer.class) ||
|
||||
(clazz == Long.class) ||
|
||||
(clazz == Float.class) ||
|
||||
(clazz == Double.class) ||
|
||||
(clazz == Void.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a given String to the corresponding object defined by class.
|
||||
* @param value String to be parsed.
|
||||
* @param clazz Class of the expected result type.
|
||||
* @param <T> Result type.
|
||||
* @return Result as corresponding type.
|
||||
*/
|
||||
private static <T> T parse(String value, Class<T> clazz) {
|
||||
if (value == null) {
|
||||
if (boolean.class == clazz) return (T) Boolean.FALSE;
|
||||
if (byte.class == clazz) return (T) Byte.valueOf((byte) 0);
|
||||
if (short.class == clazz) return (T) Short.valueOf((short) 0);
|
||||
if (int.class == clazz) return (T) Integer.valueOf(0);
|
||||
if (long.class == clazz) return (T) Long.valueOf(0L);
|
||||
if (float.class == clazz) return (T) Float.valueOf(0);
|
||||
if (double.class == clazz) return (T) Double.valueOf(0);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
if ((Boolean.class == clazz) || (boolean.class == clazz)) return (T) Boolean.valueOf(value);
|
||||
if ((Byte.class == clazz) || (byte.class == clazz)) return (T) Byte.valueOf(value);
|
||||
if ((Short.class == clazz) || (short.class == clazz)) return (T) Short.valueOf(value);
|
||||
if ((Integer.class == clazz) || (int.class == clazz)) return (T) Integer.valueOf(value);
|
||||
if ((Long.class == clazz) || (long.class == clazz)) return (T) Long.valueOf(value);
|
||||
if ((Float.class == clazz) || (float.class == clazz)) return (T) Float.valueOf(value);
|
||||
if ((Double.class == clazz) || (double.class == clazz)) return (T) Double.valueOf(value);
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package io.dapr.actors.client;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.actors.runtime.ActorStateSerializer;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ActorProxyImplTest {
|
||||
|
||||
@Test()
|
||||
public void constructorActorProxyTest() {
|
||||
final ActorProxyHttpAsyncClient actorProxyAsyncClient = (ActorProxyHttpAsyncClient)new ActorProxyClientBuilder().buildAsyncClient();
|
||||
final ActorProxyImpl actorProxy= new ActorProxyImpl(
|
||||
"myActorType",
|
||||
new ActorId("100"),
|
||||
new ActorStateSerializer(),
|
||||
actorProxyAsyncClient);
|
||||
Assert.assertEquals(actorProxy.getActorId().toString(),"100");
|
||||
Assert.assertEquals(actorProxy.getActorType(),"myActorType");
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@ package io.dapr.actors.runtime;
|
|||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public class ActorReminderParamsTest {
|
||||
|
|
|
@ -15,9 +15,7 @@ import reactor.core.publisher.Mono;
|
|||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
|
|
|
@ -9,7 +9,7 @@ import io.dapr.actors.ActorId;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* Testing the default constructor of an Actor.
|
||||
|
|
Loading…
Reference in New Issue