* prepare before testing

* Update tests

* fix checkstyle

---------

Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
MatejNedic 2023-05-25 23:29:19 +02:00 committed by GitHub
parent e13ad365ff
commit e03cb1566b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 377 additions and 345 deletions

View File

@ -14,21 +14,24 @@ limitations under the License.
package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.DurationUtils;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
* A DaprClient over HTTP for Actor's runtime.
@ -48,9 +51,9 @@ class DaprGrpcClient implements DaprClient {
/**
* The GRPC client to be used.
*
* @see io.dapr.v1.DaprGrpc.DaprFutureStub
* @see io.dapr.v1.DaprGrpc.DaprStub
*/
private DaprGrpc.DaprFutureStub client;
private DaprGrpc.DaprStub client;
/**
* Internal constructor.
@ -58,16 +61,16 @@ class DaprGrpcClient implements DaprClient {
* @param channel channel (client needs to close channel after use).
*/
DaprGrpcClient(ManagedChannel channel) {
this(DaprGrpc.newFutureStub(channel));
this(DaprGrpc.newStub(channel));
}
/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param daprStubClient Dapr's GRPC client.
*/
DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) {
this.client = grpcClient;
DaprGrpcClient(DaprGrpc.DaprStub daprStubClient) {
this.client = daprStubClient;
}
/**
@ -75,17 +78,15 @@ class DaprGrpcClient implements DaprClient {
*/
@Override
public Mono<byte[]> getState(String actorType, String actorId, String keyName) {
return Mono.fromCallable(() -> {
DaprProtos.GetActorStateRequest req =
DaprProtos.GetActorStateRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setKey(keyName)
.build();
DaprProtos.GetActorStateRequest req =
DaprProtos.GetActorStateRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setKey(keyName)
.build();
ListenableFuture<DaprProtos.GetActorStateResponse> futureResponse = client.getActorState(req);
return futureResponse.get();
}).map(r -> r.getData().toByteArray());
return Mono.<DaprProtos.GetActorStateResponse>create(it ->
client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray());
}
/**
@ -132,10 +133,7 @@ class DaprGrpcClient implements DaprClient {
.addAllOperations(grpcOps)
.build();
return Mono.fromCallable(() -> {
ListenableFuture<Empty> futureResponse = client.executeActorStateTransaction(req);
return futureResponse.get();
}).then();
return Mono.<Empty>create(it -> client.executeActorStateTransaction(req, createStreamObserver(it))).then();
}
/**
@ -147,21 +145,16 @@ class DaprGrpcClient implements DaprClient {
String actorId,
String reminderName,
ActorReminderParams reminderParams) {
return Mono.fromCallable(() -> {
DaprProtos.RegisterActorReminderRequest req =
DaprProtos.RegisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.setData(ByteString.copyFrom(reminderParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod()))
.build();
ListenableFuture<Empty> futureResponse = client.registerActorReminder(req);
futureResponse.get();
return null;
});
DaprProtos.RegisterActorReminderRequest req =
DaprProtos.RegisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.setData(ByteString.copyFrom(reminderParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod()))
.build();
return Mono.<Empty>create(it -> client.registerActorReminder(req, createStreamObserver(it))).then().then();
}
/**
@ -169,18 +162,14 @@ class DaprGrpcClient implements DaprClient {
*/
@Override
public Mono<Void> unregisterReminder(String actorType, String actorId, String reminderName) {
return Mono.fromCallable(() -> {
DaprProtos.UnregisterActorReminderRequest req =
DaprProtos.UnregisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.build();
DaprProtos.UnregisterActorReminderRequest req =
DaprProtos.UnregisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.build();
ListenableFuture<Empty> futureResponse = client.unregisterActorReminder(req);
futureResponse.get();
return null;
});
return Mono.<Empty>create(it -> client.unregisterActorReminder(req, createStreamObserver(it))).then().then();
}
/**
@ -192,22 +181,18 @@ class DaprGrpcClient implements DaprClient {
String actorId,
String timerName,
ActorTimerParams timerParams) {
return Mono.fromCallable(() -> {
DaprProtos.RegisterActorTimerRequest req =
DaprProtos.RegisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.setCallback(timerParams.getCallback())
.setData(ByteString.copyFrom(timerParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod()))
.build();
DaprProtos.RegisterActorTimerRequest req =
DaprProtos.RegisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.setCallback(timerParams.getCallback())
.setData(ByteString.copyFrom(timerParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod()))
.build();
ListenableFuture<Empty> futureResponse = client.registerActorTimer(req);
futureResponse.get();
return null;
});
return Mono.<Empty>create(it -> client.registerActorTimer(req, createStreamObserver(it))).then().then();
}
/**
@ -215,18 +200,33 @@ class DaprGrpcClient implements DaprClient {
*/
@Override
public Mono<Void> unregisterTimer(String actorType, String actorId, String timerName) {
return Mono.fromCallable(() -> {
DaprProtos.UnregisterActorTimerRequest req =
DaprProtos.UnregisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.build();
DaprProtos.UnregisterActorTimerRequest req =
DaprProtos.UnregisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.build();
ListenableFuture<Empty> futureResponse = client.unregisterActorTimer(req);
futureResponse.get();
return null;
});
return Mono.<Empty>create(it -> client.unregisterActorTimer(req, createStreamObserver(it))).then().then();
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
sink.success(value);
}
@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
}
@Override
public void onCompleted() {
sink.success();
}
};
}
}

View File

@ -14,328 +14,360 @@ limitations under the License.
package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.utils.DurationUtils;
import com.google.protobuf.GeneratedMessageV3;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static io.dapr.actors.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;
public class DaprGrpcClientTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ACTOR_TYPE = "MyActorType";
private static final String ACTOR_TYPE = "MyActorType";
private static final String ACTOR_ID = "1234567890";
private static final String ACTOR_ID = "1234567890";
private DaprGrpc.DaprFutureStub grpcStub;
private static final String KEY = "MyKey";
private DaprGrpcClient client;
private static final String ACTOR_EXCEPTION = "1_exception";
@Before
public void setup() {
grpcStub = mock(DaprGrpc.DaprFutureStub.class);
client = new DaprGrpcClient(grpcStub);
}
private static final String REMINDER_NAME = "myreminder";
@Test
public void getActorStateException() {
SettableFuture<DaprProtos.GetActorStateResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
private static final String TIMER_NAME = "timerName";
when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
"MyKey"
)))).thenReturn(settableFuture);
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_ID, "MyKey");
Exception exception = assertThrows(Exception.class, () -> result.block());
assertTrue(exception.getCause().getCause() instanceof ArithmeticException);
}
private static final byte[] RESPONSE_PAYLOAD = "\"hello world\"".getBytes();
@Test
public void getActorState() {
byte[] data = "hello world".getBytes();
SettableFuture<DaprProtos.GetActorStateResponse> settableFuture = SettableFuture.create();
settableFuture.set(DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(data)).build());
private static final List<ActorStateOperation> OPERATIONS = Arrays.asList(
new ActorStateOperation("upsert", "mykey", "hello world".getBytes()),
new ActorStateOperation("delete", "mykey", null));
when(grpcStub.getActorState(argThat(new GetActorStateRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
"MyKey"
)))).thenReturn(settableFuture);
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_ID, "MyKey");
assertArrayEquals(data, result.block());
}
private final DaprGrpc.DaprImplBase serviceImpl = new CustomDaprClient();
@Test
public void saveActorStateTransactionallyException() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
private DaprGrpcClient client;
when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
new ArrayList<>()
)))).thenReturn(settableFuture);
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, new ArrayList<>());
Exception exception = assertThrows(Exception.class, () -> result.block());
assertTrue(exception.getCause().getCause() instanceof ArithmeticException);
}
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
@Test
public void saveActorStateTransactionally() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
@Before
public void setup() throws IOException {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
ActorStateOperation[] operations = new ActorStateOperation[] {
new ActorStateOperation("upsert", "mykey", "hello world"),
new ActorStateOperation("delete", "mykey", null),
};
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder
.forName(serverName).directExecutor().addService(serviceImpl).build().start());
when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
Arrays.asList(operations)
)))).thenReturn(settableFuture);
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
result.block();
}
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
@Test
public void saveActorStateTransactionallyByteArray() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
ActorStateOperation[] operations = new ActorStateOperation[] {
new ActorStateOperation("upsert", "mykey", "hello world".getBytes()),
new ActorStateOperation("delete", "mykey", null),
};
when(grpcStub.executeActorStateTransaction(argThat(new ExecuteActorStateTransactionRequestMatcher(
ACTOR_TYPE,
ACTOR_ID,
Arrays.asList(operations)
)))).thenReturn(settableFuture);
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
result.block();
}
@Test
public void saveActorStateTransactionallyInvalidValueType() {
ActorStateOperation[] operations = new ActorStateOperation[] {
new ActorStateOperation("upsert", "mykey", 123),
new ActorStateOperation("delete", "mykey", null),
};
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
assertThrows(IllegalArgumentException.class, () -> result.block());
}
@Test
public void registerActorReminder() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String reminderName = "myreminder";
ActorReminderParams params = new ActorReminderParams(
"hello world".getBytes(),
Duration.ofSeconds(1),
Duration.ofSeconds(2)
);
when(grpcStub.registerActorReminder(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(reminderName, argument.getName());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, reminderName, params);
result.block();
}
@Test
public void unregisterActorReminder() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String reminderName = "myreminder";
when(grpcStub.unregisterActorReminder(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(reminderName, argument.getName());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.unregisterReminder(ACTOR_TYPE, ACTOR_ID, reminderName);
result.block();
}
@Test
public void registerActorTimer() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String timerName = "mytimer";
String callback = "mymethod";
ActorTimerParams params = new ActorTimerParams(
callback,
"hello world".getBytes(),
Duration.ofSeconds(1),
Duration.ofSeconds(2)
);
when(grpcStub.registerActorTimer(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(timerName, argument.getName());
assertEquals(callback, argument.getCallback());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getDueTime()), argument.getDueTime());
assertEquals(DurationUtils.convertDurationToDaprFormat(params.getPeriod()), argument.getPeriod());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, timerName, params);
result.block();
}
@Test
public void unregisterActorTimer() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
settableFuture.set(Empty.newBuilder().build());
String timerName = "mytimer";
when(grpcStub.unregisterActorTimer(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(timerName, argument.getName());
return true;
}))).thenReturn(settableFuture);
Mono<Void> result = client.unregisterTimer(ACTOR_TYPE, ACTOR_ID, timerName);
result.block();
}
private static Any getAny(Object value) throws IOException {
if (value instanceof byte[]) {
String base64 = OBJECT_MAPPER.writeValueAsString(value);
return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build();
} else if (value instanceof String) {
return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build();
// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
}
throw new IllegalArgumentException("Must be byte[] or String");
}
private static class GetActorStateRequestMatcher implements ArgumentMatcher<DaprProtos.GetActorStateRequest> {
private final String actorType;
private final String actorId;
private final String key;
GetActorStateRequestMatcher(String actorType, String actorId, String key) {
this.actorType = actorType;
this.actorId = actorId;
this.key = key;
@Test
public void getActorStateException() {
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY);
assertThrowsDaprException(
ExecutionException.class,
"UNKNOWN",
"UNKNOWN: ",
result::block);
}
@Override
public boolean matches(DaprProtos.GetActorStateRequest argument) {
if (argument == null) {
return false;
}
return actorType.equals(argument.getActorType())
&& actorId.equals(argument.getActorId())
&& key.equals(argument.getKey());
}
}
private static class ExecuteActorStateTransactionRequestMatcher
implements ArgumentMatcher<DaprProtos.ExecuteActorStateTransactionRequest> {
private final String actorType;
private final String actorId;
private final List<ActorStateOperation> operations;
ExecuteActorStateTransactionRequestMatcher(String actorType, String actorId, List<ActorStateOperation> operations) {
this.actorType = actorType;
this.actorId = actorId;
this.operations = operations;
@Test
public void getActorState() {
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY);
assertArrayEquals(RESPONSE_PAYLOAD, result.block());
}
@Override
public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) {
if (argument == null) {
return false;
}
@Test
public void saveActorStateTransactionallyException() {
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_EXCEPTION, OPERATIONS);
assertThrowsDaprException(
ExecutionException.class,
"UNKNOWN",
"UNKNOWN: ",
result::block);
}
@Test
public void saveActorStateTransactionally() {
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, OPERATIONS);
result.block();
}
if (operations.size() != argument.getOperationsCount()) {
return false;
}
@Test
public void saveActorStateTransactionallyByteArray() {
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, OPERATIONS);
result.block();
}
if (!actorType.equals(argument.getActorType())
|| !actorId.equals(argument.getActorId())) {
return false;
}
@Test
public void saveActorStateTransactionallyInvalidValueType() {
ActorStateOperation[] operations = new ActorStateOperation[]{
new ActorStateOperation("upsert", "mykey", 123),
new ActorStateOperation("delete", "mykey", null),
};
for(ActorStateOperation operation : operations) {
boolean found = false;
for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) {
if (operation.getKey().equals(grpcOperation.getKey())
&& operation.getOperationType().equals(grpcOperation.getOperationType())
&& nullableEquals(operation.getValue(), grpcOperation.getValue())) {
found = true;
break;
}
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
assertThrows(IllegalArgumentException.class, result::block);
}
@Test
public void registerActorReminder() {
ActorReminderParams params = new ActorReminderParams(
"hello world".getBytes(),
Duration.ofSeconds(1),
Duration.ofSeconds(2)
);
Mono<Void> result = client.registerReminder(ACTOR_TYPE, ACTOR_ID, REMINDER_NAME, params);
result.block();
}
@Test
public void unregisterActorReminder() {
Mono<Void> result = client.unregisterReminder(ACTOR_TYPE, ACTOR_ID, REMINDER_NAME);
result.block();
}
@Test
public void registerActorTimer() {
String callback = "mymethod";
ActorTimerParams params = new ActorTimerParams(
callback,
"hello world".getBytes(),
Duration.ofSeconds(1),
Duration.ofSeconds(2)
);
Mono<Void> result = client.registerTimer(ACTOR_TYPE, ACTOR_ID, TIMER_NAME, params);
result.block();
}
@Test
public void unregisterActorTimer() {
Mono<Void> result = client.unregisterTimer(ACTOR_TYPE, ACTOR_ID, TIMER_NAME);
result.block();
}
private class CustomDaprClient extends DaprGrpc.DaprImplBase {
@Override
public void getActorState(DaprProtos.GetActorStateRequest request,
StreamObserver<DaprProtos.GetActorStateResponse> responseObserver) {
assertEquals(ACTOR_TYPE, request.getActorType());
assertEquals(KEY, request.getKey());
assertEquals(ACTOR_ID, request.getActorId());
switch (request.getActorId()) {
case ACTOR_ID:
populateObserver(responseObserver, DaprProtos.GetActorStateResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD))
.build());
return;
case ACTOR_EXCEPTION:
throwException(responseObserver);
return;
}
super.getActorState(request, responseObserver);
}
if (!found) {
return false;
public void executeActorStateTransaction(io.dapr.v1.DaprProtos.ExecuteActorStateTransactionRequest request,
io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
assertEquals(ACTOR_TYPE, request.getActorType());
assertEquals(ACTOR_ID, request.getActorId());
assertTrue(new OperationsMatcher(OPERATIONS).matches(request));
switch (request.getActorId()) {
case ACTOR_ID:
populateObserver(responseObserver, Empty.newBuilder().build());
return;
case ACTOR_EXCEPTION:
throwException(responseObserver);
return;
}
super.executeActorStateTransaction(request, responseObserver);
}
}
return true;
@Override
public void registerActorReminder(io.dapr.v1.DaprProtos.RegisterActorReminderRequest request,
io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
assertEquals(REMINDER_NAME, request.getName());
assertEquals("0h0m1s0ms", request.getDueTime());
assertEquals("0h0m2s0ms", request.getPeriod());
assertEquals(ACTOR_TYPE, request.getActorType());
assertEquals(ACTOR_ID, request.getActorId());
switch (request.getActorId()) {
case ACTOR_ID:
populateObserver(responseObserver, Empty.newBuilder().build());
return;
case ACTOR_EXCEPTION:
throwException(responseObserver);
return;
}
super.registerActorReminder(request, responseObserver);
}
public void registerActorTimer(io.dapr.v1.DaprProtos.RegisterActorTimerRequest request,
io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
assertEquals(ACTOR_TYPE, request.getActorType());
assertEquals(ACTOR_ID, request.getActorId());
assertEquals(TIMER_NAME, request.getName());
assertEquals("mymethod", request.getCallback());
assertEquals("0h0m1s0ms", request.getDueTime());
assertEquals("0h0m2s0ms", request.getPeriod());
switch (request.getActorId()) {
case ACTOR_ID:
populateObserver(responseObserver, Empty.newBuilder().build());
return;
case ACTOR_EXCEPTION:
throwException(responseObserver);
return;
}
super.registerActorTimer(request, responseObserver);
}
/**
* <pre>
* Unregister an actor timer.
* </pre>
*/
public void unregisterActorTimer(io.dapr.v1.DaprProtos.UnregisterActorTimerRequest request,
io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
assertEquals(ACTOR_TYPE, request.getActorType());
assertEquals(ACTOR_ID, request.getActorId());
assertEquals(TIMER_NAME, request.getName());
switch (request.getActorId()) {
case ACTOR_ID:
populateObserver(responseObserver, Empty.newBuilder().build());
return;
case ACTOR_EXCEPTION:
throwException(responseObserver);
return;
}
super.unregisterActorTimer(request, responseObserver);
}
public void unregisterActorReminder(io.dapr.v1.DaprProtos.UnregisterActorReminderRequest request,
io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
assertEquals(ACTOR_TYPE, request.getActorType());
assertEquals(ACTOR_ID, request.getActorId());
assertEquals(REMINDER_NAME, request.getName());
switch (request.getActorId()) {
case ACTOR_ID:
populateObserver(responseObserver, Empty.newBuilder().build());
return;
case ACTOR_EXCEPTION:
throwException(responseObserver);
return;
}
super.unregisterActorReminder(request, responseObserver);
}
private void throwException(StreamObserver<?> responseObserver) {
Throwable e = new ArithmeticException();
StatusException se = new StatusException(Status.UNKNOWN.withCause(e));
responseObserver.onError(se);
}
private <T extends GeneratedMessageV3> void populateObserver(StreamObserver<T> responseObserver, GeneratedMessageV3 generatedMessageV3) {
responseObserver.onNext((T) generatedMessageV3);
responseObserver.onCompleted();
}
}
private static boolean nullableEquals(Object one, Any another) {
if (one == null) {
return another.getValue().isEmpty();
}
private static class OperationsMatcher {
if ((one == null) ^ (another == null)) {
return false;
}
private final List<ActorStateOperation> operations;
try {
Any oneAny = getAny(one);
return oneAny.getValue().equals(another.getValue());
} catch (IOException e) {
e.printStackTrace();
return false;
}
OperationsMatcher(List<ActorStateOperation> operations) {
this.operations = operations;
}
public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) {
if (argument == null) {
return false;
}
if (operations.size() != argument.getOperationsCount()) {
return false;
}
for (ActorStateOperation operation : operations) {
boolean found = false;
for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) {
if (operation.getKey().equals(grpcOperation.getKey())
&& operation.getOperationType().equals(grpcOperation.getOperationType())
&& nullableEquals(operation.getValue(), grpcOperation.getValue())) {
found = true;
break;
}
}
if (!found) {
return false;
}
}
return true;
}
private static boolean nullableEquals(Object one, Any another) {
if (one == null) {
return another.getValue().isEmpty();
}
if ((one == null) ^ (another == null)) {
return false;
}
try {
Any oneAny = getAny(one);
return oneAny.getValue().equals(another.getValue());
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
private static Any getAny(Object value) throws IOException {
if (value instanceof byte[]) {
String base64 = OBJECT_MAPPER.writeValueAsString(value);
return Any.newBuilder().setValue(ByteString.copyFrom(base64.getBytes())).build();
} else if (value instanceof String) {
return Any.newBuilder().setValue(ByteString.copyFrom(((String)value).getBytes())).build();
}
throw new IllegalArgumentException("Must be byte[] or String");
}
}
}
}