Fix blocking calls for gRPC even though using Reactor (#435)

* Fix blocking calls for gRPC even though using Reactor

* delete unused wrap method

* add: throw exception if fatal

* Add "NoHotMono" unit tests for DaprClientGrpc

Co-authored-by: 刘禅 <zuojie@alibaba-inc.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
xiazuojie 2021-01-12 11:40:15 +08:00 committed by GitHub
parent 9e13586ecc
commit 76d877c9c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 781 additions and 689 deletions

View File

@ -122,8 +122,8 @@ public class DaprClientBuilder {
channel.shutdown(); channel.shutdown();
} }
}; };
DaprGrpc.DaprFutureStub stub = DaprGrpc.newFutureStub(channel); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientGrpc(closeableChannel, stub, this.objectSerializer, this.stateSerializer); return new DaprClientGrpc(closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer);
} }
/** /**

View File

@ -6,9 +6,9 @@
package io.dapr.client; package io.dapr.client;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.client.domain.DeleteStateRequest; import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest; import io.dapr.client.domain.ExecuteStateTransactionRequest;
import io.dapr.client.domain.GetBulkStateRequest; import io.dapr.client.domain.GetBulkStateRequest;
@ -39,6 +39,7 @@ import io.grpc.ForwardingClientCall;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Metadata.Key; import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
import io.opencensus.implcore.trace.propagation.PropagationComponentImpl; import io.opencensus.implcore.trace.propagation.PropagationComponentImpl;
import io.opencensus.implcore.trace.propagation.TraceContextFormat; import io.opencensus.implcore.trace.propagation.TraceContextFormat;
import io.opencensus.trace.SpanContext; import io.opencensus.trace.SpanContext;
@ -50,6 +51,7 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapPropagator;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -57,6 +59,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -85,29 +89,28 @@ public class DaprClientGrpc extends AbstractDaprClient {
private Closeable channel; private Closeable channel;
/** /**
* The GRPC client to be used. * The async gRPC stub.
*
* @see io.dapr.v1.DaprGrpc.DaprFutureStub
*/ */
private DaprGrpc.DaprFutureStub client; private DaprGrpc.DaprStub asyncStub;
/** /**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
* *
* @param closeableChannel A closeable for a Managed GRPC channel * @param closeableChannel A closeable for a Managed GRPC channel
* @param futureClient GRPC client * @param asyncStub async gRPC stub
* @param objectSerializer Serializer for transient request/response objects. * @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects. * @param stateSerializer Serializer for state objects.
* @see DaprClientBuilder * @see DaprClientBuilder
*/ */
DaprClientGrpc( DaprClientGrpc(
Closeable closeableChannel, Closeable closeableChannel,
DaprGrpc.DaprFutureStub futureClient, DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer, DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) { DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer); super(objectSerializer, stateSerializer);
this.channel = closeableChannel; this.channel = closeableChannel;
this.client = populateWithInterceptors(futureClient); this.asyncStub = populateWithInterceptors(asyncStub);
} }
private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
@ -169,10 +172,10 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
} }
return Mono.fromCallable(wrap(context, () -> { return this.<Empty>createMono(
get(client.publishEvent(envelopeBuilder.build())); context,
return null; it -> asyncStub.publishEvent(envelopeBuilder.build(), it)
})); ).thenReturn(new Response<>(context, null));
} catch (Exception ex) { } catch (Exception ex) {
return DaprException.wrapMono(ex); return DaprException.wrapMono(ex);
} }
@ -194,12 +197,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
appId, appId,
method, method,
request); request);
return Mono.fromCallable(wrap(context, () -> {
ListenableFuture<CommonProtos.InvokeResponse> futureResponse =
client.invokeService(envelope);
return objectSerializer.deserialize(get(futureResponse).getData().getValue().toByteArray(), type); return this.<CommonProtos.InvokeResponse>createMono(
})).map(r -> new Response<>(context, r)); context,
it -> asyncStub.invokeService(envelope, it)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
).map(r -> new Response<>(context, r));
} catch (Exception ex) { } catch (Exception ex) {
return DaprException.wrapMono(ex); return DaprException.wrapMono(ex);
} }
@ -234,10 +244,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
builder.putAllMetadata(metadata); builder.putAllMetadata(metadata);
} }
DaprProtos.InvokeBindingRequest envelope = builder.build(); DaprProtos.InvokeBindingRequest envelope = builder.build();
return Mono.fromCallable(wrap(context, () -> {
ListenableFuture<DaprProtos.InvokeBindingResponse> futureResponse = client.invokeBinding(envelope); return this.<DaprProtos.InvokeBindingResponse>createMono(
return objectSerializer.deserialize(get(futureResponse).getData().toByteArray(), type); context,
})).map(r -> new Response<>(context, r)); it -> asyncStub.invokeBinding(envelope, it)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
).map(r -> new Response<>(context, r));
} catch (Exception ex) { } catch (Exception ex) {
return DaprException.wrapMono(ex); return DaprException.wrapMono(ex);
} }
@ -271,10 +290,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
DaprProtos.GetStateRequest envelope = builder.build(); DaprProtos.GetStateRequest envelope = builder.build();
return Mono.fromCallable(wrap(context, () -> {
ListenableFuture<DaprProtos.GetStateResponse> futureResponse = client.getState(envelope); return this.<DaprProtos.GetStateResponse>createMono(
return buildStateKeyValue(get(futureResponse), key, options, type); context,
})).map(s -> new Response<>(context, s)); it -> asyncStub.getState(envelope, it)
).map(
it -> {
try {
return buildStateKeyValue(it, key, options, type);
} catch (IOException ex) {
throw DaprException.propagate(ex);
}
}
).map(s -> new Response<>(context, s));
} catch (Exception ex) { } catch (Exception ex) {
return DaprException.wrapMono(ex); return DaprException.wrapMono(ex);
} }
@ -309,23 +337,24 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
DaprProtos.GetBulkStateRequest envelope = builder.build(); DaprProtos.GetBulkStateRequest envelope = builder.build();
return Mono.fromCallable(wrap(context, () -> {
ListenableFuture<DaprProtos.GetBulkStateResponse> futureResponse = client.getBulkState(envelope);
DaprProtos.GetBulkStateResponse response = get(futureResponse);
return response return this.<DaprProtos.GetBulkStateResponse>createMono(
.getItemsList() context,
.stream() it -> asyncStub.getBulkState(envelope, it)
.map(b -> { ).map(
try { it ->
return buildStateKeyValue(b, type); it
} catch (Exception e) { .getItemsList()
DaprException.wrap(e); .stream()
return null; .map(b -> {
} try {
}) return buildStateKeyValue(b, type);
.collect(Collectors.toList()); } catch (Exception e) {
})).map(s -> new Response<>(context, s)); throw DaprException.propagate(e);
}
})
.collect(Collectors.toList())
).map(s -> new Response<>(context, s));
} catch (Exception ex) { } catch (Exception ex) {
return DaprException.wrapMono(ex); return DaprException.wrapMono(ex);
} }
@ -393,8 +422,10 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
DaprProtos.ExecuteStateTransactionRequest req = builder.build(); DaprProtos.ExecuteStateTransactionRequest req = builder.build();
return Mono.fromCallable(wrap(context, () -> client.executeStateTransaction(req))).map(f -> get(f)) return this.<Empty>createMono(
.thenReturn(new Response<>(context, null)); context,
it -> asyncStub.executeStateTransaction(req, it)
).thenReturn(new Response<>(context, null));
} catch (Exception e) { } catch (Exception e) {
return DaprException.wrapMono(e); return DaprException.wrapMono(e);
} }
@ -419,8 +450,10 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
DaprProtos.SaveStateRequest req = builder.build(); DaprProtos.SaveStateRequest req = builder.build();
return Mono.fromCallable(wrap(context, () -> client.saveState(req))).map(f -> get(f)) return this.<Empty>createMono(
.thenReturn(new Response<>(context, null)); context,
it -> asyncStub.saveState(req, it)
).thenReturn(new Response<>(context, null));
} catch (Exception ex) { } catch (Exception ex) {
return DaprException.wrapMono(ex); return DaprException.wrapMono(ex);
} }
@ -499,8 +532,11 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
DaprProtos.DeleteStateRequest req = builder.build(); DaprProtos.DeleteStateRequest req = builder.build();
return Mono.fromCallable(wrap(context, () -> client.deleteState(req))).map(f -> get(f))
.thenReturn(new Response<>(context, null)); return this.<Empty>createMono(
context,
it -> asyncStub.deleteState(req, it)
).thenReturn(new Response<>(context, null));
} catch (Exception ex) { } catch (Exception ex) {
return DaprException.wrapMono(ex); return DaprException.wrapMono(ex);
} }
@ -574,11 +610,12 @@ public class DaprClientGrpc extends AbstractDaprClient {
if (metadata != null) { if (metadata != null) {
requestBuilder.putAllMetadata(metadata); requestBuilder.putAllMetadata(metadata);
} }
return Mono.fromCallable(wrap(context, () -> { DaprProtos.GetSecretRequest req = requestBuilder.build();
DaprProtos.GetSecretRequest req = requestBuilder.build();
ListenableFuture<DaprProtos.GetSecretResponse> future = client.getSecret(req); return this.<DaprProtos.GetSecretResponse>createMono(
return get(future); context,
})).map(future -> new Response<>(context, future.getDataMap())); it -> asyncStub.getSecret(req, it)
).map(it -> new Response<>(context, it.getDataMap()));
} }
/** /**
@ -602,13 +639,13 @@ public class DaprClientGrpc extends AbstractDaprClient {
* @param client GRPC client for Dapr. * @param client GRPC client for Dapr.
* @return Client after adding interceptors. * @return Client after adding interceptors.
*/ */
private static DaprGrpc.DaprFutureStub populateWithInterceptors(DaprGrpc.DaprFutureStub client) { private static DaprGrpc.DaprStub populateWithInterceptors(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() { ClientInterceptor interceptor = new ClientInterceptor() {
@Override @Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions, CallOptions callOptions,
Channel channel) { Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions); ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) { return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override @Override
@ -666,21 +703,36 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
} }
private static <V> Callable<V> wrap(Context context, Callable<V> callable) { private static Runnable wrap(Context context, Runnable runnable) {
if (context == null) { if (context == null) {
return DaprException.wrap(callable); return DaprException.wrap(runnable);
} }
return DaprException.wrap(context.wrap(callable)); return DaprException.wrap(context.wrap(runnable));
} }
private static <V> V get(ListenableFuture<V> future) { private <T> Mono<T> createMono(Context context, Consumer<StreamObserver<T>> consumer) {
try { return Mono.create(
return future.get(); sink -> wrap(context, () -> consumer.accept(createStreamObserver(sink))).run()
} catch (Exception e) { );
DaprException.wrap(e); }
}
return null; private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
sink.success(value);
}
@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
}
@Override
public void onCompleted() {
sink.success();
}
};
} }
} }

View File

@ -6,6 +6,7 @@
package io.dapr.exceptions; package io.dapr.exceptions;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -37,7 +38,7 @@ public class DaprException extends RuntimeException {
* permitted, and indicates that the cause is nonexistent or * permitted, and indicates that the cause is nonexistent or
* unknown.) * unknown.)
*/ */
public DaprException(DaprError daprError, Exception cause) { public DaprException(DaprError daprError, Throwable cause) {
this(daprError.getErrorCode(), daprError.getMessage(), cause); this(daprError.getErrorCode(), daprError.getMessage(), cause);
} }
@ -45,7 +46,7 @@ public class DaprException extends RuntimeException {
* Wraps an exception into a DaprException. * Wraps an exception into a DaprException.
* @param exception the exception to be wrapped. * @param exception the exception to be wrapped.
*/ */
public DaprException(Exception exception) { public DaprException(Throwable exception) {
this("UNKNOWN", exception.getMessage(), exception); this("UNKNOWN", exception.getMessage(), exception);
} }
@ -69,7 +70,7 @@ public class DaprException extends RuntimeException {
* permitted, and indicates that the cause is nonexistent or * permitted, and indicates that the cause is nonexistent or
* unknown.) * unknown.)
*/ */
public DaprException(String errorCode, String message, Exception cause) { public DaprException(String errorCode, String message, Throwable cause) {
super(String.format("%s: %s", errorCode, emptyIfNull(message)), cause); super(String.format("%s: %s", errorCode, emptyIfNull(message)), cause);
this.errorCode = errorCode; this.errorCode = errorCode;
} }
@ -88,33 +89,12 @@ public class DaprException extends RuntimeException {
* *
* @param exception Exception to be wrapped. * @param exception Exception to be wrapped.
*/ */
public static void wrap(Exception exception) { public static void wrap(Throwable exception) {
if (exception == null) { if (exception == null) {
return; return;
} }
if (exception instanceof DaprException) { throw propagate(exception);
throw (DaprException) exception;
}
Throwable e = exception;
while (e != null) {
if (e instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e;
throw new DaprException(
statusRuntimeException.getStatus().getCode().toString(),
statusRuntimeException.getStatus().getDescription(),
exception);
}
e = e.getCause();
}
if (exception instanceof IllegalArgumentException) {
throw (IllegalArgumentException) exception;
}
throw new DaprException(exception);
} }
/** /**
@ -134,6 +114,21 @@ public class DaprException extends RuntimeException {
}; };
} }
/**
* Wraps a runnable with a try-catch to throw DaprException.
* @param runnable runnable to be invoked.
* @return object of type T.
*/
public static Runnable wrap(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Exception e) {
wrap(e);
}
};
}
/** /**
* Wraps an exception into DaprException (if not already DaprException). * Wraps an exception into DaprException (if not already DaprException).
* *
@ -151,6 +146,39 @@ public class DaprException extends RuntimeException {
return Mono.empty(); return Mono.empty();
} }
/**
* Wraps an exception into DaprException (if not already DaprException).
*
* @param exception Exception to be wrapped.
* @return wrapped RuntimeException
*/
public static RuntimeException propagate(Throwable exception) {
Exceptions.throwIfFatal(exception);
if (exception instanceof DaprException) {
return (DaprException) exception;
}
Throwable e = exception;
while (e != null) {
if (e instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e;
return new DaprException(
statusRuntimeException.getStatus().getCode().toString(),
statusRuntimeException.getStatus().getDescription(),
exception);
}
e = e.getCause();
}
if (exception instanceof IllegalArgumentException) {
return (IllegalArgumentException) exception;
}
return new DaprException(exception);
}
private static String emptyIfNull(String str) { private static String emptyIfNull(String str) {
if (str == null) { if (str == null) {
return ""; return "";

File diff suppressed because it is too large Load Diff