diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java index 6a95fb3f0..3fb190b28 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java @@ -14,21 +14,24 @@ limitations under the License. package io.dapr.actors.runtime; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import io.dapr.config.Properties; +import io.dapr.exceptions.DaprException; import io.dapr.utils.DurationUtils; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; /** * A DaprClient over HTTP for Actor's runtime. @@ -48,9 +51,9 @@ class DaprGrpcClient implements DaprClient { /** * The GRPC client to be used. * - * @see io.dapr.v1.DaprGrpc.DaprFutureStub + * @see io.dapr.v1.DaprGrpc.DaprStub */ - private DaprGrpc.DaprFutureStub client; + private DaprGrpc.DaprStub client; /** * Internal constructor. @@ -58,16 +61,16 @@ class DaprGrpcClient implements DaprClient { * @param channel channel (client needs to close channel after use). */ DaprGrpcClient(ManagedChannel channel) { - this(DaprGrpc.newFutureStub(channel)); + this(DaprGrpc.newStub(channel)); } /** * Internal constructor. * - * @param grpcClient Dapr's GRPC client. + * @param daprStubClient Dapr's GRPC client. */ - DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) { - this.client = grpcClient; + DaprGrpcClient(DaprGrpc.DaprStub daprStubClient) { + this.client = daprStubClient; } /** @@ -75,17 +78,15 @@ class DaprGrpcClient implements DaprClient { */ @Override public Mono getState(String actorType, String actorId, String keyName) { - return Mono.fromCallable(() -> { - DaprProtos.GetActorStateRequest req = - DaprProtos.GetActorStateRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setKey(keyName) - .build(); + DaprProtos.GetActorStateRequest req = + DaprProtos.GetActorStateRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setKey(keyName) + .build(); - ListenableFuture futureResponse = client.getActorState(req); - return futureResponse.get(); - }).map(r -> r.getData().toByteArray()); + return Mono.create(it -> + client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray()); } /** @@ -132,10 +133,7 @@ class DaprGrpcClient implements DaprClient { .addAllOperations(grpcOps) .build(); - return Mono.fromCallable(() -> { - ListenableFuture futureResponse = client.executeActorStateTransaction(req); - return futureResponse.get(); - }).then(); + return Mono.create(it -> client.executeActorStateTransaction(req, createStreamObserver(it))).then(); } /** @@ -147,21 +145,16 @@ class DaprGrpcClient implements DaprClient { String actorId, String reminderName, ActorReminderParams reminderParams) { - return Mono.fromCallable(() -> { - DaprProtos.RegisterActorReminderRequest req = - DaprProtos.RegisterActorReminderRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(reminderName) - .setData(ByteString.copyFrom(reminderParams.getData())) - .setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime())) - .setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod())) - .build(); - - ListenableFuture futureResponse = client.registerActorReminder(req); - futureResponse.get(); - return null; - }); + DaprProtos.RegisterActorReminderRequest req = + DaprProtos.RegisterActorReminderRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(reminderName) + .setData(ByteString.copyFrom(reminderParams.getData())) + .setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime())) + .setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod())) + .build(); + return Mono.create(it -> client.registerActorReminder(req, createStreamObserver(it))).then().then(); } /** @@ -169,18 +162,14 @@ class DaprGrpcClient implements DaprClient { */ @Override public Mono unregisterReminder(String actorType, String actorId, String reminderName) { - return Mono.fromCallable(() -> { - DaprProtos.UnregisterActorReminderRequest req = - DaprProtos.UnregisterActorReminderRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(reminderName) - .build(); + DaprProtos.UnregisterActorReminderRequest req = + DaprProtos.UnregisterActorReminderRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(reminderName) + .build(); - ListenableFuture futureResponse = client.unregisterActorReminder(req); - futureResponse.get(); - return null; - }); + return Mono.create(it -> client.unregisterActorReminder(req, createStreamObserver(it))).then().then(); } /** @@ -192,22 +181,18 @@ class DaprGrpcClient implements DaprClient { String actorId, String timerName, ActorTimerParams timerParams) { - return Mono.fromCallable(() -> { - DaprProtos.RegisterActorTimerRequest req = - DaprProtos.RegisterActorTimerRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(timerName) - .setCallback(timerParams.getCallback()) - .setData(ByteString.copyFrom(timerParams.getData())) - .setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime())) - .setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod())) - .build(); + DaprProtos.RegisterActorTimerRequest req = + DaprProtos.RegisterActorTimerRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(timerName) + .setCallback(timerParams.getCallback()) + .setData(ByteString.copyFrom(timerParams.getData())) + .setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime())) + .setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod())) + .build(); - ListenableFuture futureResponse = client.registerActorTimer(req); - futureResponse.get(); - return null; - }); + return Mono.create(it -> client.registerActorTimer(req, createStreamObserver(it))).then().then(); } /** @@ -215,18 +200,33 @@ class DaprGrpcClient implements DaprClient { */ @Override public Mono unregisterTimer(String actorType, String actorId, String timerName) { - return Mono.fromCallable(() -> { - DaprProtos.UnregisterActorTimerRequest req = - DaprProtos.UnregisterActorTimerRequest.newBuilder() - .setActorType(actorType) - .setActorId(actorId) - .setName(timerName) - .build(); + DaprProtos.UnregisterActorTimerRequest req = + DaprProtos.UnregisterActorTimerRequest.newBuilder() + .setActorType(actorType) + .setActorId(actorId) + .setName(timerName) + .build(); - ListenableFuture futureResponse = client.unregisterActorTimer(req); - futureResponse.get(); - return null; - }); + return Mono.create(it -> client.unregisterActorTimer(req, createStreamObserver(it))).then().then(); + } + + private StreamObserver createStreamObserver(MonoSink sink) { + return new StreamObserver() { + @Override + public void onNext(T value) { + sink.success(value); + } + + @Override + public void onError(Throwable t) { + sink.error(DaprException.propagate(new ExecutionException(t))); + } + + @Override + public void onCompleted() { + sink.success(); + } + }; } } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java index 6b362b984..5d67c610a 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java @@ -14,328 +14,360 @@ limitations under the License. package io.dapr.actors.runtime; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; -import io.dapr.utils.DurationUtils; +import com.google.protobuf.GeneratedMessageV3; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.mockito.ArgumentMatcher; import reactor.core.publisher.Mono; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; +import static io.dapr.actors.TestUtils.assertThrowsDaprException; import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.*; public class DaprGrpcClientTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String ACTOR_TYPE = "MyActorType"; + private static final String ACTOR_TYPE = "MyActorType"; - private static final String ACTOR_ID = "1234567890"; + private static final String ACTOR_ID = "1234567890"; - private DaprGrpc.DaprFutureStub grpcStub; + private static final String KEY = "MyKey"; - private DaprGrpcClient client; + private static final String ACTOR_EXCEPTION = "1_exception"; - @Before - public void setup() { - grpcStub = mock(DaprGrpc.DaprFutureStub.class); - client = new DaprGrpcClient(grpcStub); - } + private static final String REMINDER_NAME = "myreminder"; - @Test - public void getActorStateException() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.setException(new ArithmeticException()); + private static final String TIMER_NAME = "timerName"; - when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - "MyKey" - )))).thenReturn(settableFuture); - Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, "MyKey"); - Exception exception = assertThrows(Exception.class, () -> result.block()); - assertTrue(exception.getCause().getCause() instanceof ArithmeticException); - } + private static final byte[] RESPONSE_PAYLOAD = "\"hello world\"".getBytes(); - @Test - public void getActorState() { - byte[] data = "hello world".getBytes(); - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(data)).build()); + private static final List OPERATIONS = Arrays.asList( + new ActorStateOperation("upsert", "mykey", "hello world".getBytes()), + new ActorStateOperation("delete", "mykey", null)); - when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - "MyKey" - )))).thenReturn(settableFuture); - Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, "MyKey"); - assertArrayEquals(data, result.block()); - } + private final DaprGrpc.DaprImplBase serviceImpl = new CustomDaprClient(); - @Test - public void saveActorStateTransactionallyException() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.setException(new ArithmeticException()); + private DaprGrpcClient client; - when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - new ArrayList<>() - )))).thenReturn(settableFuture); - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, new ArrayList<>()); - Exception exception = assertThrows(Exception.class, () -> result.block()); - assertTrue(exception.getCause().getCause() instanceof ArithmeticException); - } + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - @Test - public void saveActorStateTransactionally() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); + @Before + public void setup() throws IOException { + // Generate a unique in-process server name. + String serverName = InProcessServerBuilder.generateName(); - ActorStateOperation[] operations = new ActorStateOperation[] { - new ActorStateOperation("upsert", "mykey", "hello world"), - new ActorStateOperation("delete", "mykey", null), - }; + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder + .forName(serverName).directExecutor().addService(serviceImpl).build().start()); - when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - Arrays.asList(operations) - )))).thenReturn(settableFuture); - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); - result.block(); - } + // Create a client channel and register for automatic graceful shutdown. + ManagedChannel channel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); - @Test - public void saveActorStateTransactionallyByteArray() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - ActorStateOperation[] operations = new ActorStateOperation[] { - new ActorStateOperation("upsert", "mykey", "hello world".getBytes()), - new ActorStateOperation("delete", "mykey", null), - }; - - when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher( - ACTOR_TYPE, - ACTOR_ID, - Arrays.asList(operations) - )))).thenReturn(settableFuture); - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); - result.block(); - } - - @Test - public void saveActorStateTransactionallyInvalidValueType() { - ActorStateOperation[] operations = new ActorStateOperation[] { - new ActorStateOperation("upsert", "mykey", 123), - new ActorStateOperation("delete", "mykey", null), - }; - - Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); - assertThrows(IllegalArgumentException.class, () -> result.block()); - } - - - @Test - public void registerActorReminder() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String reminderName = "myreminder"; - ActorReminderParams params = new ActorReminderParams( - "hello world".getBytes(), - Duration.ofSeconds(1), - Duration.ofSeconds(2) - ); - - when(grpcStub.registerActorReminder(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(reminderName, argument.getName()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); - return true; - }))).thenReturn(settableFuture); - Mono result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, reminderName, params); - result.block(); - } - - @Test - public void unregisterActorReminder() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String reminderName = "myreminder"; - - when(grpcStub.unregisterActorReminder(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(reminderName, argument.getName()); - return true; - }))).thenReturn(settableFuture); - Mono result = client.unregisterReminder(ACTOR_TYPE, ACTOR_ID, reminderName); - result.block(); - } - - @Test - public void registerActorTimer() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String timerName = "mytimer"; - String callback = "mymethod"; - ActorTimerParams params = new ActorTimerParams( - callback, - "hello world".getBytes(), - Duration.ofSeconds(1), - Duration.ofSeconds(2) - ); - - when(grpcStub.registerActorTimer(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(timerName, argument.getName()); - assertEquals(callback, argument.getCallback()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime()); - assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod()); - return true; - }))).thenReturn(settableFuture); - Mono result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, timerName, params); - result.block(); - } - - @Test - public void unregisterActorTimer() { - SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(Empty.newBuilder().build()); - - String timerName = "mytimer"; - - when(grpcStub.unregisterActorTimer(argThat(argument -> { - assertEquals(ACTOR_TYPE, argument.getActorType()); - assertEquals(ACTOR_ID, argument.getActorId()); - assertEquals(timerName, argument.getName()); - return true; - }))).thenReturn(settableFuture); - Mono result = client.unregisterTimer(ACTOR_TYPE, ACTOR_ID, timerName); - result.block(); - } - - private static Any getAny(Object value) throws IOException { - if (value instanceof byte[]) { - String base64 = OBJECT_MAPPER.writeValueAsString(value); - return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build(); - } else if (value instanceof String) { - return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build(); + // Create a HelloWorldClient using the in-process channel; + client = new DaprGrpcClient(DaprGrpc.newStub(channel)); } - throw new IllegalArgumentException("Must be byte[] or String"); - } - - private static class GetActorStateRequestMatcher implements ArgumentMatcher { - - private final String actorType; - - private final String actorId; - - private final String key; - - GetActorStateRequestMatcher(String actorType, String actorId, String key) { - this.actorType = actorType; - this.actorId = actorId; - this.key = key; + @Test + public void getActorStateException() { + Mono result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY); + assertThrowsDaprException( + ExecutionException.class, + "UNKNOWN", + "UNKNOWN: ", + result::block); } - @Override - public boolean matches(DaprProtos.GetActorStateRequest argument) { - if (argument == null) { - return false; - } - - return actorType.equals(argument.getActorType()) - && actorId.equals(argument.getActorId()) - && key.equals(argument.getKey()); - } - } - - private static class ExecuteActorStateTransactionRequestMatcher - implements ArgumentMatcher { - - private final String actorType; - - private final String actorId; - - private final List operations; - - ExecuteActorStateTransactionRequestMatcher(String actorType, String actorId, List operations) { - this.actorType = actorType; - this.actorId = actorId; - this.operations = operations; + @Test + public void getActorState() { + Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY); + assertArrayEquals(RESPONSE_PAYLOAD, result.block()); } - @Override - public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) { - if (argument == null) { - return false; - } + @Test + public void saveActorStateTransactionallyException() { + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_EXCEPTION, OPERATIONS); + assertThrowsDaprException( + ExecutionException.class, + "UNKNOWN", + "UNKNOWN: ", + result::block); + } + @Test + public void saveActorStateTransactionally() { + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, OPERATIONS); + result.block(); + } - if (operations.size() != argument.getOperationsCount()) { - return false; - } + @Test + public void saveActorStateTransactionallyByteArray() { + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, OPERATIONS); + result.block(); + } - if (!actorType.equals(argument.getActorType()) - || !actorId.equals(argument.getActorId())) { - return false; - } + @Test + public void saveActorStateTransactionallyInvalidValueType() { + ActorStateOperation[] operations = new ActorStateOperation[]{ + new ActorStateOperation("upsert", "mykey", 123), + new ActorStateOperation("delete", "mykey", null), + }; - for(ActorStateOperation operation : operations) { - boolean found = false; - for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) { - if (operation.getKey().equals(grpcOperation.getKey()) - && operation.getOperationType().equals(grpcOperation.getOperationType()) - && nullableEquals(operation.getValue(), grpcOperation.getValue())) { - found = true; - break; - } + Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); + assertThrows(IllegalArgumentException.class, result::block); + } + + + @Test + public void registerActorReminder() { + ActorReminderParams params = new ActorReminderParams( + "hello world".getBytes(), + Duration.ofSeconds(1), + Duration.ofSeconds(2) + ); + Mono result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, REMINDER_NAME, params); + result.block(); + } + + @Test + public void unregisterActorReminder() { + + Mono result = client.unregisterReminder(ACTOR_TYPE, ACTOR_ID, REMINDER_NAME); + result.block(); + } + + @Test + public void registerActorTimer() { + String callback = "mymethod"; + ActorTimerParams params = new ActorTimerParams( + callback, + "hello world".getBytes(), + Duration.ofSeconds(1), + Duration.ofSeconds(2) + ); + + Mono result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, TIMER_NAME, params); + result.block(); + } + + @Test + public void unregisterActorTimer() { + Mono result = client.unregisterTimer(ACTOR_TYPE, ACTOR_ID, TIMER_NAME); + result.block(); + } + + + private class CustomDaprClient extends DaprGrpc.DaprImplBase { + + @Override + public void getActorState(DaprProtos.GetActorStateRequest request, + StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(KEY, request.getKey()); + assertEquals(ACTOR_ID, request.getActorId()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD)) + .build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.getActorState(request, responseObserver); } - if (!found) { - return false; + public void executeActorStateTransaction(io.dapr.v1.DaprProtos.ExecuteActorStateTransactionRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertTrue(new OperationsMatcher(OPERATIONS).matches(request)); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.executeActorStateTransaction(request, responseObserver); } - } - return true; + @Override + public void registerActorReminder(io.dapr.v1.DaprProtos.RegisterActorReminderRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(REMINDER_NAME, request.getName()); + assertEquals("0h0m1s0ms", request.getDueTime()); + assertEquals("0h0m2s0ms", request.getPeriod()); + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.registerActorReminder(request, responseObserver); + } + + public void registerActorTimer(io.dapr.v1.DaprProtos.RegisterActorTimerRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertEquals(TIMER_NAME, request.getName()); + assertEquals("mymethod", request.getCallback()); + assertEquals("0h0m1s0ms", request.getDueTime()); + assertEquals("0h0m2s0ms", request.getPeriod()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.registerActorTimer(request, responseObserver); + } + + /** + *
+         * Unregister an actor timer.
+         * 
+ */ + public void unregisterActorTimer(io.dapr.v1.DaprProtos.UnregisterActorTimerRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertEquals(TIMER_NAME, request.getName()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.unregisterActorTimer(request, responseObserver); + } + + public void unregisterActorReminder(io.dapr.v1.DaprProtos.UnregisterActorReminderRequest request, + io.grpc.stub.StreamObserver responseObserver) { + assertEquals(ACTOR_TYPE, request.getActorType()); + assertEquals(ACTOR_ID, request.getActorId()); + assertEquals(REMINDER_NAME, request.getName()); + switch (request.getActorId()) { + case ACTOR_ID: + populateObserver(responseObserver, Empty.newBuilder().build()); + return; + + case ACTOR_EXCEPTION: + throwException(responseObserver); + return; + } + super.unregisterActorReminder(request, responseObserver); + } + + private void throwException(StreamObserver responseObserver) { + Throwable e = new ArithmeticException(); + StatusException se = new StatusException(Status.UNKNOWN.withCause(e)); + responseObserver.onError(se); + } + + private void populateObserver(StreamObserver responseObserver, GeneratedMessageV3 generatedMessageV3) { + responseObserver.onNext((T) generatedMessageV3); + responseObserver.onCompleted(); + } } - private static boolean nullableEquals(Object one, Any another) { - if (one == null) { - return another.getValue().isEmpty(); - } + private static class OperationsMatcher { - if ((one == null) ^ (another == null)) { - return false; - } + private final List operations; - try { - Any oneAny = getAny(one); - return oneAny.getValue().equals(another.getValue()); - } catch (IOException e) { - e.printStackTrace(); - return false; - } + OperationsMatcher(List operations) { + this.operations = operations; + } + + public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) { + if (argument == null) { + return false; + } + + if (operations.size() != argument.getOperationsCount()) { + return false; + } + + for (ActorStateOperation operation : operations) { + boolean found = false; + for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) { + if (operation.getKey().equals(grpcOperation.getKey()) + && operation.getOperationType().equals(grpcOperation.getOperationType()) + && nullableEquals(operation.getValue(), grpcOperation.getValue())) { + found = true; + break; + } + } + + if (!found) { + return false; + } + } + + return true; + } + + private static boolean nullableEquals(Object one, Any another) { + if (one == null) { + return another.getValue().isEmpty(); + } + + if ((one == null) ^ (another == null)) { + return false; + } + + try { + Any oneAny = getAny(one); + return oneAny.getValue().equals(another.getValue()); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + } + + private static Any getAny(Object value) throws IOException { + if (value instanceof byte[]) { + String base64 = OBJECT_MAPPER.writeValueAsString(value); + return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build(); + } else if (value instanceof String) { + return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build(); + } + + throw new IllegalArgumentException("Must be byte[] or String"); + } } - } }