Change Actor gRPC client to async. (#564)

This commit is contained in:
Artur Souza 2021-06-14 13:37:31 -07:00 committed by GitHub
parent d056d9a780
commit 5c05f3d9c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 250 additions and 91 deletions

View File

@ -71,6 +71,11 @@
<version>2.3.5.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -105,7 +105,7 @@ public class ActorClient implements AutoCloseable {
*/
private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) {
switch (apiProtocol) {
case GRPC: return new DaprGrpcClient(DaprGrpc.newFutureStub(grpcManagedChannel));
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel));
case HTTP: return new DaprHttpClient(new DaprHttpBuilder().build());
default: throw new IllegalStateException("Unsupported protocol: " + apiProtocol.name());
}

View File

@ -5,12 +5,26 @@
package io.dapr.actors.client;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
/**
* A DaprClient over GRPC for Actor.
@ -18,19 +32,17 @@ import reactor.core.publisher.Mono;
class DaprGrpcClient implements DaprClient {
/**
* The GRPC client to be used.
*
* @see DaprGrpc.DaprFutureStub
* The async gRPC stub.
*/
private DaprGrpc.DaprFutureStub client;
private DaprGrpc.DaprStub client;
/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
*/
DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) {
this.client = grpcClient;
DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
this.client = intercept(grpcClient);
}
/**
@ -38,7 +50,6 @@ class DaprGrpcClient implements DaprClient {
*/
@Override
public Mono<byte[]> invoke(String actorType, String actorId, String methodName, byte[] jsonPayload) {
return Mono.fromCallable(DaprException.wrap(() -> {
DaprProtos.InvokeActorRequest req =
DaprProtos.InvokeActorRequest.newBuilder()
.setActorType(actorType)
@ -46,18 +57,74 @@ class DaprGrpcClient implements DaprClient {
.setMethod(methodName)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
return get(client.invokeActor(req));
})).map(r -> r.getData().toByteArray());
return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.InvokeActorResponse>createMono(
it -> intercept(context, client).invokeActor(req, it)
)
).map(r -> r.getData().toByteArray());
}
private static <V> V get(ListenableFuture<V> future) {
try {
return future.get();
} catch (Exception e) {
DaprException.wrap(e);
/**
* Populates GRPC client with interceptors.
*
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) {
metadata.put(Metadata.Key.of("dapr-api-token", Metadata.ASCII_STRING_MARSHALLER), daprApiToken);
}
return null;
super.start(responseListener, metadata);
}
};
}
};
return client.withInterceptors(interceptor);
}
/**
* Populates GRPC client with interceptors for telemetry.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
}
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
sink.success(value);
}
@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
}
@Override
public void onCompleted() {
sink.success();
}
};
}
}

View File

@ -5,114 +5,127 @@
package io.dapr.actors.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static io.dapr.actors.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.*;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.*;
public class DaprGrpcClientTest {
private static final String ACTOR_TYPE = "MyActorType";
private static final String ACTOR_ID = "1234567890";
private static final String ACTOR_ID_OK = "123-Ok";
private DaprGrpc.DaprFutureStub grpcStub;
private static final String ACTOR_ID_NULL_INPUT = "123-Null";
private static final String ACTOR_ID_EXCEPTION = "123-Exception";
private static final String METHOD_NAME = "myMethod";
private static final byte[] REQUEST_PAYLOAD = "{ \"id\": 123 }".getBytes();
private static final byte[] RESPONSE_PAYLOAD = "\"OK\"".getBytes();
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
private final DaprGrpc.DaprImplBase serviceImpl =
mock(DaprGrpc.DaprImplBase.class, delegatesTo(
new DaprGrpc.DaprImplBase() {
@Override
public void invokeActor(DaprProtos.InvokeActorRequest request,
StreamObserver<DaprProtos.InvokeActorResponse> responseObserver) {
assertEquals(ACTOR_TYPE, request.getActorType());
assertEquals(METHOD_NAME, request.getMethod());
switch (request.getActorId()) {
case ACTOR_ID_OK:
assertArrayEquals(REQUEST_PAYLOAD, request.getData().toByteArray());
responseObserver.onNext(
DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD))
.build());
responseObserver.onCompleted();
return;
case ACTOR_ID_NULL_INPUT:
assertArrayEquals(new byte[0], request.getData().toByteArray());
responseObserver.onNext(
DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(RESPONSE_PAYLOAD))
.build());
responseObserver.onCompleted();
return;
case ACTOR_ID_EXCEPTION:
Throwable e = new ArithmeticException();
StatusException se = new StatusException(Status.UNKNOWN.withCause(e));
responseObserver.onError(se);
return;
}
super.invokeActor(request, responseObserver);
}
}));
private DaprGrpcClient client;
@Before
public void setup() {
grpcStub = mock(DaprGrpc.DaprFutureStub.class);
client = new DaprGrpcClient(grpcStub);
public void setup() throws IOException {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder
.forName(serverName).directExecutor().addService(serviceImpl).build().start());
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
}
@Test
public void invoke() {
String methodName = "mymethod";
byte[] payload = "{ \"id\": 123 }".getBytes();
byte[] response = "\"OK\"".getBytes();
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(payload, argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
Mono<byte[]> result = client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, payload);
assertArrayEquals(response, result.block());
Mono<byte[]> result = client.invoke(ACTOR_TYPE, ACTOR_ID_OK, METHOD_NAME, REQUEST_PAYLOAD);
assertArrayEquals(RESPONSE_PAYLOAD, result.block());
}
@Test
public void invokeNullPayload() {
String methodName = "mymethod";
byte[] response = "\"OK\"".getBytes();
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.set(DaprProtos.InvokeActorResponse.newBuilder().setData(ByteString.copyFrom(response)).build());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(new byte[0], argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
Mono<byte[]> result = client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, null);
assertArrayEquals(response, result.block());
Mono<byte[]> result = client.invoke(ACTOR_TYPE, ACTOR_ID_NULL_INPUT, METHOD_NAME, null);
assertArrayEquals(RESPONSE_PAYLOAD, result.block());
}
@Test
public void invokeException() {
String methodName = "mymethod";
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(new byte[0], argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
Mono<byte[]> result = client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, null);
Mono<byte[]> result = client.invoke(ACTOR_TYPE, ACTOR_ID_EXCEPTION, METHOD_NAME, null);
assertThrowsDaprException(
ExecutionException.class,
"UNKNOWN",
"UNKNOWN: java.lang.ArithmeticException",
"UNKNOWN: ",
() -> result.block());
}
@Test
public void invokeNotHotMono() {
String methodName = "mymethod";
SettableFuture<DaprProtos.InvokeActorResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new ArithmeticException());
when(grpcStub.invokeActor(argThat(argument -> {
assertEquals(ACTOR_TYPE, argument.getActorType());
assertEquals(ACTOR_ID, argument.getActorId());
assertEquals(methodName, argument.getMethod());
assertArrayEquals(new byte[0], argument.getData().toByteArray());
return true;
}))).thenReturn(settableFuture);
client.invoke(ACTOR_TYPE, ACTOR_ID, methodName, null);
client.invoke(ACTOR_TYPE, ACTOR_ID_EXCEPTION, METHOD_NAME, null);
// No exception thrown because Mono is ignored here.
}

View File

@ -33,4 +33,13 @@ public final class TestUtils {
Assertions.assertEquals(expectedErrorCode, daprException.getErrorCode());
Assertions.assertEquals(expectedErrorMessage, daprException.getMessage());
}
public static <T extends Throwable> void assertThrowsDaprExceptionSubstring(
String expectedErrorCode,
String expectedErrorMessageSubstring,
Executable executable) {
DaprException daprException = Assertions.assertThrows(DaprException.class, executable);
Assertions.assertEquals(expectedErrorCode, daprException.getErrorCode());
Assertions.assertTrue(daprException.getMessage().contains(expectedErrorMessageSubstring));
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.actors;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.actors.app.MyActor;
import io.dapr.it.actors.app.MyActorService;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprException;
import static io.dapr.it.TestUtils.assertThrowsDaprExceptionSubstring;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ActorExceptionIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorExceptionIT.class);
@Test
public void exceptionTest() throws Exception {
// The call below will fail if service cannot start successfully.
startDaprApp(
ActorExceptionIT.class.getSimpleName(),
MyActorService.SUCCESS_MESSAGE,
MyActorService.class,
true,
60000);
logger.debug("Creating proxy builder");
ActorProxyBuilder<MyActor> proxyBuilder =
new ActorProxyBuilder("MyActorTest", MyActor.class, newActorClient());
logger.debug("Creating actorId");
ActorId actorId1 = new ActorId("1");
logger.debug("Building proxy");
MyActor proxy = proxyBuilder.build(actorId1);
callWithRetry(() -> {
assertThrowsDaprExceptionSubstring(
"INTERNAL",
"INTERNAL: error invoke actor method: error from actor service",
() -> proxy.throwException());
}, 5000);
}
}

View File

@ -29,6 +29,8 @@ public interface MyActor {
String getIdentifier();
void throwException();
@ActorMethod(name = "DotNetMethodAsync")
boolean dotNetMethod();
}

View File

@ -199,6 +199,11 @@ public class MyActorImpl extends AbstractActor implements MyActor, Remindable<St
return System.getenv("DAPR_HTTP_PORT");
}
@Override
public void throwException() {
throw new ArithmeticException();
}
@Override
public boolean dotNetMethod() {
return true;