diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c7a3b869e..8ac858eb3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,10 +22,11 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: 13.0.x + DAPR_CLI_VER: 1.0.0-rc.2 DAPR_RUNTIME_VER: 1.0.0-rc.1 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh - DAPR_CLI_REF: v1.0.0-rc.1 - DAPR_REF: + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/a60221e96406a145ab22e454eec6642961725f5c/install/install.sh + DAPR_CLI_REF: + DAPR_REF: 83f5c45362b0c577139b1887276d7cf1b7308506 OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }} OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }} GPG_KEY: ${{ secrets.GPG_KEY }} @@ -37,7 +38,7 @@ jobs: with: java-version: ${{ env.JDK_VER }} - name: Set up Dapr CLI - run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash + run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }} - name: Set up Go ${{ env.GOVER }} if: env.DAPR_REF != '' || env.DAPR_CLI_REF != '' uses: actions/setup-go@v2 diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java index 3867693d3..08bf9b343 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java @@ -29,24 +29,24 @@ public class DemoActorClient { * @throws InterruptedException If program has been interrupted. */ public static void main(String[] args) throws InterruptedException { - ActorProxyBuilder builder = new ActorProxyBuilder(DemoActor.class); + try (ActorProxyBuilder builder = new ActorProxyBuilder(DemoActor.class)) { + List threads = new ArrayList<>(NUM_ACTORS); - List threads = new ArrayList<>(NUM_ACTORS); + // Creates multiple actors. + for (int i = 0; i < NUM_ACTORS; i++) { + ActorId actorId = ActorId.createRandom(); + DemoActor actor = builder.build(actorId); - // Creates multiple actors. - for (int i = 0; i < NUM_ACTORS; i++) { - ActorId actorId = ActorId.createRandom(); - DemoActor actor = builder.build(actorId); + // Start a thread per actor. + Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor)); + thread.start(); + threads.add(thread); + } - // Start a thread per actor. - Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor)); - thread.start(); - threads.add(thread); - } - - // Waits for threads to finish. - for (Thread thread : threads) { - thread.join(); + // Waits for threads to finish. + for (Thread thread : threads) { + thread.join(); + } } System.out.println("Done."); diff --git a/examples/src/main/java/io/dapr/examples/actors/http/README.md b/examples/src/main/java/io/dapr/examples/actors/http/README.md index 21d2d098f..611833886 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/README.md +++ b/examples/src/main/java/io/dapr/examples/actors/http/README.md @@ -143,17 +143,18 @@ public class DemoActorClient { private static final int NUM_ACTORS = 3; public static void main(String[] args) throws InterruptedException { - ActorProxyBuilder builder = new ActorProxyBuilder(DemoActor.class); - ///... - for (int i = 0; i < NUM_ACTORS; i++) { - DemoActor actor = builder.build(ActorId.createRandom()); + try (ActorProxyBuilder builder = new ActorProxyBuilder(DemoActor.class)) { + ///... + for (int i = 0; i < NUM_ACTORS; i++) { + DemoActor actor = builder.build(ActorId.createRandom()); - // Start a thread per actor. - Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor)); - thread.start(); - threads.add(thread); + // Start a thread per actor. + Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor)); + thread.start(); + threads.add(thread); + } + ///... } - ///... } private static final void callActorForever(String actorId, DemoActor actor) { @@ -182,7 +183,9 @@ public class DemoActorClient { } ``` -First, the client defines how many actors it is going to create. Then the main method declares a `ActorProxyBuilder` to create instances of the `DemoActor` interface, which are implemented automatically by the SDK and make remote calls to the equivalent methods in Actor runtime. The code executes the `callActorForever` private method once per actor. Initially, it will invoke `registerReminder()`, which sets the due time and period for the reminder. Then, `incrementAndGet()` increments a counter, persists it and sends it back as response. Finally `say` method which will print a message containing the received string along with the formatted server time. +First, the client defines how many actors it is going to create. The main method declares a `ActorProxyBuilder` to create instances of the `DemoActor` interface, which are implemented automatically by the SDK and make remote calls to the equivalent methods in Actor runtime. `ActorProxyBuilder` implements `Closeable`, which means it holds resources that need to be closed. In this example, we use the "try-resource" feature in Java. + +Then, the code executes the `callActorForever` private method once per actor. Initially, it will invoke `registerReminder()`, which sets the due time and period for the reminder. Then, `incrementAndGet()` increments a counter, persists it and sends it back as response. Finally `say` method which will print a message containing the received string along with the formatted server time. Use the follow command to execute the DemoActorClient: diff --git a/pom.xml b/pom.xml index 02c11c807..8d47b7f76 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ UTF-8 1.33.1 3.13.0 - https://raw.githubusercontent.com/dapr/dapr/f3ddd345962d7e065e26a54f7b2f1f2db9af1fdb/dapr/proto + https://raw.githubusercontent.com/dapr/dapr/83f5c45362b0c577139b1887276d7cf1b7308506/dapr/proto 1.6.2 3.1.1 1.8 diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java index 08b230a3f..586ddb92b 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java @@ -8,20 +8,25 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; import io.dapr.actors.ActorUtils; import io.dapr.client.DaprHttpBuilder; +import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.v1.DaprGrpc; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.Closeable; import java.lang.reflect.Proxy; /** * Builder to generate an ActorProxy instance. Builder can be reused for multiple instances. */ -public class ActorProxyBuilder { +public class ActorProxyBuilder implements Closeable { /** - * Builder for Dapr's raw http client. + * Determine if this builder will create GRPC clients instead of HTTP clients. */ - private final DaprHttpBuilder daprHttpBuilder = new DaprHttpBuilder(); + private final boolean useGrpc; /** * Actor's type. @@ -38,6 +43,16 @@ public class ActorProxyBuilder { */ private DaprObjectSerializer objectSerializer; + /** + * Builds Dapr HTTP client. + */ + private DaprHttpBuilder daprHttpBuilder; + + /** + * Channel for communication with Dapr. + */ + private final ManagedChannel channel; + /** * Instantiates a new builder for a given Actor type, using {@link DefaultObjectSerializer} by default. * @@ -65,9 +80,12 @@ public class ActorProxyBuilder { throw new IllegalArgumentException("ActorTypeClass is required."); } + this.useGrpc = Properties.USE_GRPC.get(); this.actorType = actorType; this.objectSerializer = new DefaultObjectSerializer(); this.clazz = actorTypeClass; + this.daprHttpBuilder = new DaprHttpBuilder(); + this.channel = buildManagedChannel(); } /** @@ -100,7 +118,7 @@ public class ActorProxyBuilder { this.actorType, actorId, this.objectSerializer, - new DaprHttpClient(this.daprHttpBuilder.build())); + buildDaprClient()); if (this.clazz.equals(ActorProxy.class)) { // If users want to use the not strongly typed API, we respect that here. @@ -113,4 +131,45 @@ public class ActorProxyBuilder { proxy); } + /** + * Build an instance of the Client based on the provided setup. + * + * @return an instance of the setup Client + * @throws java.lang.IllegalStateException if any required field is missing + */ + private DaprClient buildDaprClient() { + if (this.useGrpc) { + return new DaprGrpcClient(DaprGrpc.newFutureStub(this.channel)); + } + + return new DaprHttpClient(daprHttpBuilder.build()); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); + } + } + + /** + * Creates a GRPC managed channel (or null, if not applicable). + * + * @return GRPC managed channel or null. + */ + private static ManagedChannel buildManagedChannel() { + if (!Properties.USE_GRPC.get()) { + return null; + } + + int port = Properties.GRPC_PORT.get(); + if (port <= 0) { + throw new IllegalStateException("Invalid port."); + } + + return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port).usePlaintext().build(); + } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java new file mode 100644 index 000000000..1e9991578 --- /dev/null +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.client; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import reactor.core.publisher.Mono; + +/** + * A DaprClient over GRPC for Actor. + */ +class DaprGrpcClient implements DaprClient { + + /** + * The GRPC client to be used. + * + * @see DaprGrpc.DaprFutureStub + */ + private DaprGrpc.DaprFutureStub client; + + /** + * Internal constructor. + * + * @param grpcClient Dapr's GRPC client. + */ + DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) { + this.client = grpcClient; + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeActorMethod(String actorType, String actorId, String methodName, byte[] jsonPayload) { + return Mono.fromCallable(() -> { + DaprProtos.InvokeActorRequest req = + DaprProtos.InvokeActorRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setMethod(methodName) + .setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload)) + .build(); + + ListenableFuture futureResponse = client.invokeActor(req); + return futureResponse.get(); + }).map(r -> r.getData().toByteArray()); + } +} diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprHttpClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprHttpClient.java index 72e164bb3..a12270bed 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprHttpClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprHttpClient.java @@ -51,5 +51,4 @@ class DaprHttpClient implements DaprClient { this.client.invokeApi(DaprHttp.HttpMethods.POST.name(), url, null, jsonPayload, null, null); return responseMono.map(r -> r.getBody()); } - } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java index ead31a92e..540b8cfe4 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java @@ -78,6 +78,15 @@ public abstract class AbstractActor { return this.id; } + /** + * Returns the actor's type. + * + * @return Actor type. + */ + String getType() { + return this.actorRuntimeContext.getActorTypeInformation().getName(); + } + /** * Returns the state store manager for this Actor. * @@ -105,12 +114,11 @@ public abstract class AbstractActor { try { byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state); ActorReminderParams params = new ActorReminderParams(data, dueTime, period); - byte[] serialized = INTERNAL_SERIALIZER.serialize(params); return this.actorRuntimeContext.getDaprClient().registerActorReminder( this.actorRuntimeContext.getActorTypeInformation().getName(), this.id.toString(), reminderName, - serialized); + params); } catch (IOException e) { return Mono.error(e); } @@ -136,7 +144,7 @@ public abstract class AbstractActor { T state, Duration dueTime, Duration period) { - return Mono.fromSupplier(() -> { + try { if ((callback == null) || callback.isEmpty()) { throw new IllegalArgumentException("Timer requires a callback function."); } @@ -146,18 +154,17 @@ public abstract class AbstractActor { name = String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID().toString()); } - return new ActorTimer(this, name, callback, state, dueTime, period); - }).flatMap(actorTimer -> { - try { - return this.actorRuntimeContext.getDaprClient().registerActorTimer( - this.actorRuntimeContext.getActorTypeInformation().getName(), - this.id.toString(), - actorTimer.getName(), - INTERNAL_SERIALIZER.serialize(actorTimer)).then(Mono.just(actorTimer.getName())); - } catch (Exception e) { - return Mono.error(e); - } - }); + byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state); + ActorTimerParams actorTimer = new ActorTimerParams(callback, data, dueTime, period); + + return this.actorRuntimeContext.getDaprClient().registerActorTimer( + this.actorRuntimeContext.getActorTypeInformation().getName(), + this.id.toString(), + name, + actorTimer).thenReturn(name); + } catch (Exception e) { + return Mono.error(e); + } } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java index cbc694723..35a2fc240 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java @@ -34,9 +34,9 @@ public class ActorObjectSerializer extends ObjectSerializer { return null; } - if (state.getClass() == ActorTimer.class) { + if (state.getClass() == ActorTimerParams.class) { // Special serializer for this internal classes. - return serialize((ActorTimer) state); + return serialize((ActorTimerParams) state); } if (state.getClass() == ActorReminderParams.class) { @@ -61,7 +61,7 @@ public class ActorObjectSerializer extends ObjectSerializer { * @return JSON String. * @throws IOException If cannot generate JSON. */ - private byte[] serialize(ActorTimer timer) throws IOException { + private byte[] serialize(ActorTimerParams timer) throws IOException { if (timer == null) { return null; } @@ -72,8 +72,8 @@ public class ActorObjectSerializer extends ObjectSerializer { generator.writeStringField("dueTime", DurationUtils.convertDurationToDaprFormat(timer.getDueTime())); generator.writeStringField("period", DurationUtils.convertDurationToDaprFormat(timer.getPeriod())); generator.writeStringField("callback", timer.getCallback()); - if (timer.getState() != null) { - generator.writeBinaryField("data", this.serialize(timer.getState())); + if (timer.getData() != null) { + generator.writeBinaryField("data", timer.getData()); } generator.writeEndObject(); generator.close(); @@ -148,6 +148,11 @@ public class ActorObjectSerializer extends ObjectSerializer { */ @Override public T deserialize(byte[] content, Class clazz) throws IOException { + if (clazz == ActorTimerParams.class) { + // Special serializer for this internal classes. + return (T) deserializeActorTimer(content); + } + if (clazz == ActorReminderParams.class) { // Special serializer for this internal classes. return (T) deserializeActorReminder(content); @@ -158,49 +163,24 @@ public class ActorObjectSerializer extends ObjectSerializer { } /** - * Extracts the response data from a JSON Payload where data is in "data" attribute. + * Deserializes an Actor Timer. * - * @param payload JSON payload containing "data". - * @return byte[] instance, null. - * @throws IOException In case it cannot generate String. + * @param value Content to be deserialized. + * @return Actor Timer. + * @throws IOException If cannot parse JSON. */ - public byte[] unwrapData(final byte[] payload) throws IOException { - if (payload == null) { + private ActorTimerParams deserializeActorTimer(byte[] value) throws IOException { + if (value == null) { return null; } - JsonNode root = OBJECT_MAPPER.readTree(payload); - if (root == null) { - return null; - } + JsonNode node = OBJECT_MAPPER.readTree(value); + String callback = node.get("callback").asText(); + Duration dueTime = extractDurationOrNull(node, "dueTime"); + Duration period = extractDurationOrNull(node, "period"); + byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null; - JsonNode dataNode = root.get("data"); - if (dataNode == null) { - return null; - } - - return dataNode.binaryValue(); - } - - /** - * Wraps data in the "data" attribute in a JSON object. - * - * @param data bytes to be wrapped into the "data" attribute in a JSON object. - * @return String to be sent to Dapr's API. - * @throws IOException If there's is any issue reading the data or wraping it - */ - public byte[] wrapData(final byte[] data) throws IOException { - try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { - JsonGenerator generator = JSON_FACTORY.createGenerator(output); - generator.writeStartObject(); - if (data != null) { - generator.writeBinaryField("data", data); - } - generator.writeEndObject(); - generator.close(); - output.flush(); - return output.toByteArray(); - } + return new ActorTimerParams(callback, data, dueTime, period); } /** @@ -216,11 +196,26 @@ public class ActorObjectSerializer extends ObjectSerializer { } JsonNode node = OBJECT_MAPPER.readTree(value); - Duration dueTime = DurationUtils.convertDurationFromDaprFormat(node.get("dueTime").asText()); - Duration period = DurationUtils.convertDurationFromDaprFormat(node.get("period").asText()); + Duration dueTime = extractDurationOrNull(node, "dueTime"); + Duration period = extractDurationOrNull(node, "period"); byte[] data = node.get("data") != null ? node.get("data").binaryValue() : null; return new ActorReminderParams(data, dueTime, period); } + /** + * Extracts duration or null. + * + * @param node Node that contains the attribute. + * @param name Attribute name. + * @return Parsed duration or null. + */ + private static Duration extractDurationOrNull(JsonNode node, String name) { + JsonNode valueNode = node.get(name); + if (valueNode == null) { + return null; + } + + return DurationUtils.convertDurationFromDaprFormat(valueNode.asText()); + } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index 8cab519dd..5312046c5 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -8,10 +8,14 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; import io.dapr.actors.ActorTrace; import io.dapr.client.DaprHttpBuilder; +import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import reactor.core.publisher.Mono; +import java.io.Closeable; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -21,7 +25,7 @@ import java.util.Map; * Contains methods to register actor types. Registering the types allows the * runtime to create instances of the actor. */ -public class ActorRuntime { +public class ActorRuntime implements Closeable { /** * Serializer for internal Dapr objects. @@ -43,6 +47,11 @@ public class ActorRuntime { */ private static volatile ActorRuntime instance; + /** + * Channel for communication with Dapr. + */ + private final ManagedChannel channel; + /** * Configuration for the Actor runtime. */ @@ -64,16 +73,27 @@ public class ActorRuntime { * @throws IllegalStateException If cannot instantiate Runtime. */ private ActorRuntime() throws IllegalStateException { - this(new DaprHttpClient(new DaprHttpBuilder().build())); + this(buildManagedChannel()); + } + + /** + * Constructor once channel is available. This should not be called directly. + * + * @param channel GRPC managed channel to be closed (or null). + * @throws IllegalStateException If cannot instantiate Runtime. + */ + private ActorRuntime(ManagedChannel channel) throws IllegalStateException { + this(channel, buildDaprClient(channel)); } /** * Constructor with dependency injection, useful for testing. This should not be called directly. * + * @param channel GRPC managed channel to be closed (or null). * @param daprClient Client to communicate with Dapr. * @throws IllegalStateException If class has one instance already. */ - private ActorRuntime(DaprClient daprClient) throws IllegalStateException { + private ActorRuntime(ManagedChannel channel, DaprClient daprClient) throws IllegalStateException { if (instance != null) { throw new IllegalStateException("ActorRuntime should only be constructed once"); } @@ -81,6 +101,7 @@ public class ActorRuntime { this.config = new ActorRuntimeConfig(); this.actorManagers = Collections.synchronizedMap(new HashMap<>()); this.daprClient = daprClient; + this.channel = channel; } /** @@ -272,9 +293,52 @@ public class ActorRuntime { if (actorManager == null) { String errorMsg = String.format("Actor type %s is not registered with Actor runtime.", actorTypeName); ACTOR_TRACE.writeError(TRACE_TYPE, actorTypeName, "Actor type is not registered with runtime."); - throw new IllegalStateException(errorMsg); + throw new IllegalArgumentException(errorMsg); } return actorManager; } + + /** + * Build an instance of the Client based on the provided setup. + * + * @param channel GRPC managed channel (or null, if not using GRPC). + * @return an instance of the setup Client + * @throws java.lang.IllegalStateException if any required field is missing + */ + private static DaprClient buildDaprClient(ManagedChannel channel) { + if (Properties.USE_GRPC.get()) { + return new DaprGrpcClient(channel); + } + + return new DaprHttpClient(new DaprHttpBuilder().build()); + } + + /** + * Creates a GRPC managed channel (or null, if not applicable). + * + * @return GRPC managed channel or null. + */ + private static ManagedChannel buildManagedChannel() { + if (!Properties.USE_GRPC.get()) { + return null; + } + + int port = Properties.GRPC_PORT.get(); + if (port <= 0) { + throw new IllegalStateException("Invalid port."); + } + + return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port).usePlaintext().build(); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); + } + } } \ No newline at end of file diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java new file mode 100644 index 000000000..cd4e9d0cf --- /dev/null +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.runtime; + +/** + * Represents a state operation for actor. + * + */ +final class ActorStateOperation { + + /** + * Name of the operation. + */ + private String operationType; + + /** + * Key for the state to be persisted. + */ + private String key; + + /** + * Value of the state to be persisted. + */ + private Object value; + + /** + * Instantiates a new Actor Timer. + * + * @param operationType Type of state operation. + * @param key Key to be persisted. + * @param value Value to be persisted. + */ + ActorStateOperation(String operationType, + String key, + Object value) { + this.operationType = operationType; + this.key = key; + this.value = value; + } + + /** + * Gets the type of state operation. + * + * @return State operation. + */ + public String getOperationType() { + return operationType; + } + + /** + * Gets the key to be persisted. + * + * @return Key to be persisted. + */ + public String getKey() { + return key; + } + + /** + * Gets the value to be persisted. + * + * @return Value to be persisted. + */ + public Object getValue() { + return value; + } +} \ No newline at end of file diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimer.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimer.java deleted file mode 100644 index 1658ac281..000000000 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. - * Licensed under the MIT License. - */ - -package io.dapr.actors.runtime; - -import java.time.Duration; - -/** - * Represents the timer set on an Actor, to be called once after due time and then every period. - * - * @param State type. - */ -final class ActorTimer { - - /** - * Actor that owns this timer. - */ - private final AbstractActor owner; - - /** - * Name of this timer. - */ - private String name; - - /** - * Name of the method to be called for this timer. - */ - private String callback; - - /** - * State to be sent in the timer. - */ - private T state; - - /** - * Due time for the timer's first trigger. - */ - private Duration dueTime; - - /** - * Period at which the timer will be triggered. - */ - private Duration period; - - /** - * Instantiates a new Actor Timer. - * - * @param owner The Actor that owns this timer. The timer callback will be fired for this Actor. - * @param timerName The name of the timer. - * @param callback The name of the method to be called for this timer. - * @param state information to be used by the callback method - * @param dueTime the time when timer is first due. - * @param period the periodic time when timer will be invoked. - */ - ActorTimer(AbstractActor owner, - String timerName, - String callback, - T state, - Duration dueTime, - Duration period) { - this.owner = owner; - this.name = timerName; - this.callback = callback; - this.state = state; - this.dueTime = dueTime; - this.period = period; - } - - /** - * Gets the name of the Timer. The name is unique per actor. - * - * @return The name of the timer. - */ - public String getName() { - return this.name; - } - - /** - * Gets the name of the method for this Timer. - * - * @return The name of the method for this timer. - */ - public String getCallback() { - return this.callback; - } - - /** - * Gets the time when timer is first due. - * - * @return Time as Duration when timer is first due. - */ - public Duration getDueTime() { - return this.dueTime; - } - - /** - * Gets the periodic time when timer will be invoked. - * - * @return Periodic time as Duration when timer will be invoked. - */ - public Duration getPeriod() { - return this.period; - } - - /** - * Gets state containing information to be used by the callback method, or null. - * - * @return State containing information to be used by the callback method, or null. - */ - public T getState() { - return this.state; - } - -} \ No newline at end of file diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java index 0c0cc5e0c..e3aa884b5 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java @@ -5,55 +5,86 @@ package io.dapr.actors.runtime; +import java.time.Duration; + /** - * Parameters for Actor Timer. + * Represents the timer set on an Actor, to be called once after due time and then every period. + * */ final class ActorTimerParams { /** - * Callback function to be invoked in actor. + * Name of the method to be called for this timer. */ private String callback; /** - * Data to be passed in as part of the timer trigger. + * State to be sent in the timer. */ private byte[] data; /** - * Sets the name of the callback function. - * - * @param callback Name of the callback function. + * Due time for the timer's first trigger. */ - public void setCallback(String callback) { + private Duration dueTime; + + /** + * Period at which the timer will be triggered. + */ + private Duration period; + + /** + * Instantiates a new Actor Timer. + * + * @param callback The name of the method to be called for this timer. + * @param data The state to be used by the callback method + * @param dueTime The time when timer is first due. + * @param period The periodic time when timer will be invoked. + */ + ActorTimerParams(String callback, + byte[] data, + Duration dueTime, + Duration period) { this.callback = callback; + this.data = data; + this.dueTime = dueTime; + this.period = period; } /** - * Gets the name of the callback function. + * Gets the name of the method for this Timer. * - * @return Name of the callback function. + * @return The name of the method for this timer. */ public String getCallback() { return this.callback; } /** - * Sets the raw data for the callback function. + * Gets the time when timer is first due. * - * @param data Raw data for the callback function. + * @return Time as Duration when timer is first due. */ - public void setData(byte[] data) { - this.data = data; + public Duration getDueTime() { + return this.dueTime; } /** - * Gets the raw data for the callback function. + * Gets the periodic time when timer will be invoked. * - * @return Raw data for the callback function. + * @return Periodic time as Duration when timer will be invoked. */ - public byte[] getData() { - return data; + public Duration getPeriod() { + return this.period; } -} + /** + * Gets state containing information to be used by the callback method, or null. + * + * @return State containing information to be used by the callback method, or null. + */ + public byte[] getData() { + return this.data; + } + +} \ No newline at end of file diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java index 4e40e3c0f..5da02ca10 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java @@ -7,6 +7,8 @@ package io.dapr.actors.runtime; import reactor.core.publisher.Mono; +import java.util.List; + /** * Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required. */ @@ -25,23 +27,27 @@ interface DaprClient { /** * Saves state batch to Dapr. * - * @param actorType Type of actor. - * @param actorId Actor Identifier. - * @param data State to be saved. + * @param actorType Type of actor. + * @param actorId Actor Identifier. + * @param operations State transaction operations. * @return Asynchronous void result. */ - Mono saveActorStateTransactionally(String actorType, String actorId, byte[] data); + Mono saveActorStateTransactionally(String actorType, String actorId, List operations); /** * Register a reminder. * - * @param actorType Type of actor. - * @param actorId Actor Identifier. - * @param reminderName Name of reminder to be registered. - * @param data JSON reminder data as per Dapr's spec. + * @param actorType Type of actor. + * @param actorId Actor Identifier. + * @param reminderName Name of reminder to be registered. + * @param reminderParams Parameters for the reminder. * @return Asynchronous void result. */ - Mono registerActorReminder(String actorType, String actorId, String reminderName, byte[] data); + Mono registerActorReminder( + String actorType, + String actorId, + String reminderName, + ActorReminderParams reminderParams); /** * Unregisters a reminder. @@ -54,15 +60,15 @@ interface DaprClient { Mono unregisterActorReminder(String actorType, String actorId, String reminderName); /** - * Registers a timer. + * Register a timer. * - * @param actorType Type of actor. - * @param actorId Actor Identifier. - * @param timerName Name of timer to be registered. - * @param data JSON reminder data as per Dapr's spec. + * @param actorType Type of actor. + * @param actorId Actor Identifier. + * @param timerName Name of reminder to be registered. + * @param timerParams Parameters for the timer. * @return Asynchronous void result. */ - Mono registerActorTimer(String actorType, String actorId, String timerName, byte[] data); + Mono registerActorTimer(String actorType, String actorId, String timerName, ActorTimerParams timerParams); /** * Unregisters a timer. diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java new file mode 100644 index 000000000..7f2de0579 --- /dev/null +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java @@ -0,0 +1,224 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.runtime; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; +import io.dapr.config.Properties; +import io.dapr.utils.DurationUtils; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import io.grpc.ManagedChannel; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +/** + * A DaprClient over HTTP for Actor's runtime. + */ +class DaprGrpcClient implements DaprClient { + + /** + * Use to handle internal serialization. + */ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** + * Dapr's charset. + */ + private static final Charset CHARSET = Properties.STRING_CHARSET.get(); + + /** + * The GRPC client to be used. + * + * @see io.dapr.v1.DaprGrpc.DaprFutureStub + */ + private DaprGrpc.DaprFutureStub client; + + /** + * Internal constructor. + * + * @param channel channel (client needs to close channel after use). + */ + DaprGrpcClient(ManagedChannel channel) { + this(DaprGrpc.newFutureStub(channel)); + } + + /** + * Internal constructor. + * + * @param grpcClient Dapr's GRPC client. + */ + DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) { + this.client = grpcClient; + } + + /** + * {@inheritDoc} + */ + @Override + public Mono getActorState(String actorType, String actorId, String keyName) { + return Mono.fromCallable(() -> { + DaprProtos.GetActorStateRequest req = + DaprProtos.GetActorStateRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setKey(keyName) + .build(); + + ListenableFuture futureResponse = client.getActorState(req); + return futureResponse.get(); + }).map(r -> r.getData().toByteArray()); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono saveActorStateTransactionally( + String actorType, + String actorId, + List operations) { + List grpcOps = new ArrayList<>(); + for (ActorStateOperation op : operations) { + String operationType = op.getOperationType(); + String key = op.getKey(); + Object value = op.getValue(); + DaprProtos.TransactionalActorStateOperation.Builder opBuilder = + DaprProtos.TransactionalActorStateOperation.newBuilder() + .setOperationType(operationType) + .setKey(key); + if (value != null) { + if (value instanceof String) { + opBuilder.setValue(Any.newBuilder().setValue(ByteString.copyFrom((String) value, CHARSET))); + } else if (value instanceof byte[]) { + try { + String base64 = OBJECT_MAPPER.writeValueAsString(value); + opBuilder.setValue(Any.newBuilder().setValue(ByteString.copyFrom(base64, CHARSET))); + } catch (IOException e) { + return Mono.error(e); + } + } else { + return Mono.error(() -> { + throw new IllegalArgumentException("Actor state value must be String or byte[]"); + }); + } + } + + grpcOps.add(opBuilder.build()); + } + + DaprProtos.ExecuteActorStateTransactionRequest req = + DaprProtos.ExecuteActorStateTransactionRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .addAllOperations(grpcOps) + .build(); + + return Mono.fromCallable(() -> { + ListenableFuture futureResponse = client.executeActorStateTransaction(req); + return futureResponse.get(); + }).then(); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono registerActorReminder( + String actorType, + String actorId, + String reminderName, + ActorReminderParams reminderParams) { + return Mono.fromCallable(() -> { + DaprProtos.RegisterActorReminderRequest req = + DaprProtos.RegisterActorReminderRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(reminderName) + .setData(ByteString.copyFrom(reminderParams.getData())) + .setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime())) + .setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod())) + .build(); + + ListenableFuture futureResponse = client.registerActorReminder(req); + futureResponse.get(); + return null; + }); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono unregisterActorReminder(String actorType, String actorId, String reminderName) { + return Mono.fromCallable(() -> { + DaprProtos.UnregisterActorReminderRequest req = + DaprProtos.UnregisterActorReminderRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(reminderName) + .build(); + + ListenableFuture futureResponse = client.unregisterActorReminder(req); + futureResponse.get(); + return null; + }); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono registerActorTimer( + String actorType, + String actorId, + String timerName, + ActorTimerParams timerParams) { + return Mono.fromCallable(() -> { + DaprProtos.RegisterActorTimerRequest req = + DaprProtos.RegisterActorTimerRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(timerName) + .setCallback(timerParams.getCallback()) + .setData(ByteString.copyFrom(timerParams.getData())) + .setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime())) + .setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod())) + .build(); + + ListenableFuture futureResponse = client.registerActorTimer(req); + futureResponse.get(); + return null; + }); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono unregisterActorTimer(String actorType, String actorId, String timerName) { + return Mono.fromCallable(() -> { + DaprProtos.UnregisterActorTimerRequest req = + DaprProtos.UnregisterActorTimerRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(timerName) + .build(); + + ListenableFuture futureResponse = client.unregisterActorTimer(req); + futureResponse.get(); + return null; + }); + } + +} diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprHttpClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprHttpClient.java index 3154627f3..f7abe6883 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprHttpClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprHttpClient.java @@ -5,14 +5,30 @@ package io.dapr.actors.runtime; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import io.dapr.client.DaprHttp; import reactor.core.publisher.Mono; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; + /** * A DaprClient over HTTP for Actor's runtime. */ class DaprHttpClient implements DaprClient { + /** + * Internal serializer for state. + */ + private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer(); + + /** + * Shared Json Factory as per Jackson's documentation, used only for this class. + */ + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + /** * Base URL for Dapr Actor APIs. */ @@ -75,18 +91,77 @@ class DaprHttpClient implements DaprClient { * {@inheritDoc} */ @Override - public Mono saveActorStateTransactionally(String actorType, String actorId, byte[] data) { + public Mono saveActorStateTransactionally( + String actorType, + String actorId, + List operations) { + // Constructing the JSON via a stream API to avoid creating transient objects to be instantiated. + byte[] payload = null; + try (ByteArrayOutputStream writer = new ByteArrayOutputStream()) { + JsonGenerator generator = JSON_FACTORY.createGenerator(writer); + // Start array + generator.writeStartArray(); + + for (ActorStateOperation stateOperation : operations) { + // Start operation object. + generator.writeStartObject(); + generator.writeStringField("operation", stateOperation.getOperationType()); + + // Start request object. + generator.writeObjectFieldStart("request"); + generator.writeStringField("key", stateOperation.getKey()); + + Object value = stateOperation.getValue(); + if (value != null) { + if (value instanceof String) { + // DefaultObjectSerializer is a JSON serializer, so we just pass it on. + generator.writeFieldName("value"); + generator.writeRawValue((String) value); + } else if (value instanceof byte[]) { + // Custom serializer uses byte[]. + // DefaultObjectSerializer is just a passthrough for byte[], so we handle it here too. + generator.writeBinaryField("value", (byte[]) value); + } else { + return Mono.error(() -> { + throw new IllegalArgumentException("Actor state value must be String or byte[]"); + }); + } + } + // End request object. + generator.writeEndObject(); + + // End operation object. + generator.writeEndObject(); + } + + // End array + generator.writeEndArray(); + + generator.close(); + writer.flush(); + payload = writer.toByteArray(); + } catch (IOException e) { + return Mono.error(e); + } + String url = String.format(ACTOR_STATE_RELATIVE_URL_FORMAT, actorType, actorId); - return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null).then(); + return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, payload, null, null).then(); } /** * {@inheritDoc} */ @Override - public Mono registerActorReminder(String actorType, String actorId, String reminderName, byte[] data) { + public Mono registerActorReminder( + String actorType, + String actorId, + String reminderName, + ActorReminderParams reminderParams) { String url = String.format(ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName); - return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null).then(); + return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(reminderParams)) + .flatMap(data -> + this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null) + ).then(); } /** @@ -102,9 +177,16 @@ class DaprHttpClient implements DaprClient { * {@inheritDoc} */ @Override - public Mono registerActorTimer(String actorType, String actorId, String timerName, byte[] data) { - String url = String.format(ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName); - return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null).then(); + public Mono registerActorTimer( + String actorType, + String actorId, + String timerName, + ActorTimerParams timerParams) { + return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(timerParams)) + .flatMap(data -> { + String url = String.format(ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName); + return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), url, null, data, null, null); + }).then(); } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java index 0cafca2f7..7fa621f6a 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java @@ -5,8 +5,6 @@ package io.dapr.actors.runtime; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.actors.ActorId; import io.dapr.config.Properties; @@ -15,9 +13,9 @@ import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.TypeRef; import reactor.core.publisher.Mono; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.Charset; +import java.util.ArrayList; /** * State Provider to interact with Dapr runtime to handle state. @@ -34,11 +32,6 @@ class DaprStateAsyncProvider { */ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - /** - * Shared Json Factory as per Jackson's documentation, used only for this class. - */ - private static final JsonFactory JSON_FACTORY = new JsonFactory(); - /** * Dapr's client for Actor runtime. */ @@ -71,8 +64,15 @@ class DaprStateAsyncProvider { return result.flatMap(s -> { try { + if (s == null) { + return Mono.empty(); + } + T response = this.stateSerializer.deserialize(s, type); if (this.isStateSerializerDefault && (response instanceof byte[])) { + if (s.length == 0) { + return Mono.empty(); + } // Default serializer just passes through byte arrays, so we need to decode it here. response = (T) OBJECT_MAPPER.readValue(s, byte[].class); } @@ -120,71 +120,42 @@ class DaprStateAsyncProvider { return Mono.empty(); } - int count = 0; - // Constructing the JSON via a stream API to avoid creating transient objects to be instantiated. - byte[] payload = null; - try (ByteArrayOutputStream writer = new ByteArrayOutputStream()) { - JsonGenerator generator = JSON_FACTORY.createGenerator(writer); - // Start array - generator.writeStartArray(); + ArrayList operations = new ArrayList<>(stateChanges.length); + for (ActorStateChange stateChange : stateChanges) { + if ((stateChange == null) || (stateChange.getChangeKind() == null)) { + continue; + } - for (ActorStateChange stateChange : stateChanges) { - if ((stateChange == null) || (stateChange.getChangeKind() == null)) { - continue; - } + String operationName = stateChange.getChangeKind().getDaprStateChangeOperation(); + if ((operationName == null) || (operationName.length() == 0)) { + continue; + } - String operationName = stateChange.getChangeKind().getDaprStateChangeOperation(); - if ((operationName == null) || (operationName.length() == 0)) { - continue; - } - - count++; - - // Start operation object. - generator.writeStartObject(); - generator.writeStringField("operation", operationName); - - // Start request object. - generator.writeObjectFieldStart("request"); - generator.writeStringField("key", stateChange.getStateName()); - if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) - || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { + String key = stateChange.getStateName(); + Object value = null; + if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) + || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { + try { byte[] data = this.stateSerializer.serialize(stateChange.getValue()); if (data != null) { if (this.isStateSerializerDefault && !(stateChange.getValue() instanceof byte[])) { // DefaultObjectSerializer is a JSON serializer, so we just pass it on. - generator.writeFieldName("value"); - generator.writeRawValue(new String(data, CHARSET)); + value = new String(data, CHARSET); } else { // Custom serializer uses byte[]. // DefaultObjectSerializer is just a passthrough for byte[], so we handle it here too. - generator.writeBinaryField("value", data); + value = data; } } + } catch (IOException e) { + return Mono.error(e); } - // End request object. - generator.writeEndObject(); - - // End operation object. - generator.writeEndObject(); } - // End array - generator.writeEndArray(); - - generator.close(); - writer.flush(); - payload = writer.toByteArray(); - } catch (IOException e) { - return Mono.error(e); + operations.add(new ActorStateOperation(operationName, key, value)); } - if (count == 0) { - // No-op since there is no operation to be performed. - Mono.empty(); - } - - return this.daprClient.saveActorStateTransactionally(actorType, actorId.toString(), payload); + return this.daprClient.saveActorStateTransactionally(actorType, actorId.toString(), operations); } } diff --git a/sdk-actors/src/test/java/io/dapr/actors/ActorIdTest.java b/sdk-actors/src/test/java/io/dapr/actors/ActorIdTest.java index fe512ae84..c265ae7e6 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/ActorIdTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/ActorIdTest.java @@ -4,11 +4,12 @@ */ package io.dapr.actors; -import java.util.*; - import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + /** * Unit tests for ActorId. */ diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java index b6d15bef7..6600fc1e5 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java @@ -6,8 +6,9 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; -import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.actors.ActorMethod; import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -58,16 +59,16 @@ public class ActorProxyImplTest { public void invokeActorMethodWithoutDataWithReturnTypeViaReflection() throws NoSuchMethodException { final DaprClient daprClient = mock(DaprClient.class); Mono daprResponse = Mono.just( - "{\n\t\t\"propertyA\": \"valueA\",\n\t\t\"propertyB\": \"valueB\"\n\t}".getBytes()); + "{\n\t\t\"propertyA\": \"valueA\",\n\t\t\"propertyB\": \"valueB\"\n\t}".getBytes()); when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull())) - .thenReturn(daprResponse); + .thenReturn(daprResponse); final ActorProxyImpl actorProxy = new ActorProxyImpl( - "myActorType", - new ActorId("100"), - new DefaultObjectSerializer(), - daprClient); + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); MyData myData = (MyData) actorProxy.invoke(actorProxy, Actor.class.getMethod("getData"), null); Assert.assertNotNull(myData); @@ -75,6 +76,177 @@ public class ActorProxyImplTest { Assert.assertEquals("valueB", myData.getPropertyB());// propertyB=null } + @Test() + public void invokeActorMethodWithoutDataWithReturnMonoTypeViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + Mono daprResponse = Mono.just( + "{\n\t\t\"propertyA\": \"valueA\",\n\t\t\"propertyB\": \"valueB\"\n\t}".getBytes()); + + when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull())) + .thenReturn(daprResponse); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + Mono res = (Mono) actorProxy.invoke(actorProxy, Actor.class.getMethod("getDataMono"), null); + Assert.assertNotNull(res); + MyData myData = res.block(); + Assert.assertNotNull(myData); + Assert.assertEquals("valueA", myData.getPropertyA()); + Assert.assertEquals("valueB", myData.getPropertyB());// propertyB=null + } + + @Test() + public void invokeActorMethodWithDataWithReturnTypeViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + Mono daprResponse = Mono.just( + "\"OK\"".getBytes()); + + when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes()))) + .thenReturn(daprResponse); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + String res = (String) actorProxy.invoke( + actorProxy, + Actor.class.getMethod("echo", String.class), + new Object[] { "hello world" } ); + + Assert.assertEquals("OK", res); + } + + @Test() + public void invokeActorMethodWithDataWithReturnMonoTypeViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + Mono daprResponse = Mono.just( + "\"OK\"".getBytes()); + + when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes()))) + .thenReturn(daprResponse); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + Mono res = (Mono) actorProxy.invoke( + actorProxy, + Actor.class.getMethod("echoMono", String.class), + new Object[] { "hello world" } ); + + Assert.assertNotNull(res); + Assert.assertEquals("OK", res.block()); + } + + @Test() + public void invokeActorMethodWithoutDataWithoutReturnTypeViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + Mono daprResponse = Mono.empty(); + + when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull())) + .thenReturn(daprResponse); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + Object myData = actorProxy.invoke(actorProxy, Actor.class.getMethod("doSomething"), null); + Assert.assertNull(myData); + } + + @Test() + public void invokeActorMethodWithoutDataWithoutReturnTypeMonoViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + Mono daprResponse = Mono.empty(); + + when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.isNull())) + .thenReturn(daprResponse); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + Mono myData = (Mono)actorProxy.invoke(actorProxy, Actor.class.getMethod("doSomethingMono"), null); + Assert.assertNotNull(myData); + Assert.assertNull(myData.block()); + } + + @Test() + public void invokeActorMethodWithDataWithoutReturnTypeMonoViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + Mono daprResponse = Mono.empty(); + + when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes()))) + .thenReturn(daprResponse); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + Mono myData = (Mono)actorProxy.invoke( + actorProxy, + Actor.class.getMethod("doSomethingMonoWithArg", String.class), + new Object[] { "hello world" }); + + Assert.assertNotNull(myData); + Assert.assertNull(myData.block()); + } + + @Test(expected = UnsupportedOperationException.class) + public void invokeActorMethodWithTooManyArgsViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + Mono myData = (Mono)actorProxy.invoke( + actorProxy, + Actor.class.getMethod("tooManyArgs", String.class, String.class), + new Object[] { "hello", "world" }); + + Assert.assertNotNull(myData); + Assert.assertNull(myData.block()); + } + + @Test() + public void invokeActorMethodWithDataWithoutReturnTypeViaReflection() throws NoSuchMethodException { + final DaprClient daprClient = mock(DaprClient.class); + Mono daprResponse = Mono.empty(); + + when(daprClient.invokeActorMethod(anyString(), anyString(), anyString(), Mockito.eq("\"hello world\"".getBytes()))) + .thenReturn(daprResponse); + + final ActorProxyImpl actorProxy = new ActorProxyImpl( + "myActorType", + new ActorId("100"), + new DefaultObjectSerializer(), + daprClient); + + Object res = actorProxy.invoke( + actorProxy, + Actor.class.getMethod("process", String.class), + new Object[] { "hello world" } ); + + Assert.assertNull(res); + } + @Test() public void invokeActorMethodWithoutDataWithEmptyReturnType() { final DaprClient daprClient = mock(DaprClient.class); @@ -110,8 +282,6 @@ public class ActorProxyImplTest { Assert.fail("Not exception was throw")) .doOnError(Throwable::printStackTrace ).block(); - - } @Test() @@ -273,6 +443,24 @@ public class ActorProxyImplTest { interface Actor { MyData getData(); + + String echo(String message); + + @ActorMethod(returns = MyData.class) + Mono getDataMono(); + + @ActorMethod(returns = String.class) + Mono echoMono(String message); + + void doSomething(); + + Mono doSomethingMono(); + + void process(String something); + + Mono doSomethingMonoWithArg(String something); + + void tooManyArgs(String something, String something2); } static class MyData { diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java new file mode 100644 index 000000000..1e7f2f29f --- /dev/null +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.ByteString; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import org.junit.Before; +import org.junit.Test; +import reactor.core.publisher.Mono; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class DaprGrpcClientTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String ACTOR_TYPE = "MyActorType"; + + private static final String ACTOR_ID = "1234567890"; + + private DaprGrpc.DaprFutureStub grpcStub; + + private DaprGrpcClient client; + + @Before + public void setup() { + grpcStub = mock(DaprGrpc.DaprFutureStub.class); + client = new DaprGrpcClient(grpcStub); + } + + @Test + public void invoke() { + String methodName = "mymethod"; + byte[] payload = "{ \"id\": 123 }".getBytes(); + byte[] response = "\"OK\"".getBytes(); + + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build()); + + when(grpcStub.invokeActor(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(methodName, argument.getMethod()); + assertArrayEquals(payload, argument.getData().toByteArray()); + return true; + }))).thenReturn(settableFuture); + Mono result = client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, payload); + assertArrayEquals(response, result.block()); + } + + @Test + public void invokeNullPayload() { + String methodName = "mymethod"; + byte[] response = "\"OK\"".getBytes(); + + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build()); + + when(grpcStub.invokeActor(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(methodName, argument.getMethod()); + assertArrayEquals(new byte[0], argument.getData().toByteArray()); + return true; + }))).thenReturn(settableFuture); + Mono result = client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, null); + assertArrayEquals(response, result.block()); + } + + @Test + public void invokeException() { + String methodName = "mymethod"; + + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.setException(new ArithmeticException()); + + when(grpcStub.invokeActor(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(methodName, argument.getMethod()); + assertArrayEquals(new byte[0], argument.getData().toByteArray()); + return true; + }))).thenReturn(settableFuture); + Mono result = client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, null); + Exception exception = assertThrows(Exception.class, () -> result.block()); + assertTrue(exception.getCause().getCause() instanceof ArithmeticException); + } + + @Test + public void invokeNotHotMono() { + String methodName = "mymethod"; + + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.setException(new ArithmeticException()); + + when(grpcStub.invokeActor(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(methodName, argument.getMethod()); + assertArrayEquals(new byte[0], argument.getData().toByteArray()); + return true; + }))).thenReturn(settableFuture); + client.invokeActorMethod(ACTOR_TYPE, ACTOR_ID, methodName, null); + // No exception thrown because Mono is ignored here. + } + +} diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java index b7159a755..f8eb56273 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java @@ -8,16 +8,16 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; import io.dapr.actors.ActorType; import io.dapr.serializer.DefaultObjectSerializer; -import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.atomic.AtomicInteger; - import io.dapr.utils.TypeRef; import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; import reactor.core.publisher.Mono; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -266,16 +266,14 @@ public class ActorManagerTest { private byte[] createReminderParams(String data) throws IOException { byte[] serializedData = this.context.getObjectSerializer().serialize(data); - ActorReminderParams params = new ActorReminderParams(serializedData, Duration.ofSeconds(1), Duration.ofSeconds(1)); - return INTERNAL_SERIALIZER.serialize(params); + ActorReminderParams p = new ActorReminderParams(serializedData, Duration.ofSeconds(1), Duration.ofSeconds(1)); + return INTERNAL_SERIALIZER.serialize(p); } private byte[] createTimerParams(String callback, Object data) throws IOException { byte[] serializedData = this.context.getObjectSerializer().serialize(data); - ActorTimerParams params = new ActorTimerParams(); - params.setCallback(callback); - params.setData(serializedData); - return INTERNAL_SERIALIZER.serialize(params); + ActorTimerParams p = new ActorTimerParams(callback, serializedData, Duration.ofSeconds(1), Duration.ofSeconds(1)); + return INTERNAL_SERIALIZER.serialize(p); } private static ActorId newActorId() { diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java index 986cac54a..47b6a6989 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java @@ -7,6 +7,7 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; import io.dapr.actors.ActorType; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -14,6 +15,7 @@ import org.junit.Test; import reactor.core.publisher.Mono; import java.lang.reflect.Constructor; +import java.time.Duration; import java.util.Arrays; import java.util.UUID; @@ -92,7 +94,7 @@ public class ActorRuntimeTest { @BeforeClass public static void beforeAll() throws Exception { constructor = (Constructor) Arrays.stream(ActorRuntime.class.getDeclaredConstructors()) - .filter(c -> c.getParameters().length == 1) + .filter(c -> c.getParameters().length == 2) .map(c -> { c.setAccessible(true); return c; @@ -104,15 +106,56 @@ public class ActorRuntimeTest { @Before public void setup() throws Exception { this.mockDaprClient = mock(DaprClient.class); - this.runtime = constructor.newInstance(this.mockDaprClient); + this.runtime = constructor.newInstance(null, this.mockDaprClient); + } + + @Test(expected = IllegalArgumentException.class) + public void registerActorNullClass() { + this.runtime.registerActor(null); + } + + @Test(expected = IllegalArgumentException.class) + public void registerActorNullFactory() { + this.runtime.registerActor(MyActorImpl.class, null, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + } + + @Test(expected = IllegalArgumentException.class) + public void registerActorNullSerializer() { + this.runtime.registerActor(MyActorImpl.class, new DefaultActorFactory<>(), null, new DefaultObjectSerializer()); + } + + @Test(expected = IllegalArgumentException.class) + public void registerActorNullStateSerializer() { + this.runtime.registerActor(MyActorImpl.class, new DefaultActorFactory<>(), new DefaultObjectSerializer(), null); } @Test - public void registerActor() throws Exception { - this.runtime.registerActor(MyActorImpl.class); - Assert.assertTrue(new String(this.runtime.serializeConfig()).contains(ACTOR_NAME)); + public void setActorIdleTimeout() throws Exception { + this.runtime.getConfig().setActorIdleTimeout(Duration.ofSeconds(123)); + Assert.assertEquals("{\"entities\":[],\"actorIdleTimeout\":\"0h2m3s0ms\"}", + new String(this.runtime.serializeConfig())); } + @Test + public void setActorScanInterval() throws Exception { + this.runtime.getConfig().setActorScanInterval(Duration.ofSeconds(123)); + Assert.assertEquals("{\"entities\":[],\"actorScanInterval\":\"0h2m3s0ms\"}", + new String(this.runtime.serializeConfig())); + } + + @Test + public void setDrainBalancedActors() throws Exception { + this.runtime.getConfig().setDrainBalancedActors(true); + Assert.assertEquals("{\"entities\":[],\"drainBalancedActors\":true}", + new String(this.runtime.serializeConfig())); + } + + @Test + public void setDrainOngoingCallTimeout() throws Exception { + this.runtime.getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(123)); + Assert.assertEquals("{\"entities\":[],\"drainOngoingCallTimeout\":\"0h2m3s0ms\"}", + new String(this.runtime.serializeConfig())); + } @Test public void invokeActor() throws Exception { @@ -124,6 +167,14 @@ public class ActorRuntimeTest { Assert.assertEquals("Nothing to say.", message); } + @Test(expected = IllegalArgumentException.class) + public void invokeUnknownActor() { + String actorId = UUID.randomUUID().toString(); + this.runtime.registerActor(MyActorImpl.class); + + this.runtime.invoke("UnknownActor", actorId, "say", null).block(); + } + @Test public void deactivateActor() throws Exception { String actorId = UUID.randomUUID().toString(); @@ -152,7 +203,7 @@ public class ActorRuntimeTest { @Test public void lazyInvoke() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class); + this.runtime.registerActor(MyActorImpl.class, new DefaultActorFactory<>()); Mono invokeCall = this.runtime.invoke(ACTOR_NAME, actorId, "say", null); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java index 78f87a494..3c2355ca6 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTimerTest.java @@ -26,9 +26,7 @@ public class ActorTimerTest { .plusHours(1) .plusSeconds(3); - ActorTimer timer = new ActorTimer( - null, - "testTimer", + ActorTimerParams timer = new ActorTimerParams( "myfunction", null, dueTime, @@ -51,9 +49,7 @@ public class ActorTimerTest { .minusHours(1) .minusMinutes(3); - ActorTimer timer = new ActorTimer( - null, - "testTimer", + ActorTimerParams timer = new ActorTimerParams( "myfunction", null, dueTime, diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java new file mode 100644 index 000000000..2c01318c1 --- /dev/null +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java @@ -0,0 +1,333 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.runtime; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; +import io.dapr.utils.DurationUtils; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; + +public class DaprGrpcClientTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String ACTOR_TYPE = "MyActorType"; + + private static final String ACTOR_ID = "1234567890"; + + private DaprGrpc.DaprFutureStub grpcStub; + + private DaprGrpcClient client; + + @Before + public void setup() { + grpcStub = mock(DaprGrpc.DaprFutureStub.class); + client = new DaprGrpcClient(grpcStub); + } + + @Test + public void getActorStateException() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.setException(new ArithmeticException()); + + when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher( + ACTOR_TYPE, + ACTOR_ID, + "MyKey" + )))).thenReturn(settableFuture); + Mono result = client.getActorState(ACTOR_TYPE, ACTOR_ID, "MyKey"); + Exception exception = assertThrows(Exception.class, () -> result.block()); + assertTrue(exception.getCause().getCause() instanceof ArithmeticException); + } + + @Test + public void getActorState() { + byte[] data = "hello world".getBytes(); + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(data)).build()); + + when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher( + ACTOR_TYPE, + ACTOR_ID, + "MyKey" + )))).thenReturn(settableFuture); + Mono result = client.getActorState(ACTOR_TYPE, ACTOR_ID, "MyKey"); + assertArrayEquals(data, result.block()); + } + + @Test + public void saveActorStateTransactionallyException() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.setException(new ArithmeticException()); + + when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( + ACTOR_TYPE, + ACTOR_ID, + new ArrayList<>() + )))).thenReturn(settableFuture); + Mono result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, new ArrayList<>()); + Exception exception = assertThrows(Exception.class, () -> result.block()); + assertTrue(exception.getCause().getCause() instanceof ArithmeticException); + } + + @Test + public void saveActorStateTransactionally() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(Empty.newBuilder().build()); + + ActorStateOperation[] operations = new ActorStateOperation[] { + new ActorStateOperation("upsert", "mykey", "hello world"), + new ActorStateOperation("delete", "mykey", null), + }; + + when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( + ACTOR_TYPE, + ACTOR_ID, + Arrays.asList(operations) + )))).thenReturn(settableFuture); + Mono result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); + result.block(); + } + + @Test + public void saveActorStateTransactionallyByteArray() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(Empty.newBuilder().build()); + + ActorStateOperation[] operations = new ActorStateOperation[] { + new ActorStateOperation("upsert", "mykey", "hello world".getBytes()), + new ActorStateOperation("delete", "mykey", null), + }; + + when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( + ACTOR_TYPE, + ACTOR_ID, + Arrays.asList(operations) + )))).thenReturn(settableFuture); + Mono result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); + result.block(); + } + + @Test + public void saveActorStateTransactionallyInvalidValueType() { + ActorStateOperation[] operations = new ActorStateOperation[] { + new ActorStateOperation("upsert", "mykey", 123), + new ActorStateOperation("delete", "mykey", null), + }; + + Mono result = client.saveActorStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); + assertThrows(IllegalArgumentException.class, () -> result.block()); + } + + + @Test + public void registerActorReminder() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(Empty.newBuilder().build()); + + String reminderName = "myreminder"; + ActorReminderParams params = new ActorReminderParams( + "hello world".getBytes(), + Duration.ofSeconds(1), + Duration.ofSeconds(2) + ); + + when(grpcStub.registerActorReminder(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(reminderName, argument.getName()); + assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); + assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); + return true; + }))).thenReturn(settableFuture); + Mono result = client.registerActorReminder(ACTOR_TYPE, ACTOR_ID, reminderName, params); + result.block(); + } + + @Test + public void unregisterActorReminder() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(Empty.newBuilder().build()); + + String reminderName = "myreminder"; + + when(grpcStub.unregisterActorReminder(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(reminderName, argument.getName()); + return true; + }))).thenReturn(settableFuture); + Mono result = client.unregisterActorReminder(ACTOR_TYPE, ACTOR_ID, reminderName); + result.block(); + } + + @Test + public void registerActorTimer() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(Empty.newBuilder().build()); + + String timerName = "mytimer"; + String callback = "mymethod"; + ActorTimerParams params = new ActorTimerParams( + callback, + "hello world".getBytes(), + Duration.ofSeconds(1), + Duration.ofSeconds(2) + ); + + when(grpcStub.registerActorTimer(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(timerName, argument.getName()); + assertEquals(callback, argument.getCallback()); + assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); + assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); + return true; + }))).thenReturn(settableFuture); + Mono result = client.registerActorTimer(ACTOR_TYPE, ACTOR_ID, timerName, params); + result.block(); + } + + @Test + public void unregisterActorTimer() { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(Empty.newBuilder().build()); + + String timerName = "mytimer"; + + when(grpcStub.unregisterActorTimer(argThat(argument -> { + assertEquals(ACTOR_TYPE, argument.getActorType()); + assertEquals(ACTOR_ID, argument.getActorId()); + assertEquals(timerName, argument.getName()); + return true; + }))).thenReturn(settableFuture); + Mono result = client.unregisterActorTimer(ACTOR_TYPE, ACTOR_ID, timerName); + result.block(); + } + + private static Any getAny(Object value) throws IOException { + if (value instanceof byte[]) { + String base64 = OBJECT_MAPPER.writeValueAsString(value); + return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build(); + } else if (value instanceof String) { + return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build(); + } + + throw new IllegalArgumentException("Must be byte[] or String"); + } + + private static class GetActorStateRequestMatcher implements ArgumentMatcher { + + private final String actorType; + + private final String actorId; + + private final String key; + + GetActorStateRequestMatcher(String actorType, String actorId, String key) { + this.actorType = actorType; + this.actorId = actorId; + this.key = key; + } + + @Override + public boolean matches(DaprProtos.GetActorStateRequest argument) { + if (argument == null) { + return false; + } + + return actorType.equals(argument.getActorType()) + && actorId.equals(argument.getActorId()) + && key.equals(argument.getKey()); + } + } + + private static class ExecuteActorStateTransactionRequestMatcher + implements ArgumentMatcher { + + private final String actorType; + + private final String actorId; + + private final List operations; + + ExecuteActorStateTransactionRequestMatcher(String actorType, String actorId, List operations) { + this.actorType = actorType; + this.actorId = actorId; + this.operations = operations; + } + + @Override + public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) { + if (argument == null) { + return false; + } + + if (operations.size() != argument.getOperationsCount()) { + return false; + } + + if (!actorType.equals(argument.getActorType()) + || !actorId.equals(argument.getActorId())) { + return false; + } + + for(ActorStateOperation operation : operations) { + boolean found = false; + for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) { + if (operation.getKey().equals(grpcOperation.getKey()) + && operation.getOperationType().equals(grpcOperation.getOperationType()) + && nullableEquals(operation.getValue(), grpcOperation.getValue())) { + found = true; + break; + } + } + + if (!found) { + return false; + } + } + + return true; + } + + private static boolean nullableEquals(Object one, Any another) { + if (one == null) { + return another.getValue().isEmpty(); + } + + if ((one == null) ^ (another == null)) { + return false; + } + + try { + Any oneAny = getAny(one); + return oneAny.getValue().equals(another.getValue()); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + } + } +} diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java index b22be9a64..4cbe6e507 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprHttpClientTest.java @@ -7,15 +7,29 @@ package io.dapr.actors.runtime; import io.dapr.client.DaprHttp; import io.dapr.client.DaprHttpProxy; import io.dapr.config.Properties; +import okhttp3.MediaType; import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; import okhttp3.mock.Behavior; import okhttp3.mock.MockInterceptor; +import okhttp3.mock.RuleAnswer; +import okio.Buffer; import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Mono; +import java.io.IOException; +import java.nio.charset.Charset; +import java.time.Duration; +import java.util.Base64; +import java.util.Collections; +import java.util.List; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.fail; public class DaprHttpClientTest { @@ -52,8 +66,8 @@ public class DaprHttpClientTest { .respond(EXPECTED_RESULT); DaprHttp daprHttp = new DaprHttpProxy(Properties.SIDECAR_IP.get(), 3000, okHttpClient); DaprHttpClient = new DaprHttpClient(daprHttp); - Mono mono = - DaprHttpClient.saveActorStateTransactionally("DemoActor", "1", "".getBytes()); + List ops = Collections.singletonList(new ActorStateOperation("UPSERT", "key", "value")); + Mono mono = DaprHttpClient.saveActorStateTransactionally("DemoActor", "1", ops); assertNull(mono.block()); } @@ -65,7 +79,11 @@ public class DaprHttpClientTest { DaprHttp daprHttp = new DaprHttpProxy(Properties.SIDECAR_IP.get(), 3000, okHttpClient); DaprHttpClient = new DaprHttpClient(daprHttp); Mono mono = - DaprHttpClient.registerActorReminder("DemoActor", "1", "reminder", "".getBytes()); + DaprHttpClient.registerActorReminder( + "DemoActor", + "1", + "reminder", + new ActorReminderParams("".getBytes(), Duration.ofSeconds(1), Duration.ofSeconds(2))); assertNull(mono.block()); } @@ -82,13 +100,42 @@ public class DaprHttpClientTest { @Test public void registerActorTimer() { + String data = "hello world"; mockInterceptor.addRule() .put("http://127.0.0.1:3000/v1.0/actors/DemoActor/1/timers/timer") - .respond(EXPECTED_RESULT); + .answer(new RuleAnswer() { + @Override + public Response.Builder respond(Request request) { + String expectedBody = "{\"dueTime\":\"0h0m5s0ms\"," + + "\"period\":\"0h0m10s0ms\"," + + "\"callback\":\"mycallback\"," + + "\"data\":\""+ Base64.getEncoder().encodeToString(data.getBytes()) +"\"}"; + String body = ""; + try { + Buffer buffer = new Buffer(); + request.body().writeTo(buffer); + body = buffer.readString(Charset.defaultCharset()); + } catch (IOException e) { + fail(); + } + assertEquals(expectedBody, body); + return new Response.Builder() + .code(200) + .body(ResponseBody.create("{}", MediaType.get("application/json"))); + } + }); DaprHttp daprHttp = new DaprHttpProxy(Properties.SIDECAR_IP.get(), 3000, okHttpClient); DaprHttpClient = new DaprHttpClient(daprHttp); Mono mono = - DaprHttpClient.registerActorTimer("DemoActor", "1", "timer", "".getBytes()); + DaprHttpClient.registerActorTimer( + "DemoActor", + "1", + "timer", + new ActorTimerParams( + "mycallback", + data.getBytes(), + Duration.ofSeconds(5), + Duration.ofSeconds(10))); assertNull(mono.block()); } @@ -102,5 +149,4 @@ public class DaprHttpClientTest { Mono mono = DaprHttpClient.unregisterActorTimer("DemoActor", "1", "timer"); assertNull(mono.block()); } - } \ No newline at end of file diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java index 2ee192e74..62d0f956d 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java @@ -5,7 +5,6 @@ package io.dapr.actors.runtime; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.actors.ActorId; import io.dapr.serializer.DaprObjectSerializer; @@ -15,7 +14,6 @@ import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; -import java.io.IOException; import java.util.Arrays; import java.util.Objects; @@ -66,7 +64,7 @@ public class DaprStateAsyncProviderTest { if (o == null || getClass() != o.getClass()) return false; Customer customer = (Customer) o; return id == customer.id && - Objects.equals(name, customer.name); + Objects.equals(name, customer.name); } @Override @@ -80,67 +78,60 @@ public class DaprStateAsyncProviderTest { public void happyCaseApply() { DaprClient daprClient = mock(DaprClient.class); when(daprClient - .saveActorStateTransactionally( - eq("MyActor"), - eq("123"), - argThat(s -> { - try { - JsonNode node = OBJECT_MAPPER.readTree(s); - if (node == null) { - return false; - } - - if (node.size() != 4) { - return false; - } - - boolean foundInsertName = false; - boolean foundUpdateZipcode = false; - boolean foundDeleteFlag = false; - boolean foundUpdateBytes = false; - for (JsonNode operation : node) { - if (operation.get("operation") == null) { - return false; - } - if (operation.get("request") == null) { + .saveActorStateTransactionally( + eq("MyActor"), + eq("123"), + argThat(operations -> { + if (operations == null) { return false; } - String opName = operation.get("operation").asText(); - String key = operation.get("request").get("key").asText(); - JsonNode valueNode = operation.get("request").get("value"); + if (operations.size() != 4) { + return false; + } - String value = (valueNode == null) ? null : valueNode.asText(); - foundInsertName |= "upsert".equals(opName) && - "name".equals(key) && - "Jon Doe".equals(value); - foundUpdateZipcode |= "upsert".equals(opName) && - "zipcode".equals(key) && - "98011".equals(value); - foundDeleteFlag |= "delete".equals(opName) && - "flag".equals(key) && - (value == null); - foundUpdateBytes |= "upsert".equals(opName) && - "bytes".equals(key) && - "AQ==".equals(value); - } + boolean foundInsertName = false; + boolean foundUpdateZipcode = false; + boolean foundDeleteFlag = false; + boolean foundUpdateBytes = false; + for (ActorStateOperation operation : operations) { + if (operation.getOperationType() == null) { + return false; + } + if (operation.getKey() == null) { + return false; + } - return foundInsertName && foundUpdateZipcode && foundDeleteFlag && foundUpdateBytes; - } catch (IOException e) { - e.printStackTrace(); - return false; - } - }))) - .thenReturn(Mono.empty()); + String opName = operation.getOperationType(); + String key = operation.getKey(); + Object value = operation.getValue(); + + foundInsertName |= "upsert".equals(opName) && + "name".equals(key) && + "\"Jon Doe\"".equals(value); + foundUpdateZipcode |= "upsert".equals(opName) && + "zipcode".equals(key) && + "98011".equals(value); + foundDeleteFlag |= "delete".equals(opName) && + "flag".equals(key) && + (value == null); + foundUpdateBytes |= "upsert".equals(opName) && + "bytes".equals(key) && + Arrays.equals(new byte[]{0x1}, (byte[]) value); + } + + return foundInsertName && foundUpdateZipcode && foundDeleteFlag && foundUpdateBytes; + }))) + .thenReturn(Mono.empty()); DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER); provider.apply("MyActor", - new ActorId("123"), - createInsertChange("name", "Jon Doe"), - createUpdateChange("zipcode", 98011), - createDeleteChange("flag"), - createUpdateChange("bytes", new byte[] {0x1})) - .block(); + new ActorId("123"), + createInsertChange("name", "Jon Doe"), + createUpdateChange("zipcode", 98011), + createDeleteChange("flag"), + createUpdateChange("bytes", new byte[]{0x1})) + .block(); verify(daprClient).saveActorStateTransactionally(eq("MyActor"), eq("123"), any()); } @@ -149,56 +140,61 @@ public class DaprStateAsyncProviderTest { public void happyCaseLoad() throws Exception { DaprClient daprClient = mock(DaprClient.class); when(daprClient - .getActorState(any(), any(), eq("name"))) - .thenReturn(Mono.just(SERIALIZER.serialize("Jon Doe"))); + .getActorState(any(), any(), eq("name"))) + .thenReturn(Mono.just(SERIALIZER.serialize("Jon Doe"))); when(daprClient - .getActorState(any(), any(), eq("zipcode"))) - .thenReturn(Mono.just(SERIALIZER.serialize(98021))); + .getActorState(any(), any(), eq("zipcode"))) + .thenReturn(Mono.just(SERIALIZER.serialize(98021))); when(daprClient - .getActorState(any(), any(), eq("goals"))) - .thenReturn(Mono.just(SERIALIZER.serialize(98))); + .getActorState(any(), any(), eq("goals"))) + .thenReturn(Mono.just(SERIALIZER.serialize(98))); when(daprClient - .getActorState(any(), any(), eq("balance"))) - .thenReturn(Mono.just(SERIALIZER.serialize(46.55))); + .getActorState(any(), any(), eq("balance"))) + .thenReturn(Mono.just(SERIALIZER.serialize(46.55))); when(daprClient - .getActorState(any(), any(), eq("active"))) - .thenReturn(Mono.just(SERIALIZER.serialize(true))); + .getActorState(any(), any(), eq("active"))) + .thenReturn(Mono.just(SERIALIZER.serialize(true))); when(daprClient - .getActorState(any(), any(), eq("customer"))) - .thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes())); + .getActorState(any(), any(), eq("customer"))) + .thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes())); when(daprClient - .getActorState(any(), any(), eq("anotherCustomer"))) - .thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}".getBytes())); + .getActorState(any(), any(), eq("anotherCustomer"))) + .thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}".getBytes())); when(daprClient - .getActorState(any(), any(), eq("nullCustomer"))) - .thenReturn(Mono.empty()); + .getActorState(any(), any(), eq("nullCustomer"))) + .thenReturn(Mono.empty()); when(daprClient .getActorState(any(), any(), eq("bytes"))) .thenReturn(Mono.just("\"QQ==\"".getBytes())); + when(daprClient + .getActorState(any(), any(), eq("emptyBytes"))) + .thenReturn(Mono.just(new byte[0])); DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER); Assert.assertEquals("Jon Doe", - provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block()); + provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block()); Assert.assertEquals(98021, - (int)provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block()); + (int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block()); Assert.assertEquals(98, - (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block()); + (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block()); Assert.assertEquals(98, - (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block()); + (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block()); Assert.assertEquals(46.55, - (double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block(), - EPSILON); + (double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block(), + EPSILON); Assert.assertEquals(true, - (boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block()); + (boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block()); Assert.assertEquals(new Customer().setId(1000).setName("Roxane"), - provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block()); + provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block()); Assert.assertNotEquals(new Customer().setId(1000).setName("Roxane"), - provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block()); + provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block()); Assert.assertNull( - provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block()); + provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block()); Assert.assertArrayEquals("A".getBytes(), provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block()); + Assert.assertNull( + provider.load("MyActor", new ActorId("123"), "emptyBytes", TypeRef.get(byte[].class)).block()); } @Test @@ -207,34 +203,34 @@ public class DaprStateAsyncProviderTest { // Keys that exists. when(daprClient - .getActorState(any(), any(), eq("name"))) - .thenReturn(Mono.just("Jon Doe".getBytes())); + .getActorState(any(), any(), eq("name"))) + .thenReturn(Mono.just("Jon Doe".getBytes())); when(daprClient - .getActorState(any(), any(), eq("zipcode"))) - .thenReturn(Mono.just("98021".getBytes())); + .getActorState(any(), any(), eq("zipcode"))) + .thenReturn(Mono.just("98021".getBytes())); when(daprClient - .getActorState(any(), any(), eq("goals"))) - .thenReturn(Mono.just("98".getBytes())); + .getActorState(any(), any(), eq("goals"))) + .thenReturn(Mono.just("98".getBytes())); when(daprClient - .getActorState(any(), any(), eq("balance"))) - .thenReturn(Mono.just("46.55".getBytes())); + .getActorState(any(), any(), eq("balance"))) + .thenReturn(Mono.just("46.55".getBytes())); when(daprClient - .getActorState(any(), any(), eq("active"))) - .thenReturn(Mono.just("true".getBytes())); + .getActorState(any(), any(), eq("active"))) + .thenReturn(Mono.just("true".getBytes())); when(daprClient - .getActorState(any(), any(), eq("customer"))) - .thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes())); + .getActorState(any(), any(), eq("customer"))) + .thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes())); // Keys that do not exist. when(daprClient - .getActorState(any(), any(), eq("Does not exist"))) - .thenReturn(Mono.empty()); + .getActorState(any(), any(), eq("Does not exist"))) + .thenReturn(Mono.empty()); when(daprClient - .getActorState(any(), any(), eq("NAME"))) - .thenReturn(Mono.empty()); + .getActorState(any(), any(), eq("NAME"))) + .thenReturn(Mono.empty()); when(daprClient - .getActorState(any(), any(), eq(null))) - .thenReturn(Mono.empty()); + .getActorState(any(), any(), eq(null))) + .thenReturn(Mono.empty()); DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java index 468979ba4..425f9b941 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java @@ -8,7 +8,11 @@ package io.dapr.actors.runtime; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.utils.TypeRef; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; /** * Class used to test different serializer implementations. diff --git a/sdk-actors/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdk-actors/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/sdk-actors/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/AppRun.java b/sdk-tests/src/test/java/io/dapr/it/AppRun.java index 77fc94059..5a275fc8a 100644 --- a/sdk-tests/src/test/java/io/dapr/it/AppRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/AppRun.java @@ -19,7 +19,7 @@ import static io.dapr.it.Retry.callWithRetry; public class AppRun implements Stoppable { private static final String APP_COMMAND = - "mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\""; + "mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D dapr.grpc.enabled=%b"; private final DaprPorts ports; @@ -30,10 +30,11 @@ public class AppRun implements Stoppable { AppRun(DaprPorts ports, String successMessage, Class serviceClass, - int maxWaitMilliseconds) { + int maxWaitMilliseconds, + boolean useGRPC) { this.command = new Command( successMessage, - buildCommand(serviceClass, ports), + buildCommand(serviceClass, ports, useGRPC), new HashMap<>() {{ put("DAPR_HTTP_PORT", ports.getHttpPort().toString()); put("DAPR_GRPC_PORT", ports.getGrpcPort().toString()); @@ -71,9 +72,10 @@ public class AppRun implements Stoppable { } } - private static String buildCommand(Class serviceClass, DaprPorts ports) { + private static String buildCommand(Class serviceClass, DaprPorts ports, boolean useGRPC) { return String.format(APP_COMMAND, serviceClass.getCanonicalName(), - ports.getAppPort() != null ? ports.getAppPort().toString() : ""); + ports.getAppPort() != null ? ports.getAppPort().toString() : "", + useGRPC); } private static void assertListeningOnPort(int port) { diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index 334a823be..3f538861b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -8,10 +8,13 @@ package io.dapr.it; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.AfterClass; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; +import java.util.Queue; public abstract class BaseIT { @@ -19,7 +22,9 @@ public abstract class BaseIT { private static final Map DAPR_RUN_BUILDERS = new HashMap<>(); - private static final Collection TO_BE_STOPPED = new ArrayList<>(); + private static final Queue TO_BE_STOPPED = new LinkedList<>(); + + private static final Queue TO_BE_CLOSED = new LinkedList<>(); protected static DaprRun startDaprApp( String testName, @@ -27,13 +32,23 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds) throws Exception { - return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds); + return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, true); + } + + protected static DaprRun startDaprApp( + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + int maxWaitMilliseconds, + boolean useGRPC) throws Exception { + return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds, useGRPC); } protected static DaprRun startDaprApp( String testName, int maxWaitMilliseconds) throws Exception { - return startDaprApp(testName, "You're up and running!", null, false, true, maxWaitMilliseconds); + return startDaprApp(testName, "You're up and running!", null, false, true, maxWaitMilliseconds, true); } protected static DaprRun startDaprApp( @@ -42,12 +57,14 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, Boolean useDaprPorts, - int maxWaitMilliseconds) throws Exception { + int maxWaitMilliseconds, + boolean useGRPC) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), successMessage, - maxWaitMilliseconds).withServiceClass(serviceClass); + maxWaitMilliseconds, + useGRPC).withServiceClass(serviceClass); DaprRun run = builder.build(); TO_BE_STOPPED.add(run); DAPR_RUN_BUILDERS.put(run.getAppName(), builder); @@ -56,17 +73,28 @@ public abstract class BaseIT { return run; } + protected static ImmutablePair startSplitDaprAndApp( + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + int maxWaitMilliseconds) throws Exception { + return startSplitDaprAndApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, true); + } + protected static ImmutablePair startSplitDaprAndApp( String testName, String successMessage, Class serviceClass, Boolean useAppPort, - int maxWaitMilliseconds) throws Exception { + int maxWaitMilliseconds, + boolean useGRPC) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, true, true), successMessage, - maxWaitMilliseconds).withServiceClass(serviceClass); + maxWaitMilliseconds, + useGRPC).withServiceClass(serviceClass); ImmutablePair runs = builder.splitBuild(); TO_BE_STOPPED.add(runs.left); TO_BE_STOPPED.add(runs.right); @@ -79,9 +107,19 @@ public abstract class BaseIT { @AfterClass public static void cleanUp() throws Exception { - for (Stoppable toBeStopped : TO_BE_STOPPED) { + while (!TO_BE_CLOSED.isEmpty()) { + Closeable toBeClosed = TO_BE_CLOSED.remove(); + toBeClosed.close(); + } + + while (!TO_BE_STOPPED.isEmpty()) { + Stoppable toBeStopped = TO_BE_STOPPED.remove(); toBeStopped.stop(); } } + protected static T deferClose(T closeable) { + TO_BE_CLOSED.add(closeable); + return closeable; + } } diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index ede75dfe2..cf0f3d219 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -23,7 +23,7 @@ public class DaprRun implements Stoppable { // the arg in -Dexec.args is the app's port private static final String DAPR_COMMAND = - " -- mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\""; + " -- mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\" -D dapr.grpc.enabled=%s"; private final DaprPorts ports; @@ -43,11 +43,12 @@ public class DaprRun implements Stoppable { DaprPorts ports, String successMessage, Class serviceClass, - int maxWaitMilliseconds) { + int maxWaitMilliseconds, + boolean useGRPC) { // The app name needs to be deterministic since we depend on it to kill previous runs. this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName()); this.startCommand = - new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports)); + new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, useGRPC)); this.listCommand = new Command( this.appName, "dapr list"); @@ -163,14 +164,14 @@ public class DaprRun implements Stoppable { return appName; } - private static String buildDaprCommand(String appName, Class serviceClass, DaprPorts ports) { + private static String buildDaprCommand(String appName, Class serviceClass, DaprPorts ports, boolean useGRPC) { StringBuilder stringBuilder = new StringBuilder(String.format(DAPR_RUN, appName)) .append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "") .append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "") .append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "") .append(serviceClass == null ? "" : String.format(DAPR_COMMAND, serviceClass.getCanonicalName(), - ports.getAppPort() != null ? ports.getAppPort().toString() : "")); + ports.getAppPort() != null ? ports.getAppPort().toString() : "", useGRPC)); return stringBuilder.toString(); } @@ -199,15 +200,19 @@ public class DaprRun implements Stoppable { private Class serviceClass; + private boolean useGRPC; + Builder( String testName, Supplier portsSupplier, String successMessage, - int maxWaitMilliseconds) { + int maxWaitMilliseconds, + boolean useGRPC) { this.testName = testName; this.portsSupplier = portsSupplier; this.successMessage = successMessage; this.maxWaitMilliseconds = maxWaitMilliseconds; + this.useGRPC = useGRPC; } public Builder withServiceClass(Class serviceClass) { @@ -221,7 +226,8 @@ public class DaprRun implements Stoppable { this.portsSupplier.get(), this.successMessage, this.serviceClass, - this.maxWaitMilliseconds); + this.maxWaitMilliseconds, + this.useGRPC); } /** @@ -234,14 +240,16 @@ public class DaprRun implements Stoppable { ports, this.successMessage, this.serviceClass, - this.maxWaitMilliseconds); + this.maxWaitMilliseconds, + this.useGRPC); DaprRun daprRun = new DaprRun( this.testName, ports, DAPR_SUCCESS_MESSAGE, null, - this.maxWaitMilliseconds); + this.maxWaitMilliseconds, + this.useGRPC); return new ImmutablePair<>(appRun, daprRun); } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActivationDeactivationIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActivationDeactivationIT.java index 378681154..2641672bc 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActivationDeactivationIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActivationDeactivationIT.java @@ -38,7 +38,7 @@ public class ActivationDeactivationIT extends BaseIT { final AtomicInteger atomicInteger = new AtomicInteger(1); logger.debug("Creating proxy builder"); - ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(DemoActor.class); + ActorProxyBuilder proxyBuilder = deferClose(new ActorProxyBuilder(DemoActor.class)); logger.debug("Creating actorId"); ActorId actorId1 = new ActorId(Integer.toString(atomicInteger.getAndIncrement())); logger.debug("Building proxy"); diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java index 2d39f62ab..a715ac8fd 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java @@ -13,16 +13,38 @@ import io.dapr.it.DaprRun; import io.dapr.it.actors.services.springboot.StatefulActor; import io.dapr.it.actors.services.springboot.StatefulActorService; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Collection; + import static io.dapr.it.Retry.callWithRetry; import static org.junit.Assert.*; +@RunWith(Parameterized.class) public class ActorStateIT extends BaseIT { private static Logger logger = LoggerFactory.getLogger(ActorStateIT.class); + /** + * Parameters for this test. + * Param #1: useGrpc. + * @return Collection of parameter tuples. + */ + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { { false, false }, { false, true }, { true, false }, { true, true } }); + } + + @Parameterized.Parameter(0) + public boolean useGrpc; + + @Parameterized.Parameter(1) + public boolean useGrpcInService; + @Test public void writeReadState() throws Exception { logger.debug("Starting actor runtime ..."); @@ -32,15 +54,23 @@ public class ActorStateIT extends BaseIT { StatefulActorService.SUCCESS_MESSAGE, StatefulActorService.class, true, - 60000); + 60000, + useGrpcInService); + + if (this.useGrpc) { + runtime.switchToGRPC(); + } else { + runtime.switchToHTTP(); + } String message = "This is a message to be saved and retrieved."; String name = "Jon Doe"; byte[] bytes = new byte[] { 0x1 }; - ActorId actorId = new ActorId(Long.toString(System.currentTimeMillis())); + ActorId actorId = new ActorId( + String.format("%d-%b-%b", System.currentTimeMillis(), this.useGrpc, this.useGrpcInService)); String actorType = "StatefulActorTest"; logger.debug("Building proxy ..."); - ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class); + ActorProxyBuilder proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class)); ActorProxy proxy = proxyBuilder.build(actorId); // Validate conditional read works. @@ -115,13 +145,22 @@ public class ActorStateIT extends BaseIT { runtime.stop(); logger.debug("Starting service ..."); - startDaprApp( + DaprRun run2 = startDaprApp( this.getClass().getSimpleName(), StatefulActorService.SUCCESS_MESSAGE, StatefulActorService.class, true, - 60000); + 60000, + useGrpcInService); + if (this.useGrpc) { + run2.switchToGRPC(); + } else { + run2.switchToHTTP(); + } + + // Need new proxy builder because the proxy builder holds the channel. + proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class)); ActorProxy newProxy = proxyBuilder.build(actorId); callWithRetry(() -> { diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java index 4ce759101..e0add6a54 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java @@ -47,7 +47,7 @@ public class ActorTimerRecoveryIT extends BaseIT { String actorType="MyActorTest"; logger.debug("Creating proxy builder"); - ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class); + ActorProxyBuilder proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class)); logger.debug("Creating actorId"); ActorId actorId = new ActorId(UUID.randomUUID().toString()); logger.debug("Building proxy"); diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java index bd035dba6..5303ce771 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java @@ -74,7 +74,7 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT { String actorType="MyActorTest"; logger.debug("Creating proxy builder"); - ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class); + ActorProxyBuilder proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class)); logger.debug("Creating actorId"); ActorId actorId1 = new ActorId(ACTOR_ID); logger.debug("Building proxy"); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index 0e0660ce5..df2e22590 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -11,7 +11,6 @@ import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.v1.DaprGrpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import okhttp3.OkHttpClient; import java.io.Closeable; @@ -27,6 +26,11 @@ public class DaprClientBuilder { */ private final boolean useGrpc; + /** + * Builder for Dapr's HTTP Client. + */ + private final DaprHttpBuilder daprHttpBuilder; + /** * Serializer used for request and response objects in DaprClient. */ @@ -47,6 +51,7 @@ public class DaprClientBuilder { this.objectSerializer = new DefaultObjectSerializer(); this.stateSerializer = new DefaultObjectSerializer(); this.useGrpc = Properties.USE_GRPC.get(); + this.daprHttpBuilder = new DaprHttpBuilder(); } /** @@ -127,12 +132,6 @@ public class DaprClientBuilder { * @return DaprClient over HTTP. */ private DaprClient buildDaprClientHttp() { - int port = Properties.HTTP_PORT.get(); - if (port <= 0) { - throw new IllegalStateException("Invalid port."); - } - OkHttpClient okHttpClient = new OkHttpClient.Builder().build(); - DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient); - return new DaprClientHttp(daprHttp, this.objectSerializer, this.stateSerializer); + return new DaprClientHttp(this.daprHttpBuilder.build(), this.objectSerializer, this.stateSerializer); } }