diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index a76b2cffd..1ff59ab4f 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -122,8 +122,8 @@ public class DaprClientBuilder { channel.shutdown(); } }; - DaprGrpc.DaprFutureStub stub = DaprGrpc.newFutureStub(channel); - return new DaprClientGrpc(closeableChannel, stub, this.objectSerializer, this.stateSerializer); + DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); + return new DaprClientGrpc(closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer); } /** diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 408d587e7..8c9cb5b4a 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -6,9 +6,9 @@ package io.dapr.client; import com.google.common.base.Strings; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import io.dapr.client.domain.DeleteStateRequest; import io.dapr.client.domain.ExecuteStateTransactionRequest; import io.dapr.client.domain.GetBulkStateRequest; @@ -39,6 +39,7 @@ import io.grpc.ForwardingClientCall; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.MethodDescriptor; +import io.grpc.stub.StreamObserver; import io.opencensus.implcore.trace.propagation.PropagationComponentImpl; import io.opencensus.implcore.trace.propagation.TraceContextFormat; import io.opencensus.trace.SpanContext; @@ -50,6 +51,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapPropagator; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import java.io.Closeable; import java.io.IOException; @@ -57,6 +59,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -85,29 +89,28 @@ public class DaprClientGrpc extends AbstractDaprClient { private Closeable channel; /** - * The GRPC client to be used. - * - * @see io.dapr.v1.DaprGrpc.DaprFutureStub + * The async gRPC stub. */ - private DaprGrpc.DaprFutureStub client; + private DaprGrpc.DaprStub asyncStub; + /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * * @param closeableChannel A closeable for a Managed GRPC channel - * @param futureClient GRPC client + * @param asyncStub async gRPC stub * @param objectSerializer Serializer for transient request/response objects. * @param stateSerializer Serializer for state objects. * @see DaprClientBuilder */ DaprClientGrpc( Closeable closeableChannel, - DaprGrpc.DaprFutureStub futureClient, + DaprGrpc.DaprStub asyncStub, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { super(objectSerializer, stateSerializer); this.channel = closeableChannel; - this.client = populateWithInterceptors(futureClient); + this.asyncStub = populateWithInterceptors(asyncStub); } private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { @@ -169,10 +172,10 @@ public class DaprClientGrpc extends AbstractDaprClient { } } - return Mono.fromCallable(wrap(context, () -> { - get(client.publishEvent(envelopeBuilder.build())); - return null; - })); + return this.createMono( + context, + it -> asyncStub.publishEvent(envelopeBuilder.build(), it) + ).thenReturn(new Response<>(context, null)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -194,12 +197,19 @@ public class DaprClientGrpc extends AbstractDaprClient { appId, method, request); - return Mono.fromCallable(wrap(context, () -> { - ListenableFuture futureResponse = - client.invokeService(envelope); - return objectSerializer.deserialize(get(futureResponse).getData().getValue().toByteArray(), type); - })).map(r -> new Response<>(context, r)); + return this.createMono( + context, + it -> asyncStub.invokeService(envelope, it) + ).flatMap( + it -> { + try { + return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type)); + } catch (IOException e) { + throw DaprException.propagate(e); + } + } + ).map(r -> new Response<>(context, r)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -234,10 +244,19 @@ public class DaprClientGrpc extends AbstractDaprClient { builder.putAllMetadata(metadata); } DaprProtos.InvokeBindingRequest envelope = builder.build(); - return Mono.fromCallable(wrap(context, () -> { - ListenableFuture futureResponse = client.invokeBinding(envelope); - return objectSerializer.deserialize(get(futureResponse).getData().toByteArray(), type); - })).map(r -> new Response<>(context, r)); + + return this.createMono( + context, + it -> asyncStub.invokeBinding(envelope, it) + ).flatMap( + it -> { + try { + return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type)); + } catch (IOException e) { + throw DaprException.propagate(e); + } + } + ).map(r -> new Response<>(context, r)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -271,10 +290,19 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.GetStateRequest envelope = builder.build(); - return Mono.fromCallable(wrap(context, () -> { - ListenableFuture futureResponse = client.getState(envelope); - return buildStateKeyValue(get(futureResponse), key, options, type); - })).map(s -> new Response<>(context, s)); + + return this.createMono( + context, + it -> asyncStub.getState(envelope, it) + ).map( + it -> { + try { + return buildStateKeyValue(it, key, options, type); + } catch (IOException ex) { + throw DaprException.propagate(ex); + } + } + ).map(s -> new Response<>(context, s)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -309,23 +337,24 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.GetBulkStateRequest envelope = builder.build(); - return Mono.fromCallable(wrap(context, () -> { - ListenableFuture futureResponse = client.getBulkState(envelope); - DaprProtos.GetBulkStateResponse response = get(futureResponse); - return response - .getItemsList() - .stream() - .map(b -> { - try { - return buildStateKeyValue(b, type); - } catch (Exception e) { - DaprException.wrap(e); - return null; - } - }) - .collect(Collectors.toList()); - })).map(s -> new Response<>(context, s)); + return this.createMono( + context, + it -> asyncStub.getBulkState(envelope, it) + ).map( + it -> + it + .getItemsList() + .stream() + .map(b -> { + try { + return buildStateKeyValue(b, type); + } catch (Exception e) { + throw DaprException.propagate(e); + } + }) + .collect(Collectors.toList()) + ).map(s -> new Response<>(context, s)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -393,8 +422,10 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.ExecuteStateTransactionRequest req = builder.build(); - return Mono.fromCallable(wrap(context, () -> client.executeStateTransaction(req))).map(f -> get(f)) - .thenReturn(new Response<>(context, null)); + return this.createMono( + context, + it -> asyncStub.executeStateTransaction(req, it) + ).thenReturn(new Response<>(context, null)); } catch (Exception e) { return DaprException.wrapMono(e); } @@ -419,8 +450,10 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.SaveStateRequest req = builder.build(); - return Mono.fromCallable(wrap(context, () -> client.saveState(req))).map(f -> get(f)) - .thenReturn(new Response<>(context, null)); + return this.createMono( + context, + it -> asyncStub.saveState(req, it) + ).thenReturn(new Response<>(context, null)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -499,8 +532,11 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.DeleteStateRequest req = builder.build(); - return Mono.fromCallable(wrap(context, () -> client.deleteState(req))).map(f -> get(f)) - .thenReturn(new Response<>(context, null)); + + return this.createMono( + context, + it -> asyncStub.deleteState(req, it) + ).thenReturn(new Response<>(context, null)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -574,11 +610,12 @@ public class DaprClientGrpc extends AbstractDaprClient { if (metadata != null) { requestBuilder.putAllMetadata(metadata); } - return Mono.fromCallable(wrap(context, () -> { - DaprProtos.GetSecretRequest req = requestBuilder.build(); - ListenableFuture future = client.getSecret(req); - return get(future); - })).map(future -> new Response<>(context, future.getDataMap())); + DaprProtos.GetSecretRequest req = requestBuilder.build(); + + return this.createMono( + context, + it -> asyncStub.getSecret(req, it) + ).map(it -> new Response<>(context, it.getDataMap())); } /** @@ -602,13 +639,13 @@ public class DaprClientGrpc extends AbstractDaprClient { * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprFutureStub populateWithInterceptors(DaprGrpc.DaprFutureStub client) { + private static DaprGrpc.DaprStub populateWithInterceptors(DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions, - Channel channel) { + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { @Override @@ -666,21 +703,36 @@ public class DaprClientGrpc extends AbstractDaprClient { } } - private static Callable wrap(Context context, Callable callable) { + private static Runnable wrap(Context context, Runnable runnable) { if (context == null) { - return DaprException.wrap(callable); + return DaprException.wrap(runnable); } - return DaprException.wrap(context.wrap(callable)); + return DaprException.wrap(context.wrap(runnable)); } - private static V get(ListenableFuture future) { - try { - return future.get(); - } catch (Exception e) { - DaprException.wrap(e); - } + private Mono createMono(Context context, Consumer> consumer) { + return Mono.create( + sink -> wrap(context, () -> consumer.accept(createStreamObserver(sink))).run() + ); + } - return null; + private StreamObserver createStreamObserver(MonoSink sink) { + return new StreamObserver() { + @Override + public void onNext(T value) { + sink.success(value); + } + + @Override + public void onError(Throwable t) { + sink.error(DaprException.propagate(new ExecutionException(t))); + } + + @Override + public void onCompleted() { + sink.success(); + } + }; } } diff --git a/sdk/src/main/java/io/dapr/exceptions/DaprException.java b/sdk/src/main/java/io/dapr/exceptions/DaprException.java index f9aae559d..0996c75b5 100644 --- a/sdk/src/main/java/io/dapr/exceptions/DaprException.java +++ b/sdk/src/main/java/io/dapr/exceptions/DaprException.java @@ -6,6 +6,7 @@ package io.dapr.exceptions; import io.grpc.StatusRuntimeException; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.util.concurrent.Callable; @@ -37,7 +38,7 @@ public class DaprException extends RuntimeException { * permitted, and indicates that the cause is nonexistent or * unknown.) */ - public DaprException(DaprError daprError, Exception cause) { + public DaprException(DaprError daprError, Throwable cause) { this(daprError.getErrorCode(), daprError.getMessage(), cause); } @@ -45,7 +46,7 @@ public class DaprException extends RuntimeException { * Wraps an exception into a DaprException. * @param exception the exception to be wrapped. */ - public DaprException(Exception exception) { + public DaprException(Throwable exception) { this("UNKNOWN", exception.getMessage(), exception); } @@ -69,7 +70,7 @@ public class DaprException extends RuntimeException { * permitted, and indicates that the cause is nonexistent or * unknown.) */ - public DaprException(String errorCode, String message, Exception cause) { + public DaprException(String errorCode, String message, Throwable cause) { super(String.format("%s: %s", errorCode, emptyIfNull(message)), cause); this.errorCode = errorCode; } @@ -88,33 +89,12 @@ public class DaprException extends RuntimeException { * * @param exception Exception to be wrapped. */ - public static void wrap(Exception exception) { + public static void wrap(Throwable exception) { if (exception == null) { return; } - if (exception instanceof DaprException) { - throw (DaprException) exception; - } - - Throwable e = exception; - while (e != null) { - if (e instanceof StatusRuntimeException) { - StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e; - throw new DaprException( - statusRuntimeException.getStatus().getCode().toString(), - statusRuntimeException.getStatus().getDescription(), - exception); - } - - e = e.getCause(); - } - - if (exception instanceof IllegalArgumentException) { - throw (IllegalArgumentException) exception; - } - - throw new DaprException(exception); + throw propagate(exception); } /** @@ -134,6 +114,21 @@ public class DaprException extends RuntimeException { }; } + /** + * Wraps a runnable with a try-catch to throw DaprException. + * @param runnable runnable to be invoked. + * @return object of type T. + */ + public static Runnable wrap(Runnable runnable) { + return () -> { + try { + runnable.run(); + } catch (Exception e) { + wrap(e); + } + }; + } + /** * Wraps an exception into DaprException (if not already DaprException). * @@ -151,6 +146,39 @@ public class DaprException extends RuntimeException { return Mono.empty(); } + /** + * Wraps an exception into DaprException (if not already DaprException). + * + * @param exception Exception to be wrapped. + * @return wrapped RuntimeException + */ + public static RuntimeException propagate(Throwable exception) { + Exceptions.throwIfFatal(exception); + + if (exception instanceof DaprException) { + return (DaprException) exception; + } + + Throwable e = exception; + while (e != null) { + if (e instanceof StatusRuntimeException) { + StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e; + return new DaprException( + statusRuntimeException.getStatus().getCode().toString(), + statusRuntimeException.getStatus().getDescription(), + exception); + } + + e = e.getCause(); + } + + if (exception instanceof IllegalArgumentException) { + return (IllegalArgumentException) exception; + } + + return new DaprException(exception); + } + private static String emptyIfNull(String str) { if (str == null) { return ""; diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 770f5f5d2..185e3b141 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -5,8 +5,6 @@ package io.dapr.client; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; @@ -32,11 +30,12 @@ import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import org.checkerframework.checker.nullness.compatqual.NullableDecl; +import io.grpc.stub.StreamObserver; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.stubbing.Answer; import reactor.core.publisher.Mono; import java.io.Closeable; @@ -50,21 +49,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; -import static com.google.common.util.concurrent.Futures.addCallback; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.dapr.utils.TestUtils.assertThrowsDaprException; import static io.dapr.utils.TestUtils.findFreePort; import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class DaprClientGrpcTest { @@ -73,14 +65,14 @@ public class DaprClientGrpcTest { private static final String SECRET_STORE_NAME = "MySecretStore"; private Closeable closeable; - private DaprGrpc.DaprFutureStub client; + private DaprGrpc.DaprStub client; private DaprClientGrpc adapter; private ObjectSerializer serializer; @Before public void setup() throws IOException { closeable = mock(Closeable.class); - client = mock(DaprGrpc.DaprFutureStub.class); + client = mock(DaprGrpc.DaprStub.class); when(client.withInterceptors(any())).thenReturn(client); adapter = new DaprClientGrpc(closeable, client, new DefaultObjectSerializer(), new DefaultObjectSerializer()); serializer = new ObjectSerializer(); @@ -120,26 +112,26 @@ public class DaprClientGrpcTest { @Test public void publishEventExceptionThrownTest() { - when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) - .thenThrow(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument")); + doAnswer((Answer) invocation -> { + throw newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"); + }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); assertThrowsDaprException( - StatusRuntimeException.class, - "INVALID_ARGUMENT", - "INVALID_ARGUMENT: bad bad argument", - () -> adapter.publishEvent("pubsubname","topic", "object").block()); + StatusRuntimeException.class, + "INVALID_ARGUMENT", + "INVALID_ARGUMENT: bad bad argument", + () -> adapter.publishEvent("pubsubname","topic", "object").block()); } @Test public void publishEventCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); - RuntimeException ex = newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument")); + return null; + }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + Mono result = adapter.publishEvent("pubsubname","topic", "object"); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -152,9 +144,13 @@ public class DaprClientGrpcTest { public void publishEventSerializeException() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); adapter = new DaprClientGrpc(closeable, client, mockSerializer, new DefaultObjectSerializer()); - SettableFuture settableFuture = SettableFuture.create(); - when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + when(mockSerializer.serialize(any())).thenThrow(IOException.class); Mono result = adapter.publishEvent("pubsubname","topic", "{invalid-json"); @@ -167,44 +163,44 @@ public class DaprClientGrpcTest { @Test public void publishEventTest() { - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + Mono result = adapter.publishEvent("pubsubname","topic", "object"); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test public void publishEventNoHotMono() { - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) - .thenAnswer(c -> { - settableFuture.set(Empty.newBuilder().build()); - return settableFuture; - }); + AtomicBoolean called = new AtomicBoolean(false); + doAnswer((Answer) invocation -> { + called.set(true); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); adapter.publishEvent("pubsubname", "topic", "object"); // Do not call block() on the mono above, so nothing should happen. - assertFalse(callback.wasCalled); + assertFalse(called.get()); } @Test public void publishEventObjectTest() { - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + MyObject event = new MyObject(1, "Event"); Mono result = adapter.publishEvent("pubsubname", "topic", event); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -231,9 +227,13 @@ public class DaprClientGrpcTest { public void invokeBindingSerializeException() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); adapter = new DaprClientGrpc(closeable, client, mockSerializer, new DefaultObjectSerializer()); - SettableFuture settableFuture = SettableFuture.create(); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + when(mockSerializer.serialize(any())).thenThrow(IOException.class); Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP); @@ -246,8 +246,10 @@ public class DaprClientGrpcTest { @Test public void invokeBindingExceptionThrownTest() { - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request"); assertThrowsDaprException( @@ -259,14 +261,13 @@ public class DaprClientGrpcTest { @Test public void invokeBindingCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = - new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.setException(ex); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request"); assertThrowsDaprException( @@ -278,102 +279,110 @@ public class DaprClientGrpcTest { @Test public void invokeBindingTest() throws IOException { - SettableFuture settableFuture = SettableFuture.create(); DaprProtos.InvokeBindingResponse.Builder responseBuilder = DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize("OK")); - MockCallback callback = new MockCallback<>(responseBuilder.build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request"); - settableFuture.set(responseBuilder.build()); result.block(); - assertTrue(callback.wasCalled); } @Test - public void invokeBindingByteArrayTest() throws IOException { - SettableFuture settableFuture = SettableFuture.create(); + public void invokeBindingByteArrayTest() { DaprProtos.InvokeBindingResponse.Builder responseBuilder = DaprProtos.InvokeBindingResponse.newBuilder().setData(ByteString.copyFrom("OK".getBytes())); - MockCallback callback = new MockCallback<>(responseBuilder.build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + Mono result = adapter.invokeBinding("BindingName", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP); - settableFuture.set(responseBuilder.build()); + assertEquals("OK", new String(result.block(), StandardCharsets.UTF_8)); - assertTrue(callback.wasCalled); } @Test public void invokeBindingObjectTest() throws IOException { - SettableFuture settableFuture = SettableFuture.create(); DaprProtos.InvokeBindingResponse.Builder responseBuilder = DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize("OK")); - MockCallback callback = new MockCallback<>(responseBuilder.build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + MyObject event = new MyObject(1, "Event"); Mono result = adapter.invokeBinding("BindingName", "MyOperation", event); - settableFuture.set(responseBuilder.build()); + result.block(); - assertTrue(callback.wasCalled); } @Test public void invokeBindingResponseObjectTest() throws IOException { - SettableFuture settableFuture = SettableFuture.create(); DaprProtos.InvokeBindingResponse.Builder responseBuilder = DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize("OK")); - MockCallback callback = new MockCallback<>(responseBuilder.build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + MyObject event = new MyObject(1, "Event"); Mono result = adapter.invokeBinding("BindingName", "MyOperation", event, String.class); - settableFuture.set(responseBuilder.build()); + assertEquals("OK", result.block()); - assertTrue(callback.wasCalled); } @Test public void invokeBindingResponseObjectTypeRefTest() throws IOException { - SettableFuture settableFuture = SettableFuture.create(); DaprProtos.InvokeBindingResponse.Builder responseBuilder = - DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize("OK")); - MockCallback callback = new MockCallback<>(responseBuilder.build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenReturn(settableFuture); + DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize("OK")); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + MyObject event = new MyObject(1, "Event"); Mono result = adapter.invokeBinding("BindingName", "MyOperation", event, TypeRef.get(String.class)); - settableFuture.set(responseBuilder.build()); + assertEquals("OK", result.block()); - assertTrue(callback.wasCalled); } @Test - public void invokeBindingObjectNoHotMono() { - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeBinding(any(DaprProtos.InvokeBindingRequest.class))) - .thenAnswer(c -> { - settableFuture.set(Empty.newBuilder().build()); - return settableFuture; - }); + public void invokeBindingObjectNoHotMono() throws IOException { + AtomicBoolean called = new AtomicBoolean(false); + DaprProtos.InvokeBindingResponse.Builder responseBuilder = + DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize("OK")); + doAnswer((Answer) invocation -> { + called.set(true); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(client).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); MyObject event = new MyObject(1, "Event"); adapter.invokeBinding("BindingName", "MyOperation", event); // Do not call block() on mono above, so nothing should happen. - assertFalse(callback.wasCalled); + assertFalse(called.get()); } @Test public void invokeServiceVoidExceptionThrownTest() { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE); assertThrowsDaprException( @@ -385,9 +394,13 @@ public class DaprClientGrpcTest { @Test public void invokeServiceIllegalArgumentExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + // HttpExtension cannot be null Mono result = adapter.invokeMethod("appId", "method", "request", null); @@ -396,8 +409,10 @@ public class DaprClientGrpcTest { @Test public void invokeServiceEmptyRequestVoidExceptionThrownTest() { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null); assertThrowsDaprException( @@ -409,13 +424,13 @@ public class DaprClientGrpcTest { @Test public void invokeServiceVoidCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.setException(ex); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE); assertThrowsDaprException( @@ -426,40 +441,38 @@ public class DaprClientGrpcTest { } @Test - public void invokeServiceVoidTest() throws Exception { - SettableFuture settableFuture = SettableFuture.create(); + public void invokeServiceVoidTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny("Value")).build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); result.block(); - assertTrue(callback.wasCalled); } @Test - public void invokeServiceVoidObjectTest() throws Exception { - SettableFuture settableFuture = SettableFuture.create(); + public void invokeServiceVoidObjectTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny("Value")).build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); MyObject request = new MyObject(1, "Event"); Mono result = adapter.invokeMethod("appId", "method", request, HttpExtension.NONE); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny("Value")).build()); result.block(); - assertTrue(callback.wasCalled); } @Test public void invokeServiceExceptionThrownTest() { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); assertThrowsDaprException( @@ -471,8 +484,10 @@ public class DaprClientGrpcTest { @Test public void invokeServiceNoRequestClassExceptionThrownTest() { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null, String.class); assertThrowsDaprException( @@ -484,8 +499,10 @@ public class DaprClientGrpcTest { @Test public void invokeServiceNoRequestTypeRefExceptionThrownTest() { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", HttpExtension.NONE, (Map)null, TypeRef.STRING); assertThrowsDaprException( @@ -497,14 +514,14 @@ public class DaprClientGrpcTest { @Test public void invokeServiceCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -531,53 +548,58 @@ public class DaprClientGrpcTest { .setMessage(message) .build(); String expected = "Value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(expected)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); - when(client.invokeService(eq(request))) - .thenReturn(settableFuture); + + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(eq(request), any()); + Mono result = adapter.invokeMethod("appId", "method", "request", httpExtension, null, String.class); String strOutput = result.block(); assertEquals(expected, strOutput); } @Test - public void invokeServiceTest() throws Exception { + public void invokeServiceTest() { String expected = "Value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(expected)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, String.class); String strOutput = result.block(); + assertEquals(expected, strOutput); } @Test public void invokeServiceObjectTest() throws Exception { MyObject object = new MyObject(1, "Value"); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(object)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(object)).build()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(object)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, MyObject.class); MyObject resultObject = result.block(); + assertEquals(object.id, resultObject.id); assertEquals(object.value, resultObject.value); } @Test public void invokeServiceNoRequestBodyExceptionThrownTest() { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); assertThrowsDaprException( @@ -589,14 +611,14 @@ public class DaprClientGrpcTest { @Test public void invokeServiceNoRequestCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -608,42 +630,44 @@ public class DaprClientGrpcTest { @Test public void invokeServiceNoRequestBodyTest() throws Exception { String expected = "Value"; - SettableFuture settableFuture = SettableFuture.create(); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(expected)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, String.class); String strOutput = result.block(); + assertEquals(expected, strOutput); } @Test public void invokeServiceNoRequestBodyObjectTest() throws Exception { MyObject object = new MyObject(1, "Value"); - SettableFuture settableFuture = SettableFuture.create(); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(object)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(object)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(object)).build()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE, MyObject.class); MyObject resultObject = result.block(); + assertEquals(object.id, resultObject.id); assertEquals(object.value, resultObject.value); } @Test public void invokeServiceByteRequestExceptionThrownTest() throws IOException { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); String request = "Request"; byte[] byteRequest = serializer.serialize(request); + Mono result = adapter.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE, byte[].class); assertThrowsDaprException( @@ -655,17 +679,17 @@ public class DaprClientGrpcTest { @Test public void invokeServiceByteRequestCallbackExceptionThrownTest() throws IOException { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); String request = "Request"; byte[] byteRequest = serializer.serialize(request); + Mono result = adapter.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE,(HashMap) null); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -677,18 +701,19 @@ public class DaprClientGrpcTest { @Test public void invokeByteRequestServiceTest() throws Exception { String expected = "Value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(expected)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); String request = "Request"; byte[] byteRequest = serializer.serialize(request); + Mono result = adapter.invokeMethod( "appId", "method", byteRequest, HttpExtension.NONE, (HashMap) null); byte[] byteOutput = result.block(); + String strOutput = serializer.deserialize(byteOutput, String.class); assertEquals(expected, strOutput); } @@ -696,24 +721,26 @@ public class DaprClientGrpcTest { @Test public void invokeServiceByteRequestObjectTest() throws Exception { MyObject resultObj = new MyObject(1, "Value"); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(resultObj)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(resultObj)).build()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(resultObj)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + String request = "Request"; byte[] byteRequest = serializer.serialize(request); Mono result = adapter.invokeMethod("appId", "method", byteRequest, HttpExtension.NONE, byte[].class); byte[] byteOutput = result.block(); + assertEquals(resultObj, serializer.deserialize(byteOutput, MyObject.class)); } @Test public void invokeServiceNoRequestNoClassBodyExceptionThrownTest() { - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); assertThrowsDaprException( @@ -725,14 +752,14 @@ public class DaprClientGrpcTest { @Test public void invokeServiceNoRequestNoClassCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -742,51 +769,47 @@ public class DaprClientGrpcTest { } @Test - public void invokeServiceNoRequestNoClassBodyTest() throws Exception { + public void invokeServiceNoRequestNoClassBodyTest() { String expected = "Value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(expected)).build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); result.block(); - assertTrue(callback.wasCalled); } @Test - public void invokeServiceNoRequestNoHotMono() throws Exception { + public void invokeServiceNoRequestNoHotMono() { + AtomicBoolean called = new AtomicBoolean(false); String expected = "Value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(expected)).build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenAnswer(c -> { - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + called.set(true); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); // Do not call block() on mono above, so nothing should happen. - assertFalse(callback.wasCalled); + assertFalse(called.get()); } @Test public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception { MyObject resultObj = new MyObject(1, "Value"); - SettableFuture settableFuture = SettableFuture.create(); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(resultObj)).build()); + observer.onCompleted(); + return null; + }).when(client).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); - MockCallback callback = new MockCallback<>(CommonProtos.InvokeResponse.newBuilder() - .setData(getAny(resultObj)).build()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(CommonProtos.InvokeResponse.newBuilder().setData(getAny(resultObj)).build()); - when(client.invokeService(any(DaprProtos.InvokeServiceRequest.class))) - .thenReturn(settableFuture); Mono result = adapter.invokeMethod("appId", "method", (Object)null, HttpExtension.NONE); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -812,7 +835,10 @@ public class DaprClientGrpcTest { @Test public void getStateExceptionThrownTest() { - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))).thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + State key = buildStateKey(null, "Key1", "ETag1", null); Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); @@ -825,16 +851,15 @@ public class DaprClientGrpcTest { @Test public void getStateCallbackExceptionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = - new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + State key = buildStateKey(null, "Key1", "ETag1", null); Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -850,38 +875,40 @@ public class DaprClientGrpcTest { String expectedValue = "Expected state"; State expectedState = buildStateKey(expectedValue, key, etag, new HashMap<>(), null); DaprProtos.GetStateResponse responseEnvelope = buildGetStateResponse(expectedValue, etag); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + State keyRequest = buildStateKey(null, key, etag, null); Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class); - settableFuture.set(responseEnvelope); State res = result.block(); + assertNotNull(res); assertEquals(expectedState, res); } @Test public void getStateStringValueNoHotMono() throws IOException { + AtomicBoolean called = new AtomicBoolean(false); String etag = "ETag1"; String key = "key1"; String expectedValue = "Expected state"; - State expectedState = buildStateKey(expectedValue, key, etag, null); DaprProtos.GetStateResponse responseEnvelope = buildGetStateResponse(expectedValue, etag); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(responseEnvelope); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + called.set(true); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + State keyRequest = buildStateKey(null, key, etag, null); adapter.getState(STATE_STORE_NAME, keyRequest, String.class); // block() on the mono above is not called, so nothing should happen. - assertFalse(callback.wasCalled); + assertFalse(called.get()); } @Test @@ -896,14 +923,16 @@ public class DaprClientGrpcTest { .setEtag(etag) .build(); State keyRequest = buildStateKey(null, key, etag, new HashMap<>(), options); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); - settableFuture.set(responseEnvelope); State res = result.block(); + assertNotNull(res); assertEquals(expectedState, res); } @@ -924,13 +953,14 @@ public class DaprClientGrpcTest { GetStateRequestBuilder builder = new GetStateRequestBuilder(STATE_STORE_NAME, key); builder.withMetadata(metadata).withEtag(etag).withStateOptions(options); GetStateRequest request = builder.build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + Mono>> result = adapter.getState(request, TypeRef.get(MyObject.class)); - settableFuture.set(responseEnvelope); Response> res = result.block(); assertNotNull(res); assertEquals(expectedState, res.getObject()); @@ -948,13 +978,15 @@ public class DaprClientGrpcTest { .setEtag(etag) .build(); State keyRequest = buildStateKey(null, key, etag, new HashMap<>(), options); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getState(any(io.dapr.v1.DaprProtos.GetStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getState(any(DaprProtos.GetStateRequest.class), any()); + Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); - settableFuture.set(responseEnvelope); + assertEquals(expectedState, result.block()); } @@ -1003,16 +1035,14 @@ public class DaprClientGrpcTest { .setError("not found") .build()) .build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(responseEnvelope); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block(); - assertTrue(callback.wasCalled); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1042,16 +1072,13 @@ public class DaprClientGrpcTest { .setError("not found") .build()) .build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(responseEnvelope); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block(); - assertTrue(callback.wasCalled); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1081,16 +1108,14 @@ public class DaprClientGrpcTest { .setError("not found") .build()) .build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(responseEnvelope); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block(); - assertTrue(callback.wasCalled); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1119,16 +1144,14 @@ public class DaprClientGrpcTest { .setError("not found") .build()) .build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(responseEnvelope); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), byte[].class).block(); - assertTrue(callback.wasCalled); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1156,16 +1179,14 @@ public class DaprClientGrpcTest { .setError("not found") .build()) .build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(responseEnvelope); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), MyObject.class).block(); - assertTrue(callback.wasCalled); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1181,7 +1202,10 @@ public class DaprClientGrpcTest { @Test public void deleteStateExceptionThrowTest() { - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))).thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + State key = buildStateKey(null, "Key1", "ETag1", null); Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); @@ -1215,15 +1239,15 @@ public class DaprClientGrpcTest { @Test public void deleteStateCallbackExcpetionThrownTest() { - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + State key = buildStateKey(null, "Key1", "ETag1", null); Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -1236,17 +1260,17 @@ public class DaprClientGrpcTest { public void deleteStateNoOptionsTest() { String etag = "ETag1"; String key = "key1"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + State stateKey = buildStateKey(null, key, etag, null); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1254,17 +1278,17 @@ public class DaprClientGrpcTest { String etag = "ETag1"; String key = "key1"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1274,38 +1298,39 @@ public class DaprClientGrpcTest { StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); Map metadata = new HashMap<>(); metadata.put("key_1", "val_1"); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + DeleteStateRequestBuilder builder = new DeleteStateRequestBuilder(STATE_STORE_NAME, key); builder.withEtag(etag).withStateOptions(options).withMetadata(metadata); DeleteStateRequest request = builder.build(); Mono> result = adapter.deleteState(request); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test public void deleteStateTestNoHotMono() { + AtomicBoolean called = new AtomicBoolean(false); String etag = "ETag1"; String key = "key1"; - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(Empty.newBuilder().build()); - return settableFuture; - }); + StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE); + doAnswer((Answer) invocation -> { + called.set(true); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), - stateKey.getOptions()); + adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); // Do not call result.block(), so nothing should happen. - assertFalse(callback.wasCalled); + assertFalse(called.get()); } @Test @@ -1313,17 +1338,17 @@ public class DaprClientGrpcTest { String etag = "ETag1"; String key = "key1"; StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1331,17 +1356,17 @@ public class DaprClientGrpcTest { String etag = "ETag1"; String key = "key1"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).deleteState(any(DaprProtos.DeleteStateRequest.class), any()); + State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1368,9 +1393,14 @@ public class DaprClientGrpcTest { String key = "key1"; String data = "my data"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); - SettableFuture settableFuture = SettableFuture.create(); - when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + + when(mockSerializer.serialize(any())).thenThrow(IOException.class); State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( @@ -1394,11 +1424,13 @@ public class DaprClientGrpcTest { String key = "key1"; String data = "my data"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( TransactionalStateOperation.OperationType.UPSERT, @@ -1413,9 +1445,7 @@ public class DaprClientGrpcTest { .withMetadata(metadata) .build(); Mono> result = adapter.executeStateTransaction(request); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1424,11 +1454,13 @@ public class DaprClientGrpcTest { String key = "key1"; String data = "my data"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( TransactionalStateOperation.OperationType.UPSERT, @@ -1438,9 +1470,7 @@ public class DaprClientGrpcTest { new State<>("testKey") ); Mono result = adapter.executeStateTransaction(STATE_STORE_NAME, Arrays.asList(upsertOperation, deleteOperation)); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1449,8 +1479,10 @@ public class DaprClientGrpcTest { String key = "key1"; String data = "my data"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); - when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class))) - .thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation operation = new TransactionalStateOperation<>( TransactionalStateOperation.OperationType.UPSERT, @@ -1470,18 +1502,18 @@ public class DaprClientGrpcTest { String key = "key1"; String data = "my data"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("ex"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class))) - .thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); + State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation operation = new TransactionalStateOperation<>( TransactionalStateOperation.OperationType.UPSERT, stateKey); Mono result = adapter.executeStateTransaction(STATE_STORE_NAME, Collections.singletonList(operation)); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -1507,7 +1539,10 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenThrow(RuntimeException.class); + doAnswer((Answer) invocation -> { + throw new RuntimeException(); + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); assertThrowsDaprException( @@ -1522,13 +1557,14 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); RuntimeException ex = new RuntimeException("An Exception"); - MockCallback callback = new MockCallback<>(ex); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(ex); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); - settableFuture.setException(ex); assertThrowsDaprException( ExecutionException.class, @@ -1542,14 +1578,16 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1557,33 +1595,36 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test public void saveStateTestNoHotMono() { + AtomicBoolean called = new AtomicBoolean(false); String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenAnswer(c -> { - settableFuture.set(Empty.newBuilder().build()); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + called.set(true); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); - Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + adapter.saveState(STATE_STORE_NAME, key, etag, value, options); // No call to result.block(), so nothing should happen. - assertFalse(callback.wasCalled); + assertFalse(called.get()); } @Test @@ -1591,15 +1632,16 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1607,15 +1649,16 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } @Test @@ -1623,15 +1666,16 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); - settableFuture.set(Empty.newBuilder().build()); result.block(); - assertTrue(callback.wasCalled); } private State buildStateKey(T value, String key, String etag, StateOptions options) { @@ -1671,11 +1715,23 @@ public class DaprClientGrpcTest { String expectedValue2 = "Expected state 2"; State expectedState1 = buildStateKey(expectedValue1, key1, etag, new HashMap<>(), null); State expectedState2 = buildStateKey(expectedValue2, key2, etag, new HashMap<>(), null); - Map> futuresMap = new HashMap<>(); + Map futuresMap = new HashMap<>(); futuresMap.put(key1, buildFutureGetStateEnvelop(expectedValue1, etag)); futuresMap.put(key2, buildFutureGetStateEnvelop(expectedValue2, etag)); - when(client.getState(argThat(new GetStateRequestKeyMatcher(key1)))).thenReturn(futuresMap.get(key1)); - when(client.getState(argThat(new GetStateRequestKeyMatcher(key2)))).thenReturn(futuresMap.get(key2)); + + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(futuresMap.get(key1)); + observer.onCompleted(); + return null; + }).when(client).getState(argThat(new GetStateRequestKeyMatcher(key1)), any()); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(futuresMap.get(key2)); + observer.onCompleted(); + return null; + }).when(client).getState(argThat(new GetStateRequestKeyMatcher(key2)), any()); + State keyRequest1 = buildStateKey(null, key1, etag, null); Mono> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class); assertEquals(expectedState1, resultGet1.block()); @@ -1683,16 +1739,16 @@ public class DaprClientGrpcTest { Mono> resultGet2 = adapter.getState(STATE_STORE_NAME, keyRequest2, String.class); assertEquals(expectedState2, resultGet2.block()); - SettableFuture settableFutureDelete = SettableFuture.create(); - MockCallback callbackDelete = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFutureDelete, callbackDelete, directExecutor()); - when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))) - .thenReturn(settableFutureDelete); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class), any()); + Mono resultDelete = adapter.deleteState(STATE_STORE_NAME, keyRequest2.getKey(), keyRequest2.getEtag(), keyRequest2.getOptions()); - settableFutureDelete.set(Empty.newBuilder().build()); resultDelete.block(); - assertTrue(callbackDelete.wasCalled); } @Test @@ -1701,16 +1757,18 @@ public class DaprClientGrpcTest { String key1 = "key1"; String expectedValue1 = "Expected state 1"; State expectedState1 = buildStateKey(expectedValue1, key1, etag, new HashMap<>(), null); - Map> futuresMap = new HashMap<>(); + Map futuresMap = new HashMap<>(); DaprProtos.GetStateResponse envelope = DaprProtos.GetStateResponse.newBuilder() .setData(serialize(expectedValue1)) .build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(envelope); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(envelope); - futuresMap.put(key1, settableFuture); - when(client.getState(argThat(new GetStateRequestKeyMatcher(key1)))).thenReturn(futuresMap.get(key1)); + futuresMap.put(key1, envelope); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(futuresMap.get(key1)); + observer.onCompleted(); + return null; + }).when(client).getState(argThat(new GetStateRequestKeyMatcher(key1)), any()); + State keyRequest1 = buildStateKey(null, key1, null, null); Mono> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class); assertEquals(expectedState1, resultGet1.block()); @@ -1729,16 +1787,14 @@ public class DaprClientGrpcTest { .setError("not found") .build()) .build(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class))) - .thenAnswer(c -> { - settableFuture.set(responseEnvelope); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getBulkState(any(DaprProtos.GetBulkStateRequest.class), any()); + List> result = adapter.getBulkState(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block(); - assertTrue(callback.wasCalled); assertEquals(2, result.size()); assertEquals("100", result.stream().findFirst().get().getKey()); @@ -1756,19 +1812,18 @@ public class DaprClientGrpcTest { String expectedKey = "attributeKey"; String expectedValue = "Expected secret value"; DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(expectedKey, expectedValue); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(responseEnvelope); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) - .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); - assertEquals("key", req.getKey()); - assertEquals(SECRET_STORE_NAME, req.getStoreName()); - assertEquals(0, req.getMetadataCount()); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + DaprProtos.GetSecretRequest req = invocation.getArgument(0); + assertEquals("key", req.getKey()); + assertEquals(SECRET_STORE_NAME, req.getStoreName()); + assertEquals(0, req.getMetadataCount()); + + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); Map result = adapter.getSecret(SECRET_STORE_NAME, "key").block(); @@ -1779,19 +1834,18 @@ public class DaprClientGrpcTest { @Test public void getSecretsEmptyResponse() { DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(responseEnvelope); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) - .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); - assertEquals("key", req.getKey()); - assertEquals(SECRET_STORE_NAME, req.getStoreName()); - assertEquals(0, req.getMetadataCount()); - return settableFuture; - }); + doAnswer((Answer) invocation -> { + DaprProtos.GetSecretRequest req = invocation.getArgument(0); + assertEquals("key", req.getKey()); + assertEquals(SECRET_STORE_NAME, req.getStoreName()); + assertEquals(0, req.getMetadataCount()); + + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); Map result = adapter.getSecret(SECRET_STORE_NAME, "key").block(); @@ -1800,19 +1854,16 @@ public class DaprClientGrpcTest { @Test public void getSecretsException() { - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(new RuntimeException()); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.setException(new RuntimeException()); + doAnswer((Answer) invocation -> { + DaprProtos.GetSecretRequest req = invocation.getArgument(0); + assertEquals("key", req.getKey()); + assertEquals(SECRET_STORE_NAME, req.getStoreName()); + assertEquals(0, req.getMetadataCount()); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) - .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); - assertEquals("key", req.getKey()); - assertEquals(SECRET_STORE_NAME, req.getStoreName()); - assertEquals(0, req.getMetadataCount()); - return settableFuture; - }); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onError(new RuntimeException()); + return null; + }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); assertThrowsDaprException(ExecutionException.class, () -> adapter.getSecret(SECRET_STORE_NAME, "key").block()); } @@ -1842,19 +1893,17 @@ public class DaprClientGrpcTest { String expectedKey = "attributeKey"; String expectedValue = "Expected secret value"; DaprProtos.GetSecretResponse responseEnvelope = buildGetSecretResponse(expectedKey, expectedValue); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(responseEnvelope); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(responseEnvelope); + doAnswer((Answer) invocation -> { + DaprProtos.GetSecretRequest req = invocation.getArgument(0); + assertEquals("key", req.getKey()); + assertEquals(SECRET_STORE_NAME, req.getStoreName()); + assertEquals("metavalue", req.getMetadataMap().get("metakey")); - when(client.getSecret(any(io.dapr.v1.DaprProtos.GetSecretRequest.class))) - .thenAnswer(context -> { - io.dapr.v1.DaprProtos.GetSecretRequest req = context.getArgument(0); - assertEquals("key", req.getKey()); - assertEquals(SECRET_STORE_NAME, req.getStoreName()); - assertEquals("metavalue", req.getMetadataMap().get("metakey")); - return settableFuture; - }); + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(client).getSecret(any(DaprProtos.GetSecretRequest.class), any()); Map result = adapter.getSecret( SECRET_STORE_NAME, @@ -1873,18 +1922,18 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); - settableFuture.set(Empty.newBuilder().build()); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + for (StateOptions.Consistency consistency : StateOptions.Consistency.values()) { StateOptions options = buildStateOptions(consistency, StateOptions.Concurrency.FIRST_WRITE); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } - - assertTrue(callback.wasCalled); } /* If this test is failing, it means that a new value was added to StateOptions.Concurrency @@ -1895,28 +1944,22 @@ public class DaprClientGrpcTest { String key = "key1"; String etag = "ETag1"; String value = "State value"; - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); - addCallback(settableFuture, callback, directExecutor()); - when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture); - settableFuture.set(Empty.newBuilder().build()); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(client).saveState(any(DaprProtos.SaveStateRequest.class), any()); + for (StateOptions.Concurrency concurrency : StateOptions.Concurrency.values()) { StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL, concurrency); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); result.block(); } - - assertTrue(callback.wasCalled); } - private SettableFuture buildFutureGetStateEnvelop(T value, String etag) throws IOException { - DaprProtos.GetStateResponse envelope = buildGetStateResponse(value, etag); - SettableFuture settableFuture = SettableFuture.create(); - MockCallback callback = new MockCallback<>(envelope); - addCallback(settableFuture, callback, directExecutor()); - settableFuture.set(envelope); - - return settableFuture; + private DaprProtos.GetStateResponse buildFutureGetStateEnvelop(T value, String etag) throws IOException { + return buildGetStateResponse(value, etag); } private DaprProtos.GetStateResponse buildGetStateResponse(T value, String etag) throws IOException { @@ -1953,37 +1996,6 @@ public class DaprClientGrpcTest { return ByteString.copyFrom(byteValue); } - private static final class MockCallback implements FutureCallback { - private T value = null; - private Throwable failure = null; - private boolean wasCalled = false; - - public MockCallback(T expectedValue) { - this.value = expectedValue; - } - - public MockCallback(Throwable expectedFailure) { - this.failure = expectedFailure; - } - - @Override - public synchronized void onSuccess(@NullableDecl T result) { - assertFalse(wasCalled); - wasCalled = true; - assertEquals(value, result); - } - - @Override - public synchronized void onFailure(Throwable throwable) { - assertFalse(wasCalled); - wasCalled = true; - assertEquals(failure == null, throwable == null); - if (failure != null) { - assertEquals(failure.getClass(), throwable.getClass()); - } - } - } - public static class MyObject { private Integer id; private String value;