diff --git a/sdk-actors/pom.xml b/sdk-actors/pom.xml
index 0290ab5d6..605027969 100644
--- a/sdk-actors/pom.xml
+++ b/sdk-actors/pom.xml
@@ -71,6 +71,11 @@
2.3.5.RELEASE
test
+
+ io.grpc
+ grpc-testing
+ test
+
diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
index d403728af..a78af5644 100644
--- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
+++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
@@ -105,7 +105,7 @@ public class ActorClient implements AutoCloseable {
*/
private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) {
switch (apiProtocol) {
- case GRPC: return new DaprGrpcClient(DaprGrpc.newFutureStub(grpcManagedChannel));
+ case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel));
case HTTP: return new DaprHttpClient(new DaprHttpBuilder().build());
default: throw new IllegalStateException("Unsupported protocol: " + apiProtocol.name());
}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java
index 5fa43b9ae..0e632bd35 100644
--- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java
+++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java
@@ -5,12 +5,26 @@
package io.dapr.actors.client;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
+import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
+import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+import reactor.util.context.Context;
+
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
/**
* A DaprClient over GRPC for Actor.
@@ -18,19 +32,17 @@ import reactor.core.publisher.Mono;
class DaprGrpcClient implements DaprClient {
/**
- * The GRPC client to be used.
- *
- * @see DaprGrpc.DaprFutureStub
+ * The async gRPC stub.
*/
- private DaprGrpc.DaprFutureStub client;
+ private DaprGrpc.DaprStub client;
/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
*/
- DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) {
- this.client = grpcClient;
+ DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
+ this.client = intercept(grpcClient);
}
/**
@@ -38,26 +50,81 @@ class DaprGrpcClient implements DaprClient {
*/
@Override
public Mono invoke(String actorType, String actorId, String methodName, byte[] jsonPayload) {
- return Mono.fromCallable(DaprException.wrap(() -> {
- DaprProtos.InvokeActorRequest req =
- DaprProtos.InvokeActorRequest.newBuilder()
- .setActorType(actorType)
- .setActorId(actorId)
- .setMethod(methodName)
- .setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
- .build();
-
- return get(client.invokeActor(req));
- })).map(r -> r.getData().toByteArray());
+ DaprProtos.InvokeActorRequest req =
+ DaprProtos.InvokeActorRequest.newBuilder()
+ .setActorType(actorType)
+ .setActorId(actorId)
+ .setMethod(methodName)
+ .setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
+ .build();
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(
+ it -> intercept(context, client).invokeActor(req, it)
+ )
+ ).map(r -> r.getData().toByteArray());
}
- private static V get(ListenableFuture future) {
- try {
- return future.get();
- } catch (Exception e) {
- DaprException.wrap(e);
- }
+ /**
+ * Populates GRPC client with interceptors.
+ *
+ * @param client GRPC client for Dapr.
+ * @return Client after adding interceptors.
+ */
+ private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
+ ClientInterceptor interceptor = new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor methodDescriptor,
+ CallOptions callOptions,
+ Channel channel) {
+ ClientCall clientCall = channel.newCall(methodDescriptor, callOptions);
+ return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) {
+ @Override
+ public void start(final Listener responseListener, final Metadata metadata) {
+ String daprApiToken = Properties.API_TOKEN.get();
+ if (daprApiToken != null) {
+ metadata.put(Metadata.Key.of("dapr-api-token", Metadata.ASCII_STRING_MARSHALLER), daprApiToken);
+ }
- return null;
+ super.start(responseListener, metadata);
+ }
+ };
+ }
+ };
+ return client.withInterceptors(interceptor);
+ }
+
+ /**
+ * Populates GRPC client with interceptors for telemetry.
+ *
+ * @param context Reactor's context.
+ * @param client GRPC client for Dapr.
+ * @return Client after adding interceptors.
+ */
+ private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
+ return GrpcWrapper.intercept(context, client);
+ }
+
+ private Mono createMono(Consumer> consumer) {
+ return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
+ }
+
+ private StreamObserver createStreamObserver(MonoSink sink) {
+ return new StreamObserver() {
+ @Override
+ public void onNext(T value) {
+ sink.success(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ sink.error(DaprException.propagate(new ExecutionException(t)));
+ }
+
+ @Override
+ public void onCompleted() {
+ sink.success();
+ }
+ };
}
}
diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java
index 78a05cd6a..635937a20 100644
--- a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java
+++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java
@@ -5,114 +5,127 @@
package io.dapr.actors.client;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import reactor.core.publisher.Mono;
+import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static io.dapr.actors.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.*;
+import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.*;
public class DaprGrpcClientTest {
private static final String ACTOR_TYPE = "MyActorType";
- private static final String ACTOR_ID = "1234567890";
+ private static final String ACTOR_ID_OK = "123-Ok";
- private DaprGrpc.DaprFutureStub grpcStub;
+ private static final String ACTOR_ID_NULL_INPUT = "123-Null";
+
+ private static final String ACTOR_ID_EXCEPTION = "123-Exception";
+
+ private static final String METHOD_NAME = "myMethod";
+
+ private static final byte[] REQUEST_PAYLOAD = "{ \"id\": 123 }".getBytes();
+
+ private static final byte[] RESPONSE_PAYLOAD = "\"OK\"".getBytes();
+
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private final DaprGrpc.DaprImplBase serviceImpl =
+ mock(DaprGrpc.DaprImplBase.class, delegatesTo(
+ new DaprGrpc.DaprImplBase() {
+ @Override
+ public void invokeActor(DaprProtos.InvokeActorRequest request,
+ StreamObserver responseObserver) {
+ assertEquals(ACTOR_TYPE, request.getActorType());
+ assertEquals(METHOD_NAME, request.getMethod());
+ switch (request.getActorId()) {
+ case ACTOR_ID_OK:
+ assertArrayEquals(REQUEST_PAYLOAD, request.getData().toByteArray());
+ responseObserver.onNext(
+ DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD))
+ .build());
+ responseObserver.onCompleted();
+ return;
+ case ACTOR_ID_NULL_INPUT:
+ assertArrayEquals(new byte[0], request.getData().toByteArray());
+ responseObserver.onNext(
+ DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD))
+ .build());
+ responseObserver.onCompleted();
+ return;
+
+ case ACTOR_ID_EXCEPTION:
+ Throwable e = new ArithmeticException();
+ StatusException se = new StatusException(Status.UNKNOWN.withCause(e));
+ responseObserver.onError(se);
+ return;
+ }
+ super.invokeActor(request, responseObserver);
+ }
+ }));
private DaprGrpcClient client;
@Before
- public void setup() {
- grpcStub = mock(DaprGrpc.DaprFutureStub.class);
- client = new DaprGrpcClient(grpcStub);
+ public void setup() throws IOException {
+ // Generate a unique in-process server name.
+ String serverName = InProcessServerBuilder.generateName();
+
+ // Create a server, add service, start, and register for automatic graceful shutdown.
+ grpcCleanup.register(InProcessServerBuilder
+ .forName(serverName).directExecutor().addService(serviceImpl).build().start());
+
+ // Create a client channel and register for automatic graceful shutdown.
+ ManagedChannel channel = grpcCleanup.register(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+ // Create a HelloWorldClient using the in-process channel;
+ client = new DaprGrpcClient(DaprGrpc.newStub(channel));
}
@Test
public void invoke() {
- String methodName = "mymethod";
- byte[] payload = "{ \"id\": 123 }".getBytes();
- byte[] response = "\"OK\"".getBytes();
-
- SettableFuture settableFuture = SettableFuture.create();
- settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build());
-
- when(grpcStub.invokeActor(argThat(argument -> {
- assertEquals(ACTOR_TYPE, argument.getActorType());
- assertEquals(ACTOR_ID, argument.getActorId());
- assertEquals(methodName, argument.getMethod());
- assertArrayEquals(payload, argument.getData().toByteArray());
- return true;
- }))).thenReturn(settableFuture);
- Mono result = client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, payload);
- assertArrayEquals(response, result.block());
+ Mono result = client.invoke(ACTOR_TYPE, ACTOR_ID_OK, METHOD_NAME, REQUEST_PAYLOAD);
+ assertArrayEquals(RESPONSE_PAYLOAD, result.block());
}
@Test
public void invokeNullPayload() {
- String methodName = "mymethod";
- byte[] response = "\"OK\"".getBytes();
-
- SettableFuture settableFuture = SettableFuture.create();
- settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build());
-
- when(grpcStub.invokeActor(argThat(argument -> {
- assertEquals(ACTOR_TYPE, argument.getActorType());
- assertEquals(ACTOR_ID, argument.getActorId());
- assertEquals(methodName, argument.getMethod());
- assertArrayEquals(new byte[0], argument.getData().toByteArray());
- return true;
- }))).thenReturn(settableFuture);
- Mono result = client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, null);
- assertArrayEquals(response, result.block());
+ Mono result = client.invoke(ACTOR_TYPE, ACTOR_ID_NULL_INPUT, METHOD_NAME, null);
+ assertArrayEquals(RESPONSE_PAYLOAD, result.block());
}
@Test
public void invokeException() {
- String methodName = "mymethod";
-
- SettableFuture settableFuture = SettableFuture.create();
- settableFuture.setException(new ArithmeticException());
-
- when(grpcStub.invokeActor(argThat(argument -> {
- assertEquals(ACTOR_TYPE, argument.getActorType());
- assertEquals(ACTOR_ID, argument.getActorId());
- assertEquals(methodName, argument.getMethod());
- assertArrayEquals(new byte[0], argument.getData().toByteArray());
- return true;
- }))).thenReturn(settableFuture);
- Mono result = client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, null);
+ Mono result = client.invoke(ACTOR_TYPE, ACTOR_ID_EXCEPTION, METHOD_NAME, null);
assertThrowsDaprException(
ExecutionException.class,
"UNKNOWN",
- "UNKNOWN: java.lang.ArithmeticException",
+ "UNKNOWN: ",
() -> result.block());
}
@Test
public void invokeNotHotMono() {
- String methodName = "mymethod";
-
- SettableFuture settableFuture = SettableFuture.create();
- settableFuture.setException(new ArithmeticException());
-
- when(grpcStub.invokeActor(argThat(argument -> {
- assertEquals(ACTOR_TYPE, argument.getActorType());
- assertEquals(ACTOR_ID, argument.getActorId());
- assertEquals(methodName, argument.getMethod());
- assertArrayEquals(new byte[0], argument.getData().toByteArray());
- return true;
- }))).thenReturn(settableFuture);
- client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, null);
+ client.invoke(ACTOR_TYPE, ACTOR_ID_EXCEPTION, METHOD_NAME, null);
// No exception thrown because Mono is ignored here.
}
diff --git a/sdk-tests/src/test/java/io/dapr/it/TestUtils.java b/sdk-tests/src/test/java/io/dapr/it/TestUtils.java
index 4a81eeeb2..281327c73 100644
--- a/sdk-tests/src/test/java/io/dapr/it/TestUtils.java
+++ b/sdk-tests/src/test/java/io/dapr/it/TestUtils.java
@@ -33,4 +33,13 @@ public final class TestUtils {
Assertions.assertEquals(expectedErrorCode, daprException.getErrorCode());
Assertions.assertEquals(expectedErrorMessage, daprException.getMessage());
}
+
+ public static void assertThrowsDaprExceptionSubstring(
+ String expectedErrorCode,
+ String expectedErrorMessageSubstring,
+ Executable executable) {
+ DaprException daprException = Assertions.assertThrows(DaprException.class, executable);
+ Assertions.assertEquals(expectedErrorCode, daprException.getErrorCode());
+ Assertions.assertTrue(daprException.getMessage().contains(expectedErrorMessageSubstring));
+ }
}
diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java
new file mode 100644
index 000000000..40333549a
--- /dev/null
+++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) Microsoft Corporation and Dapr Contributors.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.it.actors;
+
+import io.dapr.actors.ActorId;
+import io.dapr.actors.client.ActorProxy;
+import io.dapr.actors.client.ActorProxyBuilder;
+import io.dapr.it.BaseIT;
+import io.dapr.it.actors.app.MyActor;
+import io.dapr.it.actors.app.MyActorService;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+
+import static io.dapr.it.Retry.callWithRetry;
+import static io.dapr.it.TestUtils.assertThrowsDaprException;
+import static io.dapr.it.TestUtils.assertThrowsDaprExceptionSubstring;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ActorExceptionIT extends BaseIT {
+
+ private static Logger logger = LoggerFactory.getLogger(ActorExceptionIT.class);
+
+ @Test
+ public void exceptionTest() throws Exception {
+ // The call below will fail if service cannot start successfully.
+ startDaprApp(
+ ActorExceptionIT.class.getSimpleName(),
+ MyActorService.SUCCESS_MESSAGE,
+ MyActorService.class,
+ true,
+ 60000);
+
+ logger.debug("Creating proxy builder");
+ ActorProxyBuilder proxyBuilder =
+ new ActorProxyBuilder("MyActorTest", MyActor.class, newActorClient());
+ logger.debug("Creating actorId");
+ ActorId actorId1 = new ActorId("1");
+ logger.debug("Building proxy");
+ MyActor proxy = proxyBuilder.build(actorId1);
+
+ callWithRetry(() -> {
+ assertThrowsDaprExceptionSubstring(
+ "INTERNAL",
+ "INTERNAL: error invoke actor method: error from actor service",
+ () -> proxy.throwException());
+ }, 5000);
+
+
+
+ }
+}
diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java
index 3c4cd3cc7..b168d90a0 100644
--- a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java
+++ b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java
@@ -29,6 +29,8 @@ public interface MyActor {
String getIdentifier();
+ void throwException();
+
@ActorMethod(name = "DotNetMethodAsync")
boolean dotNetMethod();
}
\ No newline at end of file
diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java
index 3aea4c58d..312372906 100644
--- a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java
+++ b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java
@@ -199,6 +199,11 @@ public class MyActorImpl extends AbstractActor implements MyActor, Remindable