Actor over GRPC. (#395)

This commit is contained in:
Artur Souza 2020-11-24 15:23:35 -08:00 committed by GitHub
parent 183a9fef92
commit cf1c10d85a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1752 additions and 489 deletions

View File

@ -22,10 +22,11 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: 13.0.x
DAPR_CLI_VER: 1.0.0-rc.2
DAPR_RUNTIME_VER: 1.0.0-rc.1
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh
DAPR_CLI_REF: v1.0.0-rc.1
DAPR_REF:
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/a60221e96406a145ab22e454eec6642961725f5c/install/install.sh
DAPR_CLI_REF:
DAPR_REF: 83f5c45362b0c577139b1887276d7cf1b7308506
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}
@ -37,7 +38,7 @@ jobs:
with:
java-version: ${{ env.JDK_VER }}
- name: Set up Dapr CLI
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}
- name: Set up Go ${{ env.GOVER }}
if: env.DAPR_REF != '' || env.DAPR_CLI_REF != ''
uses: actions/setup-go@v2

View File

@ -29,24 +29,24 @@ public class DemoActorClient {
* @throws InterruptedException If program has been interrupted.
*/
public static void main(String[] args) throws InterruptedException {
ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder(DemoActor.class);
try (ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder(DemoActor.class)) {
List<Thread> threads = new ArrayList<>(NUM_ACTORS);
List<Thread> threads = new ArrayList<>(NUM_ACTORS);
// Creates multiple actors.
for (int i = 0; i < NUM_ACTORS; i++) {
ActorId actorId = ActorId.createRandom();
DemoActor actor = builder.build(actorId);
// Creates multiple actors.
for (int i = 0; i < NUM_ACTORS; i++) {
ActorId actorId = ActorId.createRandom();
DemoActor actor = builder.build(actorId);
// Start a thread per actor.
Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor));
thread.start();
threads.add(thread);
}
// Start a thread per actor.
Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor));
thread.start();
threads.add(thread);
}
// Waits for threads to finish.
for (Thread thread : threads) {
thread.join();
// Waits for threads to finish.
for (Thread thread : threads) {
thread.join();
}
}
System.out.println("Done.");

View File

@ -143,17 +143,18 @@ public class DemoActorClient {
private static final int NUM_ACTORS = 3;
public static void main(String[] args) throws InterruptedException {
ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder(DemoActor.class);
///...
for (int i = 0; i < NUM_ACTORS; i++) {
DemoActor actor = builder.build(ActorId.createRandom());
try (ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder(DemoActor.class)) {
///...
for (int i = 0; i < NUM_ACTORS; i++) {
DemoActor actor = builder.build(ActorId.createRandom());
// Start a thread per actor.
Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor));
thread.start();
threads.add(thread);
// Start a thread per actor.
Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor));
thread.start();
threads.add(thread);
}
///...
}
///...
}
private static final void callActorForever(String actorId, DemoActor actor) {
@ -182,7 +183,9 @@ public class DemoActorClient {
}
```
First, the client defines how many actors it is going to create. Then the main method declares a `ActorProxyBuilder` to create instances of the `DemoActor` interface, which are implemented automatically by the SDK and make remote calls to the equivalent methods in Actor runtime. The code executes the `callActorForever` private method once per actor. Initially, it will invoke `registerReminder()`, which sets the due time and period for the reminder. Then, `incrementAndGet()` increments a counter, persists it and sends it back as response. Finally `say` method which will print a message containing the received string along with the formatted server time.
First, the client defines how many actors it is going to create. The main method declares a `ActorProxyBuilder` to create instances of the `DemoActor` interface, which are implemented automatically by the SDK and make remote calls to the equivalent methods in Actor runtime. `ActorProxyBuilder` implements `Closeable`, which means it holds resources that need to be closed. In this example, we use the "try-resource" feature in Java.
Then, the code executes the `callActorForever` private method once per actor. Initially, it will invoke `registerReminder()`, which sets the due time and period for the reminder. Then, `incrementAndGet()` increments a counter, persists it and sends it back as response. Finally `say` method which will print a message containing the received string along with the formatted server time.
Use the follow command to execute the DemoActorClient:

View File

@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.33.1</grpc.version>
<protobuf.version>3.13.0</protobuf.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/f3ddd345962d7e065e26a54f7b2f1f2db9af1fdb/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/83f5c45362b0c577139b1887276d7cf1b7308506/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

View File

@ -8,20 +8,25 @@ package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorUtils;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.v1.DaprGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.Closeable;
import java.lang.reflect.Proxy;
/**
* Builder to generate an ActorProxy instance. Builder can be reused for multiple instances.
*/
public class ActorProxyBuilder<T> {
public class ActorProxyBuilder<T> implements Closeable {
/**
* Builder for Dapr's raw http client.
* Determine if this builder will create GRPC clients instead of HTTP clients.
*/
private final DaprHttpBuilder daprHttpBuilder = new DaprHttpBuilder();
private final boolean useGrpc;
/**
* Actor's type.
@ -38,6 +43,16 @@ public class ActorProxyBuilder<T> {
*/
private DaprObjectSerializer objectSerializer;
/**
* Builds Dapr HTTP client.
*/
private DaprHttpBuilder daprHttpBuilder;
/**
* Channel for communication with Dapr.
*/
private final ManagedChannel channel;
/**
* Instantiates a new builder for a given Actor type, using {@link DefaultObjectSerializer} by default.
*
@ -65,9 +80,12 @@ public class ActorProxyBuilder<T> {
throw new IllegalArgumentException("ActorTypeClass is required.");
}
this.useGrpc = Properties.USE_GRPC.get();
this.actorType = actorType;
this.objectSerializer = new DefaultObjectSerializer();
this.clazz = actorTypeClass;
this.daprHttpBuilder = new DaprHttpBuilder();
this.channel = buildManagedChannel();
}
/**
@ -100,7 +118,7 @@ public class ActorProxyBuilder<T> {
this.actorType,
actorId,
this.objectSerializer,
new DaprHttpClient(this.daprHttpBuilder.build()));
buildDaprClient());
if (this.clazz.equals(ActorProxy.class)) {
// If users want to use the not strongly typed API, we respect that here.
@ -113,4 +131,45 @@ public class ActorProxyBuilder<T> {
proxy);
}
/**
* Build an instance of the Client based on the provided setup.
*
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private DaprClient buildDaprClient() {
if (this.useGrpc) {
return new DaprGrpcClient(DaprGrpc.newFutureStub(this.channel));
}
return new DaprHttpClient(daprHttpBuilder.build());
}
/**
* {@inheritDoc}
*/
@Override
public void close() {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
}
/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel() {
if (!Properties.USE_GRPC.get()) {
return null;
}
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}
return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port).usePlaintext().build();
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.client;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import reactor.core.publisher.Mono;
/**
* A DaprClient over GRPC for Actor.
*/
class DaprGrpcClient implements DaprClient {
/**
* The GRPC client to be used.
*
* @see DaprGrpc.DaprFutureStub
*/
private DaprGrpc.DaprFutureStub client;
/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
*/
DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) {
this.client = grpcClient;
}
/**
* {@inheritDoc}
*/
@Override
public Mono<byte[]> invokeActorMethod(String actorType, String actorId, String methodName, byte[] jsonPayload) {
return Mono.fromCallable(() -> {
DaprProtos.InvokeActorRequest req =
DaprProtos.InvokeActorRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setMethod(methodName)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
ListenableFuture<DaprProtos.InvokeActorResponse> futureResponse = client.invokeActor(req);
return futureResponse.get();
}).map(r -> r.getData().toByteArray());
}
}

View File

@ -51,5 +51,4 @@ class DaprHttpClient implements DaprClient {
this.client.invokeApi(DaprHttp.HttpMethods.POST.name(), url, null, jsonPayload, null, null);
return responseMono.map(r -> r.getBody());
}
}

View File

@ -78,6 +78,15 @@ public abstract class AbstractActor {
return this.id;
}
/**
* Returns the actor's type.
*
* @return Actor type.
*/
String getType() {
return this.actorRuntimeContext.getActorTypeInformation().getName();
}
/**
* Returns the state store manager for this Actor.
*
@ -105,12 +114,11 @@ public abstract class AbstractActor {
try {
byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state);
ActorReminderParams params = new ActorReminderParams(data, dueTime, period);
byte[] serialized = INTERNAL_SERIALIZER.serialize(params);
return this.actorRuntimeContext.getDaprClient().registerActorReminder(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
reminderName,
serialized);
params);
} catch (IOException e) {
return Mono.error(e);
}
@ -136,7 +144,7 @@ public abstract class AbstractActor {
T state,
Duration dueTime,
Duration period) {
return Mono.fromSupplier(() -> {
try {
if ((callback == null) || callback.isEmpty()) {
throw new IllegalArgumentException("Timer requires a callback function.");
}
@ -146,18 +154,17 @@ public abstract class AbstractActor {
name = String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID().toString());
}
return new ActorTimer(this, name, callback, state, dueTime, period);
}).flatMap(actorTimer -> {
try {
return this.actorRuntimeContext.getDaprClient().registerActorTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
actorTimer.getName(),
INTERNAL_SERIALIZER.serialize(actorTimer)).then(Mono.just(actorTimer.getName()));
} catch (Exception e) {
return Mono.error(e);
}
});
byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state);
ActorTimerParams actorTimer = new ActorTimerParams(callback, data, dueTime, period);
return this.actorRuntimeContext.getDaprClient().registerActorTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
name,
actorTimer).thenReturn(name);
} catch (Exception e) {
return Mono.error(e);
}
}
/**

View File

@ -34,9 +34,9 @@ public class ActorObjectSerializer extends ObjectSerializer {
return null;
}
if (state.getClass() == ActorTimer.class) {
if (state.getClass() == ActorTimerParams.class) {
// Special serializer for this internal classes.
return serialize((ActorTimer) state);
return serialize((ActorTimerParams) state);
}
if (state.getClass() == ActorReminderParams.class) {
@ -61,7 +61,7 @@ public class ActorObjectSerializer extends ObjectSerializer {
* @return JSON String.
* @throws IOException If cannot generate JSON.
*/
private byte[] serialize(ActorTimer<?> timer) throws IOException {
private byte[] serialize(ActorTimerParams timer) throws IOException {
if (timer == null) {
return null;
}
@ -72,8 +72,8 @@ public class ActorObjectSerializer extends ObjectSerializer {
generator.writeStringField("dueTime", DurationUtils.convertDurationToDaprFormat(timer.getDueTime()));
generator.writeStringField("period", DurationUtils.convertDurationToDaprFormat(timer.getPeriod()));
generator.writeStringField("callback", timer.getCallback());
if (timer.getState() != null) {
generator.writeBinaryField("data", this.serialize(timer.getState()));
if (timer.getData() != null) {
generator.writeBinaryField("data", timer.getData());
}
generator.writeEndObject();
generator.close();
@ -148,6 +148,11 @@ public class ActorObjectSerializer extends ObjectSerializer {
*/
@Override
public <T> T deserialize(byte[] content, Class<T> clazz) throws IOException {
if (clazz == ActorTimerParams.class) {
// Special serializer for this internal classes.
return (T) deserializeActorTimer(content);
}
if (clazz == ActorReminderParams.class) {
// Special serializer for this internal classes.
return (T) deserializeActorReminder(content);
@ -158,49 +163,24 @@ public class ActorObjectSerializer extends ObjectSerializer {
}
/**
* Extracts the response data from a JSON Payload where data is in "data" attribute.
* Deserializes an Actor Timer.
*
* @param payload JSON payload containing "data".
* @return byte[] instance, null.
* @throws IOException In case it cannot generate String.
* @param value Content to be deserialized.
* @return Actor Timer.
* @throws IOException If cannot parse JSON.
*/
public byte[] unwrapData(final byte[] payload) throws IOException {
if (payload == null) {
private ActorTimerParams deserializeActorTimer(byte[] value) throws IOException {
if (value == null) {
return null;
}
JsonNode root = OBJECT_MAPPER.readTree(payload);
if (root == null) {
return null;
}
JsonNode node = OBJECT_MAPPER.readTree(value);
String callback = node.get("callback").asText();
Duration dueTime = extractDurationOrNull(node, "dueTime");
Duration period = extractDurationOrNull(node, "period");
byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null;
JsonNode dataNode = root.get("data");
if (dataNode == null) {
return null;
}
return dataNode.binaryValue();
}
/**
* Wraps data in the "data" attribute in a JSON object.
*
* @param data bytes to be wrapped into the "data" attribute in a JSON object.
* @return String to be sent to Dapr's API.
* @throws IOException If there's is any issue reading the data or wraping it
*/
public byte[] wrapData(final byte[] data) throws IOException {
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(output);
generator.writeStartObject();
if (data != null) {
generator.writeBinaryField("data", data);
}
generator.writeEndObject();
generator.close();
output.flush();
return output.toByteArray();
}
return new ActorTimerParams(callback, data, dueTime, period);
}
/**
@ -216,11 +196,26 @@ public class ActorObjectSerializer extends ObjectSerializer {
}
JsonNode node = OBJECT_MAPPER.readTree(value);
Duration dueTime = DurationUtils.convertDurationFromDaprFormat(node.get("dueTime").asText());
Duration period = DurationUtils.convertDurationFromDaprFormat(node.get("period").asText());
Duration dueTime = extractDurationOrNull(node, "dueTime");
Duration period = extractDurationOrNull(node, "period");
byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null;
return new ActorReminderParams(data, dueTime, period);
}
/**
* Extracts duration or null.
*
* @param node Node that contains the attribute.
* @param name Attribute name.
* @return Parsed duration or null.
*/
private static Duration extractDurationOrNull(JsonNode node, String name) {
JsonNode valueNode = node.get(name);
if (valueNode == null) {
return null;
}
return DurationUtils.convertDurationFromDaprFormat(valueNode.asText());
}
}

View File

@ -8,10 +8,14 @@ package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorTrace;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@ -21,7 +25,7 @@ import java.util.Map;
* Contains methods to register actor types. Registering the types allows the
* runtime to create instances of the actor.
*/
public class ActorRuntime {
public class ActorRuntime implements Closeable {
/**
* Serializer for internal Dapr objects.
@ -43,6 +47,11 @@ public class ActorRuntime {
*/
private static volatile ActorRuntime instance;
/**
* Channel for communication with Dapr.
*/
private final ManagedChannel channel;
/**
* Configuration for the Actor runtime.
*/
@ -64,16 +73,27 @@ public class ActorRuntime {
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime() throws IllegalStateException {
this(new DaprHttpClient(new DaprHttpBuilder().build()));
this(buildManagedChannel());
}
/**
* Constructor once channel is available. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime(ManagedChannel channel) throws IllegalStateException {
this(channel, buildDaprClient(channel));
}
/**
* Constructor with dependency injection, useful for testing. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @param daprClient Client to communicate with Dapr.
* @throws IllegalStateException If class has one instance already.
*/
private ActorRuntime(DaprClient daprClient) throws IllegalStateException {
private ActorRuntime(ManagedChannel channel, DaprClient daprClient) throws IllegalStateException {
if (instance != null) {
throw new IllegalStateException("ActorRuntime should only be constructed once");
}
@ -81,6 +101,7 @@ public class ActorRuntime {
this.config = new ActorRuntimeConfig();
this.actorManagers = Collections.synchronizedMap(new HashMap<>());
this.daprClient = daprClient;
this.channel = channel;
}
/**
@ -272,9 +293,52 @@ public class ActorRuntime {
if (actorManager == null) {
String errorMsg = String.format("Actor type %s is not registered with Actor runtime.", actorTypeName);
ACTOR_TRACE.writeError(TRACE_TYPE, actorTypeName, "Actor type is not registered with runtime.");
throw new IllegalStateException(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
return actorManager;
}
/**
* Build an instance of the Client based on the provided setup.
*
* @param channel GRPC managed channel (or null, if not using GRPC).
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(ManagedChannel channel) {
if (Properties.USE_GRPC.get()) {
return new DaprGrpcClient(channel);
}
return new DaprHttpClient(new DaprHttpBuilder().build());
}
/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel() {
if (!Properties.USE_GRPC.get()) {
return null;
}
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}
return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port).usePlaintext().build();
}
/**
* {@inheritDoc}
*/
@Override
public void close() {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
/**
* Represents a state operation for actor.
*
*/
final class ActorStateOperation {
/**
* Name of the operation.
*/
private String operationType;
/**
* Key for the state to be persisted.
*/
private String key;
/**
* Value of the state to be persisted.
*/
private Object value;
/**
* Instantiates a new Actor Timer.
*
* @param operationType Type of state operation.
* @param key Key to be persisted.
* @param value Value to be persisted.
*/
ActorStateOperation(String operationType,
String key,
Object value) {
this.operationType = operationType;
this.key = key;
this.value = value;
}
/**
* Gets the type of state operation.
*
* @return State operation.
*/
public String getOperationType() {
return operationType;
}
/**
* Gets the key to be persisted.
*
* @return Key to be persisted.
*/
public String getKey() {
return key;
}
/**
* Gets the value to be persisted.
*
* @return Value to be persisted.
*/
public Object getValue() {
return value;
}
}

View File

@ -1,116 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import java.time.Duration;
/**
* Represents the timer set on an Actor, to be called once after due time and then every period.
*
* @param <T> State type.
*/
final class ActorTimer<T> {
/**
* Actor that owns this timer.
*/
private final AbstractActor owner;
/**
* Name of this timer.
*/
private String name;
/**
* Name of the method to be called for this timer.
*/
private String callback;
/**
* State to be sent in the timer.
*/
private T state;
/**
* Due time for the timer's first trigger.
*/
private Duration dueTime;
/**
* Period at which the timer will be triggered.
*/
private Duration period;
/**
* Instantiates a new Actor Timer.
*
* @param owner The Actor that owns this timer. The timer callback will be fired for this Actor.
* @param timerName The name of the timer.
* @param callback The name of the method to be called for this timer.
* @param state information to be used by the callback method
* @param dueTime the time when timer is first due.
* @param period the periodic time when timer will be invoked.
*/
ActorTimer(AbstractActor owner,
String timerName,
String callback,
T state,
Duration dueTime,
Duration period) {
this.owner = owner;
this.name = timerName;
this.callback = callback;
this.state = state;
this.dueTime = dueTime;
this.period = period;
}
/**
* Gets the name of the Timer. The name is unique per actor.
*
* @return The name of the timer.
*/
public String getName() {
return this.name;
}
/**
* Gets the name of the method for this Timer.
*
* @return The name of the method for this timer.
*/
public String getCallback() {
return this.callback;
}
/**
* Gets the time when timer is first due.
*
* @return Time as Duration when timer is first due.
*/
public Duration getDueTime() {
return this.dueTime;
}
/**
* Gets the periodic time when timer will be invoked.
*
* @return Periodic time as Duration when timer will be invoked.
*/
public Duration getPeriod() {
return this.period;
}
/**
* Gets state containing information to be used by the callback method, or null.
*
* @return State containing information to be used by the callback method, or null.
*/
public T getState() {
return this.state;
}
}

View File

@ -5,55 +5,86 @@
package io.dapr.actors.runtime;
import java.time.Duration;
/**
* Parameters for Actor Timer.
* Represents the timer set on an Actor, to be called once after due time and then every period.
*
*/
final class ActorTimerParams {
/**
* Callback function to be invoked in actor.
* Name of the method to be called for this timer.
*/
private String callback;
/**
* Data to be passed in as part of the timer trigger.
* State to be sent in the timer.
*/
private byte[] data;
/**
* Sets the name of the callback function.
*
* @param callback Name of the callback function.
* Due time for the timer's first trigger.
*/
public void setCallback(String callback) {
private Duration dueTime;
/**
* Period at which the timer will be triggered.
*/
private Duration period;
/**
* Instantiates a new Actor Timer.
*
* @param callback The name of the method to be called for this timer.
* @param data The state to be used by the callback method
* @param dueTime The time when timer is first due.
* @param period The periodic time when timer will be invoked.
*/
ActorTimerParams(String callback,
byte[] data,
Duration dueTime,
Duration period) {
this.callback = callback;
this.data = data;
this.dueTime = dueTime;
this.period = period;
}
/**
* Gets the name of the callback function.
* Gets the name of the method for this Timer.
*
* @return Name of the callback function.
* @return The name of the method for this timer.
*/
public String getCallback() {
return this.callback;
}
/**
* Sets the raw data for the callback function.
* Gets the time when timer is first due.
*
* @param data Raw data for the callback function.
* @return Time as Duration when timer is first due.
*/
public void setData(byte[] data) {
this.data = data;
public Duration getDueTime() {
return this.dueTime;
}
/**
* Gets the raw data for the callback function.
* Gets the periodic time when timer will be invoked.
*
* @return Raw data for the callback function.
* @return Periodic time as Duration when timer will be invoked.
*/
public Duration getPeriod() {
return this.period;
}
/**
* Gets state containing information to be used by the callback method, or null.
*
* @return State containing information to be used by the callback method, or null.
*/
public byte[] getData() {
return data;
return this.data;
}
}

View File

@ -7,6 +7,8 @@ package io.dapr.actors.runtime;
import reactor.core.publisher.Mono;
import java.util.List;
/**
* Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
*/
@ -25,23 +27,27 @@ interface DaprClient {
/**
* Saves state batch to Dapr.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param data State to be saved.
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param operations State transaction operations.
* @return Asynchronous void result.
*/
Mono<Void> saveActorStateTransactionally(String actorType, String actorId, byte[] data);
Mono<Void> saveActorStateTransactionally(String actorType, String actorId, List<ActorStateOperation> operations);
/**
* Register a reminder.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param reminderName Name of reminder to be registered.
* @param data JSON reminder data as per Dapr's spec.
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param reminderName Name of reminder to be registered.
* @param reminderParams Parameters for the reminder.
* @return Asynchronous void result.
*/
Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, byte[] data);
Mono<Void> registerActorReminder(
String actorType,
String actorId,
String reminderName,
ActorReminderParams reminderParams);
/**
* Unregisters a reminder.
@ -54,15 +60,15 @@ interface DaprClient {
Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName);
/**
* Registers a timer.
* Register a timer.
*
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param timerName Name of timer to be registered.
* @param data JSON reminder data as per Dapr's spec.
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param timerName Name of reminder to be registered.
* @param timerParams Parameters for the timer.
* @return Asynchronous void result.
*/
Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, byte[] data);
Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, ActorTimerParams timerParams);
/**
* Unregisters a timer.

View File

@ -0,0 +1,224 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.config.Properties;
import io.dapr.utils.DurationUtils;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.ManagedChannel;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* A DaprClient over HTTP for Actor's runtime.
*/
class DaprGrpcClient implements DaprClient {
/**
* Use to handle internal serialization.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Dapr's charset.
*/
private static final Charset CHARSET = Properties.STRING_CHARSET.get();
/**
* The GRPC client to be used.
*
* @see io.dapr.v1.DaprGrpc.DaprFutureStub
*/
private DaprGrpc.DaprFutureStub client;
/**
* Internal constructor.
*
* @param channel channel (client needs to close channel after use).
*/
DaprGrpcClient(ManagedChannel channel) {
this(DaprGrpc.newFutureStub(channel));
}
/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
*/
DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) {
this.client = grpcClient;
}
/**
* {@inheritDoc}
*/
@Override
public Mono<byte[]> getActorState(String actorType, String actorId, String keyName) {
return Mono.fromCallable(() -> {
DaprProtos.GetActorStateRequest req =
DaprProtos.GetActorStateRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setKey(keyName)
.build();
ListenableFuture<DaprProtos.GetActorStateResponse> futureResponse = client.getActorState(req);
return futureResponse.get();
}).map(r -> r.getData().toByteArray());
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> saveActorStateTransactionally(
String actorType,
String actorId,
List<ActorStateOperation> operations) {
List<DaprProtos.TransactionalActorStateOperation> grpcOps = new ArrayList<>();
for (ActorStateOperation op : operations) {
String operationType = op.getOperationType();
String key = op.getKey();
Object value = op.getValue();
DaprProtos.TransactionalActorStateOperation.Builder opBuilder =
DaprProtos.TransactionalActorStateOperation.newBuilder()
.setOperationType(operationType)
.setKey(key);
if (value != null) {
if (value instanceof String) {
opBuilder.setValue(Any.newBuilder().setValue(ByteString.copyFrom((String) value, CHARSET)));
} else if (value instanceof byte[]) {
try {
String base64 = OBJECT_MAPPER.writeValueAsString(value);
opBuilder.setValue(Any.newBuilder().setValue(ByteString.copyFrom(base64, CHARSET)));
} catch (IOException e) {
return Mono.error(e);
}
} else {
return Mono.error(() -> {
throw new IllegalArgumentException("Actor state value must be String or byte[]");
});
}
}
grpcOps.add(opBuilder.build());
}
DaprProtos.ExecuteActorStateTransactionRequest req =
DaprProtos.ExecuteActorStateTransactionRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.addAllOperations(grpcOps)
.build();
return Mono.fromCallable(() -> {
ListenableFuture<Empty> futureResponse = client.executeActorStateTransaction(req);
return futureResponse.get();
}).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerActorReminder(
String actorType,
String actorId,
String reminderName,
ActorReminderParams reminderParams) {
return Mono.fromCallable(() -> {
DaprProtos.RegisterActorReminderRequest req =
DaprProtos.RegisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.setData(ByteString.copyFrom(reminderParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod()))
.build();
ListenableFuture<Empty> futureResponse = client.registerActorReminder(req);
futureResponse.get();
return null;
});
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName) {
return Mono.fromCallable(() -> {
DaprProtos.UnregisterActorReminderRequest req =
DaprProtos.UnregisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.build();
ListenableFuture<Empty> futureResponse = client.unregisterActorReminder(req);
futureResponse.get();
return null;
});
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerActorTimer(
String actorType,
String actorId,
String timerName,
ActorTimerParams timerParams) {
return Mono.fromCallable(() -> {
DaprProtos.RegisterActorTimerRequest req =
DaprProtos.RegisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.setCallback(timerParams.getCallback())
.setData(ByteString.copyFrom(timerParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod()))
.build();
ListenableFuture<Empty> futureResponse = client.registerActorTimer(req);
futureResponse.get();
return null;
});
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterActorTimer(String actorType, String actorId, String timerName) {
return Mono.fromCallable(() -> {
DaprProtos.UnregisterActorTimerRequest req =
DaprProtos.UnregisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.build();
ListenableFuture<Empty> futureResponse = client.unregisterActorTimer(req);
futureResponse.get();
return null;
});
}
}

View File

@ -5,14 +5,30 @@
package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.client.DaprHttp;
import reactor.core.publisher.Mono;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
/**
* A DaprClient over HTTP for Actor's runtime.
*/
class DaprHttpClient implements DaprClient {
/**
* Internal serializer for state.
*/
private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer();
/**
* Shared Json Factory as per Jackson's documentation, used only for this class.
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* Base URL for Dapr Actor APIs.
*/
@ -75,18 +91,77 @@ class DaprHttpClient implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> saveActorStateTransactionally(String actorType, String actorId, byte[] data) {
public Mono<Void> saveActorStateTransactionally(
String actorType,
String actorId,
List<ActorStateOperation> operations) {
// Constructing the JSON via a stream API to avoid creating transient objects to be instantiated.
byte[] payload = null;
try (ByteArrayOutputStream writer = new ByteArrayOutputStream()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
// Start array
generator.writeStartArray();
for (ActorStateOperation stateOperation : operations) {
// Start operation object.
generator.writeStartObject();
generator.writeStringField("operation", stateOperation.getOperationType());
// Start request object.
generator.writeObjectFieldStart("request");
generator.writeStringField("key", stateOperation.getKey());
Object value = stateOperation.getValue();
if (value != null) {
if (value instanceof String) {
// DefaultObjectSerializer is a JSON serializer, so we just pass it on.
generator.writeFieldName("value");
generator.writeRawValue((String) value);
} else if (value instanceof byte[]) {
// Custom serializer uses byte[].
// DefaultObjectSerializer is just a passthrough for byte[], so we handle it here too.
generator.writeBinaryField("value", (byte[]) value);
} else {
return Mono.error(() -> {
throw new IllegalArgumentException("Actor state value must be String or byte[]");
});
}
}
// End request object.
generator.writeEndObject();
// End operation object.
generator.writeEndObject();
}
// End array
generator.writeEndArray();
generator.close();
writer.flush();
payload = writer.toByteArray();
} catch (IOException e) {
return Mono.error(e);
}
String url = String.format(ACTOR_STATE_RELATIVE_URL_FORMAT, actorType, actorId);
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null).then();
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, payload, null, null).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, byte[] data) {
public Mono<Void> registerActorReminder(
String actorType,
String actorId,
String reminderName,
ActorReminderParams reminderParams) {
String url = String.format(ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null).then();
return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(reminderParams))
.flatMap(data ->
this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null)
).then();
}
/**
@ -102,9 +177,16 @@ class DaprHttpClient implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, byte[] data) {
String url = String.format(ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null).then();
public Mono<Void> registerActorTimer(
String actorType,
String actorId,
String timerName,
ActorTimerParams timerParams) {
return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(timerParams))
.flatMap(data -> {
String url = String.format(ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null);
}).then();
}
/**

View File

@ -5,8 +5,6 @@
package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.ActorId;
import io.dapr.config.Properties;
@ -15,9 +13,9 @@ import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
/**
* State Provider to interact with Dapr runtime to handle state.
@ -34,11 +32,6 @@ class DaprStateAsyncProvider {
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Shared Json Factory as per Jackson's documentation, used only for this class.
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* Dapr's client for Actor runtime.
*/
@ -71,8 +64,15 @@ class DaprStateAsyncProvider {
return result.flatMap(s -> {
try {
if (s == null) {
return Mono.empty();
}
T response = this.stateSerializer.deserialize(s, type);
if (this.isStateSerializerDefault && (response instanceof byte[])) {
if (s.length == 0) {
return Mono.empty();
}
// Default serializer just passes through byte arrays, so we need to decode it here.
response = (T) OBJECT_MAPPER.readValue(s, byte[].class);
}
@ -120,71 +120,42 @@ class DaprStateAsyncProvider {
return Mono.empty();
}
int count = 0;
// Constructing the JSON via a stream API to avoid creating transient objects to be instantiated.
byte[] payload = null;
try (ByteArrayOutputStream writer = new ByteArrayOutputStream()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
// Start array
generator.writeStartArray();
ArrayList<ActorStateOperation> operations = new ArrayList<>(stateChanges.length);
for (ActorStateChange stateChange : stateChanges) {
if ((stateChange == null) || (stateChange.getChangeKind() == null)) {
continue;
}
for (ActorStateChange stateChange : stateChanges) {
if ((stateChange == null) || (stateChange.getChangeKind() == null)) {
continue;
}
String operationName = stateChange.getChangeKind().getDaprStateChangeOperation();
if ((operationName == null) || (operationName.length() == 0)) {
continue;
}
String operationName = stateChange.getChangeKind().getDaprStateChangeOperation();
if ((operationName == null) || (operationName.length() == 0)) {
continue;
}
count++;
// Start operation object.
generator.writeStartObject();
generator.writeStringField("operation", operationName);
// Start request object.
generator.writeObjectFieldStart("request");
generator.writeStringField("key", stateChange.getStateName());
if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE)
|| (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
String key = stateChange.getStateName();
Object value = null;
if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE)
|| (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
try {
byte[] data = this.stateSerializer.serialize(stateChange.getValue());
if (data != null) {
if (this.isStateSerializerDefault && !(stateChange.getValue() instanceof byte[])) {
// DefaultObjectSerializer is a JSON serializer, so we just pass it on.
generator.writeFieldName("value");
generator.writeRawValue(new String(data, CHARSET));
value = new String(data, CHARSET);
} else {
// Custom serializer uses byte[].
// DefaultObjectSerializer is just a passthrough for byte[], so we handle it here too.
generator.writeBinaryField("value", data);
value = data;
}
}
} catch (IOException e) {
return Mono.error(e);
}
// End request object.
generator.writeEndObject();
// End operation object.
generator.writeEndObject();
}
// End array
generator.writeEndArray();
generator.close();
writer.flush();
payload = writer.toByteArray();
} catch (IOException e) {
return Mono.error(e);
operations.add(new ActorStateOperation(operationName, key, value));
}
if (count == 0) {
// No-op since there is no operation to be performed.
Mono.empty();
}
return this.daprClient.saveActorStateTransactionally(actorType, actorId.toString(), payload);
return this.daprClient.saveActorStateTransactionally(actorType, actorId.toString(), operations);
}
}

View File

@ -4,11 +4,12 @@
*/
package io.dapr.actors;
import java.util.*;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* Unit tests for ActorId.
*/

View File

@ -6,8 +6,9 @@
package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.actors.ActorMethod;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -58,16 +59,16 @@ public class ActorProxyImplTest {
public void invokeActorMethodWithoutDataWithReturnTypeViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.just(
"{\n\t\t\"propertyA\": \"valueA\",\n\t\t\"propertyB\": \"valueB\"\n\t}".getBytes());
"{\n\t\t\"propertyA\": \"valueA\",\n\t\t\"propertyB\": \"valueB\"\n\t}".getBytes());
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(daprResponse);
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
MyData myData = (MyData) actorProxy.invoke(actorProxy, Actor.class.getMethod("getData"), null);
Assert.assertNotNull(myData);
@ -75,6 +76,177 @@ public class ActorProxyImplTest {
Assert.assertEquals("valueB", myData.getPropertyB());// propertyB=null
}
@Test()
public void invokeActorMethodWithoutDataWithReturnMonoTypeViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.just(
"{\n\t\t\"propertyA\": \"valueA\",\n\t\t\"propertyB\": \"valueB\"\n\t}".getBytes());
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
Mono<MyData> res = (Mono<MyData>) actorProxy.invoke(actorProxy, Actor.class.getMethod("getDataMono"), null);
Assert.assertNotNull(res);
MyData myData = res.block();
Assert.assertNotNull(myData);
Assert.assertEquals("valueA", myData.getPropertyA());
Assert.assertEquals("valueB", myData.getPropertyB());// propertyB=null
}
@Test()
public void invokeActorMethodWithDataWithReturnTypeViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.just(
"\"OK\"".getBytes());
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes())))
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
String res = (String) actorProxy.invoke(
actorProxy,
Actor.class.getMethod("echo", String.class),
new Object[] { "hello world" } );
Assert.assertEquals("OK", res);
}
@Test()
public void invokeActorMethodWithDataWithReturnMonoTypeViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.just(
"\"OK\"".getBytes());
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes())))
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
Mono<String> res = (Mono<String>) actorProxy.invoke(
actorProxy,
Actor.class.getMethod("echoMono", String.class),
new Object[] { "hello world" } );
Assert.assertNotNull(res);
Assert.assertEquals("OK", res.block());
}
@Test()
public void invokeActorMethodWithoutDataWithoutReturnTypeViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.empty();
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
Object myData = actorProxy.invoke(actorProxy, Actor.class.getMethod("doSomething"), null);
Assert.assertNull(myData);
}
@Test()
public void invokeActorMethodWithoutDataWithoutReturnTypeMonoViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.empty();
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull()))
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
Mono<Void> myData = (Mono<Void>)actorProxy.invoke(actorProxy, Actor.class.getMethod("doSomethingMono"), null);
Assert.assertNotNull(myData);
Assert.assertNull(myData.block());
}
@Test()
public void invokeActorMethodWithDataWithoutReturnTypeMonoViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.empty();
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes())))
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
Mono<Void> myData = (Mono<Void>)actorProxy.invoke(
actorProxy,
Actor.class.getMethod("doSomethingMonoWithArg", String.class),
new Object[] { "hello world" });
Assert.assertNotNull(myData);
Assert.assertNull(myData.block());
}
@Test(expected = UnsupportedOperationException.class)
public void invokeActorMethodWithTooManyArgsViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
Mono<Void> myData = (Mono<Void>)actorProxy.invoke(
actorProxy,
Actor.class.getMethod("tooManyArgs", String.class, String.class),
new Object[] { "hello", "world" });
Assert.assertNotNull(myData);
Assert.assertNull(myData.block());
}
@Test()
public void invokeActorMethodWithDataWithoutReturnTypeViaReflection() throws NoSuchMethodException {
final DaprClient daprClient = mock(DaprClient.class);
Mono<byte[]> daprResponse = Mono.empty();
when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes())))
.thenReturn(daprResponse);
final ActorProxyImpl actorProxy = new ActorProxyImpl(
"myActorType",
new ActorId("100"),
new DefaultObjectSerializer(),
daprClient);
Object res = actorProxy.invoke(
actorProxy,
Actor.class.getMethod("process", String.class),
new Object[] { "hello world" } );
Assert.assertNull(res);
}
@Test()
public void invokeActorMethodWithoutDataWithEmptyReturnType() {
final DaprClient daprClient = mock(DaprClient.class);
@ -110,8 +282,6 @@ public class ActorProxyImplTest {
Assert.fail("Not exception was throw"))
.doOnError(Throwable::printStackTrace
).block();
}
@Test()
@ -273,6 +443,24 @@ public class ActorProxyImplTest {
interface Actor {
MyData getData();
String echo(String message);
@ActorMethod(returns = MyData.class)
Mono<MyData> getDataMono();
@ActorMethod(returns = String.class)
Mono<String> echoMono(String message);
void doSomething();
Mono<Void> doSomethingMono();
void process(String something);
Mono<Void> doSomethingMonoWithArg(String something);
void tooManyArgs(String something, String something2);
}
static class MyData {

View File

@ -0,0 +1,114 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class DaprGrpcClientTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ACTOR_TYPE = "MyActorType";
private static final String ACTOR_ID = "1234567890";
private DaprGrpc.DaprFutureStub grpcStub;
private DaprGrpcClient client;
@Before
public void setup() {
grpcStub = mock(DaprGrpc.DaprFutureStub.class);
client = new DaprGrpcClient(grpcStub);
}
@Test
public void invoke() {
String methodName = "mymethod";
byte[] payload = "{ \"id\": 123 }".getBytes();
byte[] response = "\"OK\"".getBytes();
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(payload, argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
Mono<byte[]> result = client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, payload);
assertArrayEquals(response, result.block());
}
@Test
public void invokeNullPayload() {
String methodName = "mymethod";
byte[] response = "\"OK\"".getBytes();
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(new byte[0], argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
Mono<byte[]> result = client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, null);
assertArrayEquals(response, result.block());
}
@Test
public void invokeException() {
String methodName = "mymethod";
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(new byte[0], argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
Mono<byte[]> result = client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, null);
Exception exception = assertThrows(Exception.class, () -> result.block());
assertTrue(exception.getCause().getCause() instanceof ArithmeticException);
}
@Test
public void invokeNotHotMono() {
String methodName = "mymethod";
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(new byte[0], argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, null);
// No exception thrown because Mono is ignored here.
}
}

View File

@ -8,16 +8,16 @@ package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorType;
import io.dapr.serializer.DefaultObjectSerializer;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import io.dapr.utils.TypeRef;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -266,16 +266,14 @@ public class ActorManagerTest {
private byte[] createReminderParams(String data) throws IOException {
byte[] serializedData = this.context.getObjectSerializer().serialize(data);
ActorReminderParams params = new ActorReminderParams(serializedData, Duration.ofSeconds(1), Duration.ofSeconds(1));
return INTERNAL_SERIALIZER.serialize(params);
ActorReminderParams p = new ActorReminderParams(serializedData, Duration.ofSeconds(1), Duration.ofSeconds(1));
return INTERNAL_SERIALIZER.serialize(p);
}
private byte[] createTimerParams(String callback, Object data) throws IOException {
byte[] serializedData = this.context.getObjectSerializer().serialize(data);
ActorTimerParams params = new ActorTimerParams();
params.setCallback(callback);
params.setData(serializedData);
return INTERNAL_SERIALIZER.serialize(params);
ActorTimerParams p = new ActorTimerParams(callback, serializedData, Duration.ofSeconds(1), Duration.ofSeconds(1));
return INTERNAL_SERIALIZER.serialize(p);
}
private static ActorId newActorId() {

View File

@ -7,6 +7,7 @@ package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorType;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@ -14,6 +15,7 @@ import org.junit.Test;
import reactor.core.publisher.Mono;
import java.lang.reflect.Constructor;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
@ -92,7 +94,7 @@ public class ActorRuntimeTest {
@BeforeClass
public static void beforeAll() throws Exception {
constructor = (Constructor<ActorRuntime>) Arrays.stream(ActorRuntime.class.getDeclaredConstructors())
.filter(c -> c.getParameters().length == 1)
.filter(c -> c.getParameters().length == 2)
.map(c -> {
c.setAccessible(true);
return c;
@ -104,15 +106,56 @@ public class ActorRuntimeTest {
@Before
public void setup() throws Exception {
this.mockDaprClient = mock(DaprClient.class);
this.runtime = constructor.newInstance(this.mockDaprClient);
this.runtime = constructor.newInstance(null, this.mockDaprClient);
}
@Test(expected = IllegalArgumentException.class)
public void registerActorNullClass() {
this.runtime.registerActor(null);
}
@Test(expected = IllegalArgumentException.class)
public void registerActorNullFactory() {
this.runtime.registerActor(MyActorImpl.class, null, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}
@Test(expected = IllegalArgumentException.class)
public void registerActorNullSerializer() {
this.runtime.registerActor(MyActorImpl.class, new DefaultActorFactory<>(), null, new DefaultObjectSerializer());
}
@Test(expected = IllegalArgumentException.class)
public void registerActorNullStateSerializer() {
this.runtime.registerActor(MyActorImpl.class, new DefaultActorFactory<>(), new DefaultObjectSerializer(), null);
}
@Test
public void registerActor() throws Exception {
this.runtime.registerActor(MyActorImpl.class);
Assert.assertTrue(new String(this.runtime.serializeConfig()).contains(ACTOR_NAME));
public void setActorIdleTimeout() throws Exception {
this.runtime.getConfig().setActorIdleTimeout(Duration.ofSeconds(123));
Assert.assertEquals("{\"entities\":[],\"actorIdleTimeout\":\"0h2m3s0ms\"}",
new String(this.runtime.serializeConfig()));
}
@Test
public void setActorScanInterval() throws Exception {
this.runtime.getConfig().setActorScanInterval(Duration.ofSeconds(123));
Assert.assertEquals("{\"entities\":[],\"actorScanInterval\":\"0h2m3s0ms\"}",
new String(this.runtime.serializeConfig()));
}
@Test
public void setDrainBalancedActors() throws Exception {
this.runtime.getConfig().setDrainBalancedActors(true);
Assert.assertEquals("{\"entities\":[],\"drainBalancedActors\":true}",
new String(this.runtime.serializeConfig()));
}
@Test
public void setDrainOngoingCallTimeout() throws Exception {
this.runtime.getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(123));
Assert.assertEquals("{\"entities\":[],\"drainOngoingCallTimeout\":\"0h2m3s0ms\"}",
new String(this.runtime.serializeConfig()));
}
@Test
public void invokeActor() throws Exception {
@ -124,6 +167,14 @@ public class ActorRuntimeTest {
Assert.assertEquals("Nothing to say.", message);
}
@Test(expected = IllegalArgumentException.class)
public void invokeUnknownActor() {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class);
this.runtime.invoke("UnknownActor", actorId, "say", null).block();
}
@Test
public void deactivateActor() throws Exception {
String actorId = UUID.randomUUID().toString();
@ -152,7 +203,7 @@ public class ActorRuntimeTest {
@Test
public void lazyInvoke() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class);
this.runtime.registerActor(MyActorImpl.class, new DefaultActorFactory<>());
Mono<byte[]> invokeCall = this.runtime.invoke(ACTOR_NAME, actorId, "say", null);

View File

@ -26,9 +26,7 @@ public class ActorTimerTest {
.plusHours(1)
.plusSeconds(3);
ActorTimer timer = new ActorTimer(
null,
"testTimer",
ActorTimerParams timer = new ActorTimerParams(
"myfunction",
null,
dueTime,
@ -51,9 +49,7 @@ public class ActorTimerTest {
.minusHours(1)
.minusMinutes(3);
ActorTimer timer = new ActorTimer(
null,
"testTimer",
ActorTimerParams timer = new ActorTimerParams(
"myfunction",
null,
dueTime,

View File

@ -0,0 +1,333 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.utils.DurationUtils;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;
public class DaprGrpcClientTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ACTOR_TYPE = "MyActorType";
private static final String ACTOR_ID = "1234567890";
private DaprGrpc.DaprFutureStub grpcStub;
private DaprGrpcClient client;
@Before
public void setup() {
grpcStub = mock(DaprGrpc.DaprFutureStub.class);
client = new DaprGrpcClient(grpcStub);
}
@Test
public void getActorStateException() {
SettableFuture<DaprProtos.GetActorStateResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
"MyKey"
)))).thenReturn(settableFuture);
Mono<byte[]> result = client.getActorState(ACTOR_TYPE, ACTOR_ID, "MyKey");
Exception exception = assertThrows(Exception.class, () -> result.block());
assertTrue(exception.getCause().getCause() instanceof ArithmeticException);
}
@Test
public void getActorState() {
byte[] data = "hello world".getBytes();
SettableFuture<DaprProtos.GetActorStateResponse> settableFuture = SettableFuture.create();
settableFuture.set(DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(data)).build());
when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
"MyKey"
)))).thenReturn(settableFuture);
Mono<byte[]> result = client.getActorState(ACTOR_TYPE, ACTOR_ID, "MyKey");
assertArrayEquals(data, result.block());
}
@Test
public void saveActorStateTransactionallyException() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
new ArrayList<>()
)))).thenReturn(settableFuture);
Mono<Void> result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, new ArrayList<>());
Exception exception = assertThrows(Exception.class, () -> result.block());
assertTrue(exception.getCause().getCause() instanceof ArithmeticException);
}
@Test
public void saveActorStateTransactionally() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
ActorStateOperation[] operations = new ActorStateOperation[] {
new ActorStateOperation("upsert", "mykey", "hello world"),
new ActorStateOperation("delete", "mykey", null),
};
when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
Arrays.asList(operations)
)))).thenReturn(settableFuture);
Mono<Void> result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
result.block();
}
@Test
public void saveActorStateTransactionallyByteArray() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
ActorStateOperation[] operations = new ActorStateOperation[] {
new ActorStateOperation("upsert", "mykey", "hello world".getBytes()),
new ActorStateOperation("delete", "mykey", null),
};
when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
Arrays.asList(operations)
)))).thenReturn(settableFuture);
Mono<Void> result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
result.block();
}
@Test
public void saveActorStateTransactionallyInvalidValueType() {
ActorStateOperation[] operations = new ActorStateOperation[] {
new ActorStateOperation("upsert", "mykey", 123),
new ActorStateOperation("delete", "mykey", null),
};
Mono<Void> result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
assertThrows(IllegalArgumentException.class, () -> result.block());
}
@Test
public void registerActorReminder() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String reminderName = "myreminder";
ActorReminderParams params = new ActorReminderParams(
"hello world".getBytes(),
Duration.ofSeconds(1),
Duration.ofSeconds(2)
);
when(grpcStub.registerActorReminder(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(reminderName, argument.getName());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.registerActorReminder(ACTOR_TYPE, ACTOR_ID, reminderName, params);
result.block();
}
@Test
public void unregisterActorReminder() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String reminderName = "myreminder";
when(grpcStub.unregisterActorReminder(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(reminderName, argument.getName());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.unregisterActorReminder(ACTOR_TYPE, ACTOR_ID, reminderName);
result.block();
}
@Test
public void registerActorTimer() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String timerName = "mytimer";
String callback = "mymethod";
ActorTimerParams params = new ActorTimerParams(
callback,
"hello world".getBytes(),
Duration.ofSeconds(1),
Duration.ofSeconds(2)
);
when(grpcStub.registerActorTimer(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(timerName, argument.getName());
assertEquals(callback, argument.getCallback());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.registerActorTimer(ACTOR_TYPE, ACTOR_ID, timerName, params);
result.block();
}
@Test
public void unregisterActorTimer() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String timerName = "mytimer";
when(grpcStub.unregisterActorTimer(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(timerName, argument.getName());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.unregisterActorTimer(ACTOR_TYPE, ACTOR_ID, timerName);
result.block();
}
private static Any getAny(Object value) throws IOException {
if (value instanceof byte[]) {
String base64 = OBJECT_MAPPER.writeValueAsString(value);
return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build();
} else if (value instanceof String) {
return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build();
}
throw new IllegalArgumentException("Must be byte[] or String");
}
private static class GetActorStateRequestMatcher implements ArgumentMatcher<DaprProtos.GetActorStateRequest> {
private final String actorType;
private final String actorId;
private final String key;
GetActorStateRequestMatcher(String actorType, String actorId, String key) {
this.actorType = actorType;
this.actorId = actorId;
this.key = key;
}
@Override
public boolean matches(DaprProtos.GetActorStateRequest argument) {
if (argument == null) {
return false;
}
return actorType.equals(argument.getActorType())
&& actorId.equals(argument.getActorId())
&& key.equals(argument.getKey());
}
}
private static class ExecuteActorStateTransactionRequestMatcher
implements ArgumentMatcher<DaprProtos.ExecuteActorStateTransactionRequest> {
private final String actorType;
private final String actorId;
private final List<ActorStateOperation> operations;
ExecuteActorStateTransactionRequestMatcher(String actorType, String actorId, List<ActorStateOperation> operations) {
this.actorType = actorType;
this.actorId = actorId;
this.operations = operations;
}
@Override
public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) {
if (argument == null) {
return false;
}
if (operations.size() != argument.getOperationsCount()) {
return false;
}
if (!actorType.equals(argument.getActorType())
|| !actorId.equals(argument.getActorId())) {
return false;
}
for(ActorStateOperation operation : operations) {
boolean found = false;
for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) {
if (operation.getKey().equals(grpcOperation.getKey())
&& operation.getOperationType().equals(grpcOperation.getOperationType())
&& nullableEquals(operation.getValue(), grpcOperation.getValue())) {
found = true;
break;
}
}
if (!found) {
return false;
}
}
return true;
}
private static boolean nullableEquals(Object one, Any another) {
if (one == null) {
return another.getValue().isEmpty();
}
if ((one == null) ^ (another == null)) {
return false;
}
try {
Any oneAny = getAny(one);
return oneAny.getValue().equals(another.getValue());
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
}
}

View File

@ -7,15 +7,29 @@ package io.dapr.actors.runtime;
import io.dapr.client.DaprHttp;
import io.dapr.client.DaprHttpProxy;
import io.dapr.config.Properties;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import okhttp3.mock.RuleAnswer;
import okio.Buffer;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
public class DaprHttpClientTest {
@ -52,8 +66,8 @@ public class DaprHttpClientTest {
.respond(EXPECTED_RESULT);
DaprHttp daprHttp = new DaprHttpProxy(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<Void> mono =
DaprHttpClient.saveActorStateTransactionally("DemoActor", "1", "".getBytes());
List<ActorStateOperation> ops = Collections.singletonList(new ActorStateOperation("UPSERT", "key", "value"));
Mono<Void> mono = DaprHttpClient.saveActorStateTransactionally("DemoActor", "1", ops);
assertNull(mono.block());
}
@ -65,7 +79,11 @@ public class DaprHttpClientTest {
DaprHttp daprHttp = new DaprHttpProxy(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<Void> mono =
DaprHttpClient.registerActorReminder("DemoActor", "1", "reminder", "".getBytes());
DaprHttpClient.registerActorReminder(
"DemoActor",
"1",
"reminder",
new ActorReminderParams("".getBytes(), Duration.ofSeconds(1), Duration.ofSeconds(2)));
assertNull(mono.block());
}
@ -82,13 +100,42 @@ public class DaprHttpClientTest {
@Test
public void registerActorTimer() {
String data = "hello world";
mockInterceptor.addRule()
.put("http://127.0.0.1:3000/v1.0/actors/DemoActor/1/timers/timer")
.respond(EXPECTED_RESULT);
.answer(new RuleAnswer() {
@Override
public Response.Builder respond(Request request) {
String expectedBody = "{\"dueTime\":\"0h0m5s0ms\"," +
"\"period\":\"0h0m10s0ms\"," +
"\"callback\":\"mycallback\"," +
"\"data\":\""+ Base64.getEncoder().encodeToString(data.getBytes()) +"\"}";
String body = "";
try {
Buffer buffer = new Buffer();
request.body().writeTo(buffer);
body = buffer.readString(Charset.defaultCharset());
} catch (IOException e) {
fail();
}
assertEquals(expectedBody, body);
return new Response.Builder()
.code(200)
.body(ResponseBody.create("{}", MediaType.get("application/json")));
}
});
DaprHttp daprHttp = new DaprHttpProxy(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<Void> mono =
DaprHttpClient.registerActorTimer("DemoActor", "1", "timer", "".getBytes());
DaprHttpClient.registerActorTimer(
"DemoActor",
"1",
"timer",
new ActorTimerParams(
"mycallback",
data.getBytes(),
Duration.ofSeconds(5),
Duration.ofSeconds(10)));
assertNull(mono.block());
}
@ -102,5 +149,4 @@ public class DaprHttpClientTest {
Mono<Void> mono = DaprHttpClient.unregisterActorTimer("DemoActor", "1", "timer");
assertNull(mono.block());
}
}

View File

@ -5,7 +5,6 @@
package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.ActorId;
import io.dapr.serializer.DaprObjectSerializer;
@ -15,7 +14,6 @@ import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
@ -66,7 +64,7 @@ public class DaprStateAsyncProviderTest {
if (o == null || getClass() != o.getClass()) return false;
Customer customer = (Customer) o;
return id == customer.id &&
Objects.equals(name, customer.name);
Objects.equals(name, customer.name);
}
@Override
@ -80,67 +78,60 @@ public class DaprStateAsyncProviderTest {
public void happyCaseApply() {
DaprClient daprClient = mock(DaprClient.class);
when(daprClient
.saveActorStateTransactionally(
eq("MyActor"),
eq("123"),
argThat(s -> {
try {
JsonNode node = OBJECT_MAPPER.readTree(s);
if (node == null) {
return false;
}
if (node.size() != 4) {
return false;
}
boolean foundInsertName = false;
boolean foundUpdateZipcode = false;
boolean foundDeleteFlag = false;
boolean foundUpdateBytes = false;
for (JsonNode operation : node) {
if (operation.get("operation") == null) {
return false;
}
if (operation.get("request") == null) {
.saveActorStateTransactionally(
eq("MyActor"),
eq("123"),
argThat(operations -> {
if (operations == null) {
return false;
}
String opName = operation.get("operation").asText();
String key = operation.get("request").get("key").asText();
JsonNode valueNode = operation.get("request").get("value");
if (operations.size() != 4) {
return false;
}
String value = (valueNode == null) ? null : valueNode.asText();
foundInsertName |= "upsert".equals(opName) &&
"name".equals(key) &&
"Jon Doe".equals(value);
foundUpdateZipcode |= "upsert".equals(opName) &&
"zipcode".equals(key) &&
"98011".equals(value);
foundDeleteFlag |= "delete".equals(opName) &&
"flag".equals(key) &&
(value == null);
foundUpdateBytes |= "upsert".equals(opName) &&
"bytes".equals(key) &&
"AQ==".equals(value);
}
boolean foundInsertName = false;
boolean foundUpdateZipcode = false;
boolean foundDeleteFlag = false;
boolean foundUpdateBytes = false;
for (ActorStateOperation operation : operations) {
if (operation.getOperationType() == null) {
return false;
}
if (operation.getKey() == null) {
return false;
}
return foundInsertName && foundUpdateZipcode && foundDeleteFlag && foundUpdateBytes;
} catch (IOException e) {
e.printStackTrace();
return false;
}
})))
.thenReturn(Mono.empty());
String opName = operation.getOperationType();
String key = operation.getKey();
Object value = operation.getValue();
foundInsertName |= "upsert".equals(opName) &&
"name".equals(key) &&
"\"Jon Doe\"".equals(value);
foundUpdateZipcode |= "upsert".equals(opName) &&
"zipcode".equals(key) &&
"98011".equals(value);
foundDeleteFlag |= "delete".equals(opName) &&
"flag".equals(key) &&
(value == null);
foundUpdateBytes |= "upsert".equals(opName) &&
"bytes".equals(key) &&
Arrays.equals(new byte[]{0x1}, (byte[]) value);
}
return foundInsertName && foundUpdateZipcode && foundDeleteFlag && foundUpdateBytes;
})))
.thenReturn(Mono.empty());
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
provider.apply("MyActor",
new ActorId("123"),
createInsertChange("name", "Jon Doe"),
createUpdateChange("zipcode", 98011),
createDeleteChange("flag"),
createUpdateChange("bytes", new byte[] {0x1}))
.block();
new ActorId("123"),
createInsertChange("name", "Jon Doe"),
createUpdateChange("zipcode", 98011),
createDeleteChange("flag"),
createUpdateChange("bytes", new byte[]{0x1}))
.block();
verify(daprClient).saveActorStateTransactionally(eq("MyActor"), eq("123"), any());
}
@ -149,56 +140,61 @@ public class DaprStateAsyncProviderTest {
public void happyCaseLoad() throws Exception {
DaprClient daprClient = mock(DaprClient.class);
when(daprClient
.getActorState(any(), any(), eq("name")))
.thenReturn(Mono.just(SERIALIZER.serialize("Jon Doe")));
.getActorState(any(), any(), eq("name")))
.thenReturn(Mono.just(SERIALIZER.serialize("Jon Doe")));
when(daprClient
.getActorState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just(SERIALIZER.serialize(98021)));
.getActorState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just(SERIALIZER.serialize(98021)));
when(daprClient
.getActorState(any(), any(), eq("goals")))
.thenReturn(Mono.just(SERIALIZER.serialize(98)));
.getActorState(any(), any(), eq("goals")))
.thenReturn(Mono.just(SERIALIZER.serialize(98)));
when(daprClient
.getActorState(any(), any(), eq("balance")))
.thenReturn(Mono.just(SERIALIZER.serialize(46.55)));
.getActorState(any(), any(), eq("balance")))
.thenReturn(Mono.just(SERIALIZER.serialize(46.55)));
when(daprClient
.getActorState(any(), any(), eq("active")))
.thenReturn(Mono.just(SERIALIZER.serialize(true)));
.getActorState(any(), any(), eq("active")))
.thenReturn(Mono.just(SERIALIZER.serialize(true)));
when(daprClient
.getActorState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes()));
.getActorState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("anotherCustomer")))
.thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}".getBytes()));
.getActorState(any(), any(), eq("anotherCustomer")))
.thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("nullCustomer")))
.thenReturn(Mono.empty());
.getActorState(any(), any(), eq("nullCustomer")))
.thenReturn(Mono.empty());
when(daprClient
.getActorState(any(), any(), eq("bytes")))
.thenReturn(Mono.just("\"QQ==\"".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("emptyBytes")))
.thenReturn(Mono.just(new byte[0]));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
Assert.assertEquals("Jon Doe",
provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block());
provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block());
Assert.assertEquals(98021,
(int)provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block());
(int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block());
Assert.assertEquals(98,
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
Assert.assertEquals(98,
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
Assert.assertEquals(46.55,
(double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block(),
EPSILON);
(double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block(),
EPSILON);
Assert.assertEquals(true,
(boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block());
(boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block());
Assert.assertEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block());
provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block());
Assert.assertNotEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block());
provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block());
Assert.assertNull(
provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block());
provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block());
Assert.assertArrayEquals("A".getBytes(),
provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block());
Assert.assertNull(
provider.load("MyActor", new ActorId("123"), "emptyBytes", TypeRef.get(byte[].class)).block());
}
@Test
@ -207,34 +203,34 @@ public class DaprStateAsyncProviderTest {
// Keys that exists.
when(daprClient
.getActorState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe".getBytes()));
.getActorState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021".getBytes()));
.getActorState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98".getBytes()));
.getActorState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55".getBytes()));
.getActorState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("active")))
.thenReturn(Mono.just("true".getBytes()));
.getActorState(any(), any(), eq("active")))
.thenReturn(Mono.just("true".getBytes()));
when(daprClient
.getActorState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes()));
.getActorState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes()));
// Keys that do not exist.
when(daprClient
.getActorState(any(), any(), eq("Does not exist")))
.thenReturn(Mono.empty());
.getActorState(any(), any(), eq("Does not exist")))
.thenReturn(Mono.empty());
when(daprClient
.getActorState(any(), any(), eq("NAME")))
.thenReturn(Mono.empty());
.getActorState(any(), any(), eq("NAME")))
.thenReturn(Mono.empty());
when(daprClient
.getActorState(any(), any(), eq(null)))
.thenReturn(Mono.empty());
.getActorState(any(), any(), eq(null)))
.thenReturn(Mono.empty());
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);

View File

@ -8,7 +8,11 @@ package io.dapr.actors.runtime;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
import java.io.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* Class used to test different serializer implementations.

View File

@ -0,0 +1 @@
mock-maker-inline

View File

@ -19,7 +19,7 @@ import static io.dapr.it.Retry.callWithRetry;
public class AppRun implements Stoppable {
private static final String APP_COMMAND =
"mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\"";
"mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D dapr.grpc.enabled=%b";
private final DaprPorts ports;
@ -30,10 +30,11 @@ public class AppRun implements Stoppable {
AppRun(DaprPorts ports,
String successMessage,
Class serviceClass,
int maxWaitMilliseconds) {
int maxWaitMilliseconds,
boolean useGRPC) {
this.command = new Command(
successMessage,
buildCommand(serviceClass, ports),
buildCommand(serviceClass, ports, useGRPC),
new HashMap<>() {{
put("DAPR_HTTP_PORT", ports.getHttpPort().toString());
put("DAPR_GRPC_PORT", ports.getGrpcPort().toString());
@ -71,9 +72,10 @@ public class AppRun implements Stoppable {
}
}
private static String buildCommand(Class serviceClass, DaprPorts ports) {
private static String buildCommand(Class serviceClass, DaprPorts ports, boolean useGRPC) {
return String.format(APP_COMMAND, serviceClass.getCanonicalName(),
ports.getAppPort() != null ? ports.getAppPort().toString() : "");
ports.getAppPort() != null ? ports.getAppPort().toString() : "",
useGRPC);
}
private static void assertListeningOnPort(int port) {

View File

@ -8,10 +8,13 @@ package io.dapr.it;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.AfterClass;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
public abstract class BaseIT {
@ -19,7 +22,9 @@ public abstract class BaseIT {
private static final Map<String, DaprRun.Builder> DAPR_RUN_BUILDERS = new HashMap<>();
private static final Collection<Stoppable> TO_BE_STOPPED = new ArrayList<>();
private static final Queue<Stoppable> TO_BE_STOPPED = new LinkedList<>();
private static final Queue<Closeable> TO_BE_CLOSED = new LinkedList<>();
protected static DaprRun startDaprApp(
String testName,
@ -27,13 +32,23 @@ public abstract class BaseIT {
Class serviceClass,
Boolean useAppPort,
int maxWaitMilliseconds) throws Exception {
return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds);
return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, true);
}
protected static DaprRun startDaprApp(
String testName,
String successMessage,
Class serviceClass,
Boolean useAppPort,
int maxWaitMilliseconds,
boolean useGRPC) throws Exception {
return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds, useGRPC);
}
protected static DaprRun startDaprApp(
String testName,
int maxWaitMilliseconds) throws Exception {
return startDaprApp(testName, "You're up and running!", null, false, true, maxWaitMilliseconds);
return startDaprApp(testName, "You're up and running!", null, false, true, maxWaitMilliseconds, true);
}
protected static DaprRun startDaprApp(
@ -42,12 +57,14 @@ public abstract class BaseIT {
Class serviceClass,
Boolean useAppPort,
Boolean useDaprPorts,
int maxWaitMilliseconds) throws Exception {
int maxWaitMilliseconds,
boolean useGRPC) throws Exception {
DaprRun.Builder builder = new DaprRun.Builder(
testName,
() -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts),
successMessage,
maxWaitMilliseconds).withServiceClass(serviceClass);
maxWaitMilliseconds,
useGRPC).withServiceClass(serviceClass);
DaprRun run = builder.build();
TO_BE_STOPPED.add(run);
DAPR_RUN_BUILDERS.put(run.getAppName(), builder);
@ -56,17 +73,28 @@ public abstract class BaseIT {
return run;
}
protected static ImmutablePair<AppRun, DaprRun> startSplitDaprAndApp(
String testName,
String successMessage,
Class serviceClass,
Boolean useAppPort,
int maxWaitMilliseconds) throws Exception {
return startSplitDaprAndApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, true);
}
protected static ImmutablePair<AppRun, DaprRun> startSplitDaprAndApp(
String testName,
String successMessage,
Class serviceClass,
Boolean useAppPort,
int maxWaitMilliseconds) throws Exception {
int maxWaitMilliseconds,
boolean useGRPC) throws Exception {
DaprRun.Builder builder = new DaprRun.Builder(
testName,
() -> DaprPorts.build(useAppPort, true, true),
successMessage,
maxWaitMilliseconds).withServiceClass(serviceClass);
maxWaitMilliseconds,
useGRPC).withServiceClass(serviceClass);
ImmutablePair<AppRun, DaprRun> runs = builder.splitBuild();
TO_BE_STOPPED.add(runs.left);
TO_BE_STOPPED.add(runs.right);
@ -79,9 +107,19 @@ public abstract class BaseIT {
@AfterClass
public static void cleanUp() throws Exception {
for (Stoppable toBeStopped : TO_BE_STOPPED) {
while (!TO_BE_CLOSED.isEmpty()) {
Closeable toBeClosed = TO_BE_CLOSED.remove();
toBeClosed.close();
}
while (!TO_BE_STOPPED.isEmpty()) {
Stoppable toBeStopped = TO_BE_STOPPED.remove();
toBeStopped.stop();
}
}
protected static <T extends Closeable> T deferClose(T closeable) {
TO_BE_CLOSED.add(closeable);
return closeable;
}
}

View File

@ -23,7 +23,7 @@ public class DaprRun implements Stoppable {
// the arg in -Dexec.args is the app's port
private static final String DAPR_COMMAND =
" -- mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\"";
" -- mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D dapr.grpc.enabled=%s";
private final DaprPorts ports;
@ -43,11 +43,12 @@ public class DaprRun implements Stoppable {
DaprPorts ports,
String successMessage,
Class serviceClass,
int maxWaitMilliseconds) {
int maxWaitMilliseconds,
boolean useGRPC) {
// The app name needs to be deterministic since we depend on it to kill previous runs.
this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName());
this.startCommand =
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports));
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, useGRPC));
this.listCommand = new Command(
this.appName,
"dapr list");
@ -163,14 +164,14 @@ public class DaprRun implements Stoppable {
return appName;
}
private static String buildDaprCommand(String appName, Class serviceClass, DaprPorts ports) {
private static String buildDaprCommand(String appName, Class serviceClass, DaprPorts ports, boolean useGRPC) {
StringBuilder stringBuilder = new StringBuilder(String.format(DAPR_RUN, appName))
.append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "")
.append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "")
.append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "")
.append(serviceClass == null ? "" :
String.format(DAPR_COMMAND, serviceClass.getCanonicalName(),
ports.getAppPort() != null ? ports.getAppPort().toString() : ""));
ports.getAppPort() != null ? ports.getAppPort().toString() : "", useGRPC));
return stringBuilder.toString();
}
@ -199,15 +200,19 @@ public class DaprRun implements Stoppable {
private Class serviceClass;
private boolean useGRPC;
Builder(
String testName,
Supplier<DaprPorts> portsSupplier,
String successMessage,
int maxWaitMilliseconds) {
int maxWaitMilliseconds,
boolean useGRPC) {
this.testName = testName;
this.portsSupplier = portsSupplier;
this.successMessage = successMessage;
this.maxWaitMilliseconds = maxWaitMilliseconds;
this.useGRPC = useGRPC;
}
public Builder withServiceClass(Class serviceClass) {
@ -221,7 +226,8 @@ public class DaprRun implements Stoppable {
this.portsSupplier.get(),
this.successMessage,
this.serviceClass,
this.maxWaitMilliseconds);
this.maxWaitMilliseconds,
this.useGRPC);
}
/**
@ -234,14 +240,16 @@ public class DaprRun implements Stoppable {
ports,
this.successMessage,
this.serviceClass,
this.maxWaitMilliseconds);
this.maxWaitMilliseconds,
this.useGRPC);
DaprRun daprRun = new DaprRun(
this.testName,
ports,
DAPR_SUCCESS_MESSAGE,
null,
this.maxWaitMilliseconds);
this.maxWaitMilliseconds,
this.useGRPC);
return new ImmutablePair<>(appRun, daprRun);
}

View File

@ -38,7 +38,7 @@ public class ActivationDeactivationIT extends BaseIT {
final AtomicInteger atomicInteger = new AtomicInteger(1);
logger.debug("Creating proxy builder");
ActorProxyBuilder<DemoActor> proxyBuilder = new ActorProxyBuilder(DemoActor.class);
ActorProxyBuilder<DemoActor> proxyBuilder = deferClose(new ActorProxyBuilder(DemoActor.class));
logger.debug("Creating actorId");
ActorId actorId1 = new ActorId(Integer.toString(atomicInteger.getAndIncrement()));
logger.debug("Building proxy");

View File

@ -13,16 +13,38 @@ import io.dapr.it.DaprRun;
import io.dapr.it.actors.services.springboot.StatefulActor;
import io.dapr.it.actors.services.springboot.StatefulActorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import static io.dapr.it.Retry.callWithRetry;
import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class ActorStateIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorStateIT.class);
/**
* Parameters for this test.
* Param #1: useGrpc.
* @return Collection of parameter tuples.
*/
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false, false }, { false, true }, { true, false }, { true, true } });
}
@Parameterized.Parameter(0)
public boolean useGrpc;
@Parameterized.Parameter(1)
public boolean useGrpcInService;
@Test
public void writeReadState() throws Exception {
logger.debug("Starting actor runtime ...");
@ -32,15 +54,23 @@ public class ActorStateIT extends BaseIT {
StatefulActorService.SUCCESS_MESSAGE,
StatefulActorService.class,
true,
60000);
60000,
useGrpcInService);
if (this.useGrpc) {
runtime.switchToGRPC();
} else {
runtime.switchToHTTP();
}
String message = "This is a message to be saved and retrieved.";
String name = "Jon Doe";
byte[] bytes = new byte[] { 0x1 };
ActorId actorId = new ActorId(Long.toString(System.currentTimeMillis()));
ActorId actorId = new ActorId(
String.format("%d-%b-%b", System.currentTimeMillis(), this.useGrpc, this.useGrpcInService));
String actorType = "StatefulActorTest";
logger.debug("Building proxy ...");
ActorProxyBuilder<ActorProxy> proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class);
ActorProxyBuilder<ActorProxy> proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class));
ActorProxy proxy = proxyBuilder.build(actorId);
// Validate conditional read works.
@ -115,13 +145,22 @@ public class ActorStateIT extends BaseIT {
runtime.stop();
logger.debug("Starting service ...");
startDaprApp(
DaprRun run2 = startDaprApp(
this.getClass().getSimpleName(),
StatefulActorService.SUCCESS_MESSAGE,
StatefulActorService.class,
true,
60000);
60000,
useGrpcInService);
if (this.useGrpc) {
run2.switchToGRPC();
} else {
run2.switchToHTTP();
}
// Need new proxy builder because the proxy builder holds the channel.
proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class));
ActorProxy newProxy = proxyBuilder.build(actorId);
callWithRetry(() -> {

View File

@ -47,7 +47,7 @@ public class ActorTimerRecoveryIT extends BaseIT {
String actorType="MyActorTest";
logger.debug("Creating proxy builder");
ActorProxyBuilder<ActorProxy> proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class);
ActorProxyBuilder<ActorProxy> proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class));
logger.debug("Creating actorId");
ActorId actorId = new ActorId(UUID.randomUUID().toString());
logger.debug("Building proxy");

View File

@ -74,7 +74,7 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
String actorType="MyActorTest";
logger.debug("Creating proxy builder");
ActorProxyBuilder<ActorProxy> proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class);
ActorProxyBuilder<ActorProxy> proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class));
logger.debug("Creating actorId");
ActorId actorId1 = new ActorId(ACTOR_ID);
logger.debug("Building proxy");

View File

@ -11,7 +11,6 @@ import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.v1.DaprGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import okhttp3.OkHttpClient;
import java.io.Closeable;
@ -27,6 +26,11 @@ public class DaprClientBuilder {
*/
private final boolean useGrpc;
/**
* Builder for Dapr's HTTP Client.
*/
private final DaprHttpBuilder daprHttpBuilder;
/**
* Serializer used for request and response objects in DaprClient.
*/
@ -47,6 +51,7 @@ public class DaprClientBuilder {
this.objectSerializer = new DefaultObjectSerializer();
this.stateSerializer = new DefaultObjectSerializer();
this.useGrpc = Properties.USE_GRPC.get();
this.daprHttpBuilder = new DaprHttpBuilder();
}
/**
@ -127,12 +132,6 @@ public class DaprClientBuilder {
* @return DaprClient over HTTP.
*/
private DaprClient buildDaprClientHttp() {
int port = Properties.HTTP_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
return new DaprClientHttp(daprHttp, this.objectSerializer, this.stateSerializer);
return new DaprClientHttp(this.daprHttpBuilder.build(), this.objectSerializer, this.stateSerializer);
}
}