From c67d5678d0d4f77d2347f1b083c12758e766f129 Mon Sep 17 00:00:00 2001 From: Andres Robles <15348598+AndresRoblesMX@users.noreply.github.com> Date: Thu, 16 Jan 2020 16:34:39 -0600 Subject: [PATCH] Fixing bug, where call should be deferred instead of async (#102) * Fixing bug, where call should be deferred instead of async * Removing unused pool property Returning null as response body if empty response is found. Adding Test case for validating callback is executed when expected. * Fixing assertion based on previous changes * Adding test case to verify that the call to grpc only happens when the requestor block the thread, instead of in parallel immediately after calling the methods in the adapter. Documenting complex test case * Fixing merge conflicts * Documenting complex test case --- .../io/dapr/client/DaprClientGrpcAdapter.java | 79 +++++------- .../main/java/io/dapr/client/DaprHttp.java | 12 +- .../client/DaprClientGrpcAdapterTest.java | 122 ++++++++++++++++-- .../java/io/dapr/client/DaprHttpTest.java | 60 ++++++++- 4 files changed, 203 insertions(+), 70 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java index 8e2d27a99..13326eeac 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java @@ -71,14 +71,11 @@ class DaprClientGrpcAdapter implements DaprClient { DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.newBuilder() .setTopic(topic).setData(data).build(); - ListenableFuture futureEmpty = client.publishEvent(envelope); - return Mono.just(futureEmpty).flatMap(f -> { - try { - f.get(); - } catch (Exception ex) { - return Mono.error(ex); - } - return Mono.empty(); + + return Mono.fromCallable(() -> { + ListenableFuture futureEmpty = client.publishEvent(envelope); + futureEmpty.get(); + return null; }); } catch (Exception ex) { return Mono.error(ex); @@ -92,16 +89,11 @@ class DaprClientGrpcAdapter implements DaprClient { public Mono invokeService(Verb verb, String appId, String method, R request, Map metadata, Class clazz) { try { DaprProtos.InvokeServiceEnvelope envelope = buildInvokeServiceEnvelope(verb.toString(), appId, method, request); - ListenableFuture futureResponse = - client.invokeService(envelope); - return Mono.just(futureResponse).flatMap(f -> { - try { - return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toByteArray(), clazz)); - } catch (Exception ex) { - return Mono.error(ex); - } + return Mono.fromCallable(() -> { + ListenableFuture futureResponse = + client.invokeService(envelope); + return objectSerializer.deserialize(futureResponse.get().getData().getValue().toByteArray(), clazz); }); - } catch (Exception ex) { return Mono.error(ex); } @@ -151,14 +143,10 @@ class DaprClientGrpcAdapter implements DaprClient { .setName(name) .setData(data); DaprProtos.InvokeBindingEnvelope envelope = builder.build(); - ListenableFuture futureEmpty = client.invokeBinding(envelope); - return Mono.just(futureEmpty).flatMap(f -> { - try { - f.get(); - } catch (Exception ex) { - return Mono.error(ex); - } - return Mono.empty(); + return Mono.fromCallable(() -> { + ListenableFuture futureEmpty = client.invokeBinding(envelope); + futureEmpty.get(); + return null; }); } catch (Exception ex) { return Mono.error(ex); @@ -167,7 +155,7 @@ class DaprClientGrpcAdapter implements DaprClient { /** * @return Returns an io.dapr.client.domain.StateKeyValue - * + *

* {@inheritDoc} */ @Override @@ -180,22 +168,23 @@ class DaprClientGrpcAdapter implements DaprClient { } DaprProtos.GetStateEnvelope envelope = builder.build(); - ListenableFuture futureResponse = client.getState(envelope); - return Mono.just(futureResponse).flatMap(f -> { + return Mono.fromCallable(() -> { + ListenableFuture futureResponse = client.getState(envelope); + DaprProtos.GetStateResponseEnvelope response = null; try { - return Mono.just(buildStateKeyValue(f.get(), state.getKey(), stateOptions, clazz)); - } catch (Exception ex) { - return Mono.error(ex); + response = futureResponse.get(); + } catch (NullPointerException npe) { + return null; } - }); - } catch (Exception ex) { + return buildStateKeyValue(response, state.getKey(), stateOptions, clazz); + }); } catch (Exception ex) { return Mono.error(ex); } } - private StateKeyValue buildStateKeyValue(DaprProtos.GetStateResponseEnvelope resonse, String requestedKey, StateOptions stateOptions, Class clazz) throws IOException { - T value = objectSerializer.deserialize(resonse.getData().getValue().toByteArray(), clazz); - String etag = resonse.getEtag(); + private StateKeyValue buildStateKeyValue(DaprProtos.GetStateResponseEnvelope response, String requestedKey, StateOptions stateOptions, Class clazz) throws IOException { + T value = objectSerializer.deserialize(Optional.ofNullable(response.getData().getValue().toByteArray()).orElse(null), clazz); + String etag = response.getEtag(); String key = requestedKey; return new StateKeyValue<>(value, key, etag, stateOptions); } @@ -208,8 +197,7 @@ class DaprClientGrpcAdapter implements DaprClient { try { DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder(); for (StateKeyValue state : states) { - builder.addRequests(buildStateRequest(state).build()); - } + builder.addRequests(buildStateRequest(state).build()); } DaprProtos.SaveStateEnvelope envelope = builder.build(); ListenableFuture futureEmpty = client.saveState(envelope); @@ -282,7 +270,7 @@ class DaprClientGrpcAdapter implements DaprClient { @Override public Mono deleteState(StateKeyValue state, StateOptions options) { try { - DaprProtos.StateOptions.Builder optionBuilder = null; + DaprProtos.StateOptions.Builder optionBuilder = null; if (options != null) { optionBuilder = DaprProtos.StateOptions.newBuilder(); @@ -407,12 +395,13 @@ class DaprClientGrpcAdapter implements DaprClient { /** * Builds the object io.dapr.{@link DaprProtos.InvokeServiceEnvelope} to be send based on the parameters. - * @param verb String that must match HTTP Methods - * @param appId The application id to be invoked - * @param method The application method to be invoked - * @param request The body of the request to be send as part of the invokation - * @param The Type of the Body - * @return The object to be sent as part of the invokation. + * + * @param verb String that must match HTTP Methods + * @param appId The application id to be invoked + * @param method The application method to be invoked + * @param request The body of the request to be send as part of the invokation + * @param The Type of the Body + * @return The object to be sent as part of the invokation. * @throws IOException If there's an issue serializing the request. */ private DaprProtos.InvokeServiceEnvelope buildInvokeServiceEnvelope( diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 82736f64f..aa49a743c 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -83,11 +83,6 @@ class DaprHttp { */ private final OkHttpClient httpClient; - /** - * Thread-pool for HTTP calls. - */ - private final ExecutorService pool; - /** * Creates a new instance of {@link DaprHttp}. * @@ -97,7 +92,6 @@ class DaprHttp { DaprHttp(int port, OkHttpClient httpClient) { this.port = port; this.httpClient = httpClient; - this.pool = Executors.newWorkStealingPool(); } /** @@ -132,7 +126,7 @@ class DaprHttp { * @return Asynchronous text */ public Mono invokeAPI(String method, String urlString, Map urlParameters, byte[] content, Map headers) { - return Mono.fromFuture(CompletableFuture.supplyAsync( + return Mono.fromCallable( () -> { try { String requestId = UUID.randomUUID().toString(); @@ -185,12 +179,12 @@ class DaprHttp { response.headers().forEach(pair -> { mapHeaders.put(pair.getFirst(), pair.getSecond()); }); - return new Response(result, mapHeaders, response.code()); + return new Response(result.length > 0 ? result : null, mapHeaders, response.code()); } } catch (Exception e) { throw new RuntimeException(e); } - }, this.pool)); + }); } /** diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java index dd83cb318..e36d8442f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java @@ -14,12 +14,15 @@ import io.dapr.utils.ObjectSerializer; import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; import reactor.core.publisher.Mono; import javax.annotation.Nullable; import java.io.IOException; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import static com.google.common.util.concurrent.Futures.addCallback; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; @@ -42,13 +45,13 @@ public class DaprClientGrpcAdapterTest { @Test(expected = UnsupportedOperationException.class) public void unregisterActorTimerTest() { - Mono result = adater.unregisterActorTimer("actorType", "actorId", "timerName"); + Mono result = adater.unregisterActorTimer("actorType", "actorId", "timerName"); result.block(); } @Test(expected = UnsupportedOperationException.class) public void registerActorTimerTest() { - Mono result = adater.registerActorTimer("actorType", "actorId", "timerName" , "DATA"); + Mono result = adater.registerActorTimer("actorType", "actorId", "timerName", "DATA"); result.block(); } @@ -273,7 +276,7 @@ public class DaprClientGrpcAdapterTest { } @Test - public void invokeServiceObjectTest() throws Exception { + public void invokeServiceObjectTest() throws Exception { MyObject resultObj = new MyObject(1, "Value"); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = @@ -322,13 +325,13 @@ public class DaprClientGrpcAdapterTest { settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(expected)).build()); when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class))) .thenReturn(settableFuture); - Mono result = adater.invokeService(Verb.GET, "appId", "method",null, String.class); + Mono result = adater.invokeService(Verb.GET, "appId", "method", null, String.class); String strOutput = result.block(); assertEquals(expected, strOutput); } @Test - public void invokeServiceNoRequestBodyObjectTest() throws Exception { + public void invokeServiceNoRequestBodyObjectTest() throws Exception { MyObject resultObj = new MyObject(1, "Value"); SettableFuture settableFuture = SettableFuture.create(); @@ -339,7 +342,7 @@ public class DaprClientGrpcAdapterTest { settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(resultObj)).build()); when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class))) .thenReturn(settableFuture); - Mono result = adater.invokeService(Verb.GET, "appId", "method",null, String.class); + Mono result = adater.invokeService(Verb.GET, "appId", "method", null, String.class); String strOutput = result.block(); assertEquals(serializer.serializeString(resultObj), strOutput); } @@ -390,7 +393,7 @@ public class DaprClientGrpcAdapterTest { } @Test - public void invokeServiceByteRequestObjectTest() throws Exception { + public void invokeServiceByteRequestObjectTest() throws Exception { MyObject resultObj = new MyObject(1, "Value"); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = @@ -446,7 +449,7 @@ public class DaprClientGrpcAdapterTest { } @Test - public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception { + public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception { MyObject resultObj = new MyObject(1, "Value"); SettableFuture settableFuture = SettableFuture.create(); @@ -491,10 +494,7 @@ public class DaprClientGrpcAdapterTest { String key = "key1"; String expectedValue = "Expected state"; StateKeyValue expectedState = buildStateKey(expectedValue, key, etag, null); - DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder() - .setData(getAny(expectedValue)) - .setEtag(etag) - .build(); + DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); @@ -880,6 +880,76 @@ public class DaprClientGrpcAdapterTest { return new StateKeyValue(value, key, etag, options); } + /** + * The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR + * will be done when the output Mono response call the Mono.block method. + * Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state + * you just retrived but block for the delete response, when later you block for the response of the getState, you will + * not found the state. + *

This test will execute the following flow:

+ *
    + *
  1. Exeucte client getState for Key=key1
  2. + *
  3. Block for result to the the state
  4. + *
  5. Assert the Returned State is the expected to key1
  6. + *
  7. Execute client getState for Key=key2
  8. + *
  9. Execute client deleteState for Key=key2
  10. + *
  11. Block for deleteState call.
  12. + *
  13. Block for getState for Key=key2 and Assert they 2 was not found.
  14. + *
+ * @throws Exception + */ + + @Test + public void getStateDeleteStateThenBlockDeleteThenBlockGet() throws Exception { + String etag = "ETag1"; + String key1 = "key1"; + String expectedValue1 = "Expected state 1"; + String key2 = "key2"; + String expectedValue2 = "Expected state 2"; + StateKeyValue expectedState1 = buildStateKey(expectedValue1, key1, etag, null); + Map> futuresMap = new HashMap<>(); + futuresMap.put(key1, buildFutureGetStateEnvelop(expectedValue1, etag)); + futuresMap.put(key2, buildFutureGetStateEnvelop(expectedValue2, etag)); + when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key1)))).thenReturn(futuresMap.get(key1)); + StateKeyValue keyRequest1 = buildStateKey(null, key1, etag, null); + Mono> resultGet1 = adater.getState(keyRequest1, null, String.class); + assertEquals(expectedState1, resultGet1.block()); + StateKeyValue keyRequest2 = buildStateKey(null, key2, etag, null); + Mono> resultGet2 = adater.getState(keyRequest2, null, String.class); + + SettableFuture settableFutureDelete = SettableFuture.create(); + MockCallback callbackDelete = new MockCallback<>(Empty.newBuilder().build()); + addCallback(settableFutureDelete, callbackDelete, directExecutor()); + when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) + .thenReturn(settableFutureDelete); + Mono resultDelete = adater.deleteState(keyRequest2, null); + settableFutureDelete.set(Empty.newBuilder().build()); + resultDelete.block(); + assertTrue(callbackDelete.wasCalled); + futuresMap.replace(key2, null); + when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key2)))).thenReturn(futuresMap.get(key2)); + + StateKeyValue state2 = resultGet2.block(); + assertNull(state2); + } + + private SettableFuture buildFutureGetStateEnvelop(T value, String etag) throws IOException { + DaprProtos.GetStateResponseEnvelope envelope = buildGetStateResponseEnvelope(value, etag); + SettableFuture settableFuture = SettableFuture.create(); + MockCallback callback = new MockCallback<>(envelope); + addCallback(settableFuture, callback, directExecutor()); + settableFuture.set(envelope); + + return settableFuture; + } + + private DaprProtos.GetStateResponseEnvelope buildGetStateResponseEnvelope(T value, String etag) throws IOException { + return DaprProtos.GetStateResponseEnvelope.newBuilder() + .setData(getAny(value)) + .setEtag(etag) + .build(); + } + private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency, Duration interval, Integer threshold, StateOptions.RetryPolicy.Pattern pattern) { @@ -977,4 +1047,32 @@ public class DaprClientGrpcAdapterTest { return result; } } + + private static class GetStateEnvelopeKeyMatcher implements ArgumentMatcher { + + private final String propValue; + + GetStateEnvelopeKeyMatcher(String propValue) { + this.propValue = propValue; + } + + @Override + public boolean matches(DaprProtos.GetStateEnvelope argument) { + if (argument == null) { + return false; + } + if (propValue == null && argument.getKey() != null) { + return false; + } + if (propValue == null && argument.getKey() == null) { + return true; + } + return propValue.equals(argument.getKey()); + } + + @Override + public String toString() { + return ""; + } + } } diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java index 041c08a11..5b2a7d323 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java @@ -4,6 +4,8 @@ */ package io.dapr.client; +import io.dapr.exceptions.DaprException; +import io.dapr.utils.Constants; import io.dapr.utils.ObjectSerializer; import okhttp3.*; import okhttp3.mock.Behavior; @@ -16,7 +18,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class DaprHttpTest { @@ -177,16 +179,66 @@ public class DaprHttpTest { @Test public void getHeadersAndStatus(){ - mockInterceptor.addRule() .post("http://localhost:3500/v1.0/state") .respond(500, ResponseBody.create(MediaType.parse("application/json"), "{\"errorCode\":\"null\",\"message\":\"null\"}")); - DaprHttp daprHttp = new DaprHttp(3500, okHttpClient); - System.out.println(daprHttp); + } + /** + * The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR + * will be done when the output Mono response call the Mono.block method. + * Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state + * you just retrived but block for the delete response, when later you block for the response of the getState, you will + * not found the state. + *

This test will execute the following flow:

+ *
    + *
  1. Exeucte client getState for Key=key1
  2. + *
  3. Block for result to the the state
  4. + *
  5. Assert the Returned State is the expected to key1
  6. + *
  7. Execute client getState for Key=key2
  8. + *
  9. Execute client deleteState for Key=key2
  10. + *
  11. Block for deleteState call.
  12. + *
  13. Block for getState for Key=key2 and Assert they 2 was not found.
  14. + *
+ * @throws Exception + */ + @Test () + public void testCallbackCalledAtTheExpectedTimeTest() throws IOException { + String deletedStateKey = "deletedKey"; + String existingState = "existingState"; + String urlDeleteState = Constants.STATE_PATH + "/" + deletedStateKey; + String urlExistingState = Constants.STATE_PATH + "/" + existingState; + mockInterceptor.addRule() + .get("http://localhost:3500/" + urlDeleteState) + .respond(200, ResponseBody.create(MediaType.parse("application/json"), + deletedStateKey)); + mockInterceptor.addRule() + .delete("http://localhost:3500/" + urlDeleteState) + .respond(204); + mockInterceptor.addRule() + .get("http://localhost:3500/" +urlExistingState) + .respond(200, ResponseBody.create(MediaType.parse("application/json"), + existingState)); + DaprHttp daprHttp = new DaprHttp(3500, okHttpClient); + Mono response = daprHttp.invokeAPI("GET", urlExistingState, null, null); + assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class)); + Mono responseDeleted = daprHttp.invokeAPI("GET", urlDeleteState, null, null); + Mono responseDeleteKey = daprHttp.invokeAPI("DELETE", urlDeleteState, null, null); + assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class)); + mockInterceptor.reset(); + mockInterceptor.addRule() + .get("http://localhost:3500/" +urlDeleteState) + .respond(404, ResponseBody.create(MediaType.parse("application/json"), + "{\"errorCode\":\"404\",\"message\":\"State Not Fuund\"}")); + try { + responseDeleted.block(); + fail("Expected DaprException"); + } catch (Exception ex) { + assertEquals(DaprException.class, ex.getCause().getCause().getClass()); + } } }