diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index e08799d9c..4454edf7d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -287,8 +287,7 @@ public class DaprClientGrpc implements DaprClient { } DaprProtos.SaveStateEnvelope envelope = builder.build(); - ListenableFuture futureEmpty = client.saveState(envelope); - return Mono.just(futureEmpty).flatMap(f -> { + return Mono.fromCallable(() -> client.saveState(envelope)).flatMap(f -> { try { f.get(); } catch (Exception ex) { @@ -432,8 +431,7 @@ public class DaprClientGrpc implements DaprClient { } DaprProtos.DeleteStateEnvelope envelope = builder.build(); - ListenableFuture futureEmpty = client.deleteState(envelope); - return Mono.just(futureEmpty).flatMap(f -> { + return Mono.fromCallable(() -> client.deleteState(envelope)).flatMap(f -> { try { f.get(); } catch (Exception ex) { diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 3a7e371e9..1648f4304 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -221,7 +221,7 @@ public class DaprHttp { * @return DaprError or null if could not parse. */ private static DaprError parseDaprError(byte[] json) throws IOException { - if (json == null) { + if ((json == null) || (json.length == 0)) { return null; } return OBJECT_MAPPER.readValue(json, DaprError.class); diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 0c478f167..b6dd5aa02 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -82,6 +82,21 @@ public class DaprClientGrpcTest { assertTrue(callback.wasCalled); } + @Test + public void publishEventNoHotMono() { + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback(Empty.newBuilder().build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class))) + .thenAnswer(c -> { + settableFuture.set(Empty.newBuilder().build()); + return settableFuture; + }); + adapter.publishEvent("topic", "object"); + // Do not call block() on the mono above, so nothing should happen. + assertFalse(callback.wasCalled); + } + @Test public void publishEventObjectTest() { SettableFuture settableFuture = SettableFuture.create(); @@ -145,6 +160,22 @@ public class DaprClientGrpcTest { assertTrue(callback.wasCalled); } + @Test + public void invokeBindingObjectNoHotMono() { + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback(Empty.newBuilder().build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class))) + .thenAnswer(c -> { + settableFuture.set(Empty.newBuilder().build()); + return settableFuture; + }); + MyObject event = new MyObject(1, "Event"); + adapter.invokeBinding("BindingName", event); + // Do not call block() on mono above, so nothing should happen. + assertFalse(callback.wasCalled); + } + @Test(expected = RuntimeException.class) public void invokeServiceVoidExceptionThrownTest() { when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class))) @@ -415,6 +446,24 @@ public class DaprClientGrpcTest { assertTrue(callback.wasCalled); } + @Test + public void invokeServiceNoRequestNoHotMono() throws Exception { + String expected = "Value"; + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = + new MockCallback(DaprProtos.InvokeServiceResponseEnvelope.newBuilder() + .setData(getAny(expected)).build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class))) + .thenAnswer(c -> { + settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(expected)).build()); + return settableFuture; + }); + adapter.invokeService(Verb.GET, "appId", "method", null); + // Do not call block() on mono above, so nothing should happen. + assertFalse(callback.wasCalled); + } + @Test public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception { MyObject resultObj = new MyObject(1, "Value"); @@ -466,13 +515,34 @@ public class DaprClientGrpcTest { MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) - .thenReturn(settableFuture); + .thenReturn(settableFuture); State keyRequest = buildStateKey(null, key, etag, null); Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class); settableFuture.set(responseEnvelope); assertEquals(expectedState, result.block()); } + @Test + public void getStateStringValueNoHotMono() throws IOException { + String etag = "ETag1"; + String key = "key1"; + String expectedValue = "Expected state"; + State expectedState = buildStateKey(expectedValue, key, etag, null); + DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(responseEnvelope); + addCallback(settableFuture, callback, directExecutor()); + when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) + .thenAnswer(c -> { + settableFuture.set(responseEnvelope); + return settableFuture; + }); + State keyRequest = buildStateKey(null, key, etag, null); + adapter.getState(STATE_STORE_NAME, keyRequest, String.class); + // block() on the mono above is not called, so nothing should happen. + assertFalse(callback.wasCalled); + } + @Test public void getStateObjectValueWithOptionsTest() throws IOException { String etag = "ETag1"; @@ -564,20 +634,41 @@ public class DaprClientGrpcTest { String etag = "ETag1"; String key = "key1"; StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) - .thenReturn(settableFuture); + .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), - stateKey.getOptions()); + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); } + @Test + public void deleteStateTestNoHotMono() { + String etag = "ETag1"; + String key = "key1"; + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) + .thenAnswer(c -> { + settableFuture.set(Empty.newBuilder().build()); + return settableFuture; + }); + State stateKey = buildStateKey(null, key, etag, options); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); + // Do not call result.block(), so nothing should happen. + assertFalse(callback.wasCalled); + } + @Test public void deleteStateNoConsistencyTest() { String etag = "ETag1"; @@ -742,13 +833,32 @@ public class DaprClientGrpcTest { addCallback(settableFuture, callback, directExecutor()); when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); } + @Test + public void saveStateTestNoHotMono() { + String key = "key1"; + String etag = "ETag1"; + String value = "State value"; + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); + addCallback(settableFuture, callback, directExecutor()); + when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenAnswer(c -> { + settableFuture.set(Empty.newBuilder().build()); + return settableFuture; + }); + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); + // No call to result.block(), so nothing should happen. + assertFalse(callback.wasCalled); + } + @Test public void saveStateNoConsistencyTest() { String key = "key1"; diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 013b71460..d8944d09f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -77,6 +77,18 @@ public class DaprClientHttpTest { assertNull(mono.block()); } + @Test + public void publishEventNoHotMono() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/publish/A") + .respond(EXPECTED_RESULT); + String event = "{ \"message\": \"This is a test\" }"; + daprHttp = new DaprHttp(3000, okHttpClient); + daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.publishEvent("", event); + // Should not throw exception because did not call block() on mono above. + } + @Test(expected = IllegalArgumentException.class) public void invokeServiceVerbNull() { mockInterceptor.addRule() @@ -187,6 +199,18 @@ public class DaprClientHttpTest { assertNull(mono.block()); } + @Test + public void invokeServiceNoHotMono() { + Map map = new HashMap<>(); + mockInterceptor.addRule() + .get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") + .respond(500); + daprHttp = new DaprHttp(3000, okHttpClient); + daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.invokeService(Verb.GET, "41", "neworder", "", map); + // No exception should be thrown because did not call block() on mono above. + } + @Test public void invokeBinding() { Map map = new HashMap<>(); @@ -211,6 +235,17 @@ public class DaprClientHttpTest { assertNull(mono.block()); } + @Test + public void bindingNoHotMono() { + Map map = new HashMap<>(); + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") + .respond(EXPECTED_RESULT); + daprHttp = new DaprHttp(3000, okHttpClient); + daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.invokeBinding(null, ""); + // No exception is thrown because did not call block() on mono above. + } @Test public void getStates() { @@ -253,6 +288,18 @@ public class DaprClientHttpTest { assertEquals(monoNullEtag.block().getKey(), "key"); } + @Test + public void getStatesNoHotMono() { + State stateNullEtag = new State("value", "key", null, null); + mockInterceptor.addRule() + .get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") + .respond(500); + daprHttp = new DaprHttp(3000, okHttpClient); + daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.getState(STATE_STORE_NAME, stateNullEtag, String.class); + // No exception should be thrown since did not call block() on mono above. + } + @Test public void saveStates() { State stateKeyValue = new State("value", "key", "etag", null); @@ -319,6 +366,17 @@ public class DaprClientHttpTest { assertNull(mono.block()); } + @Test + public void saveStatesNoHotMono() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/state/MyStateStore") + .respond(500); + StateOptions stateOptions = mock(StateOptions.class); + daprHttp = new DaprHttp(3000, okHttpClient); + daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.saveState(STATE_STORE_NAME, "key", "etag", "value", stateOptions); + // No exception should be thrown because we did not call block() on the mono above. + } @Test public void deleteState() { @@ -333,6 +391,19 @@ public class DaprClientHttpTest { assertNull(mono.block()); } + @Test + public void deleteStateNoHotMono() { + StateOptions stateOptions = mock(StateOptions.class); + State stateKeyValue = new State("value", "key", "etag", stateOptions); + mockInterceptor.addRule() + .delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key") + .respond(500); + daprHttp = new DaprHttp(3000, okHttpClient); + daprClientHttp = new DaprClientHttp(daprHttp); + daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions); + // No exception should be thrown because we did not call block() on the mono above. + } + @Test public void deleteStateNullEtag() { State stateKeyValue = new State("value", "key", null, null);