From 81b47f85b3b29cdfe254751064863672e4441b2d Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Thu, 21 Jan 2021 15:23:24 -0800 Subject: [PATCH] Makes http call async and allows timeout in Mono. (#445) --- .../http/MethodInvokeController.java | 5 ++ .../it/methodinvoke/http/MethodInvokeIT.java | 17 +++++ .../main/java/io/dapr/client/DaprHttp.java | 70 +++++++++++++------ 3 files changed, 72 insertions(+), 20 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java index f68d65278..cbc5319f8 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java @@ -79,4 +79,9 @@ public class MethodInvokeController { public List getPersons() { return persons; } + + @PostMapping(path = "/sleep") + public void sleep(@RequestBody int seconds) throws InterruptedException { + Thread.sleep(seconds * 1000); + } } diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java index 7de86455d..df273c2ad 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java @@ -10,6 +10,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.time.Duration; import java.util.Arrays; import java.util.Calendar; import java.util.Collection; @@ -18,6 +19,8 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.junit.runners.Parameterized.Parameter; import static org.junit.runners.Parameterized.Parameters; @@ -126,4 +129,18 @@ public class MethodInvokeIT extends BaseIT { assertEquals("Smith", resultPerson.getLastName()); } } + + @Test + public void testInvokeTimeout() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + long started = System.currentTimeMillis(); + String message = assertThrows(IllegalStateException.class, () -> { + client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST) + .block(Duration.ofMillis(10)); + }).getMessage(); + long delay = System.currentTimeMillis() - started; + assertTrue(delay <= 200); // 200 ms is a reasonable delay if the request timed out. + assertEquals("Timeout on blocking read for 10 MILLISECONDS", message); + } + } } diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 5b91de2d0..a5f532a5c 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -14,12 +14,15 @@ import io.dapr.exceptions.DaprException; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.propagation.HttpTraceContext; import io.opentelemetry.context.Context; +import okhttp3.Call; +import okhttp3.Callback; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.ResponseBody; +import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; import java.io.IOException; @@ -30,6 +33,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; public class DaprHttp implements AutoCloseable { @@ -212,7 +216,9 @@ public class DaprHttp implements AutoCloseable { byte[] content, Map headers, Context context) { - return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context)); + // fromCallable() is needed so the invocation does not happen early, causing a hot mono. + return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context)) + .flatMap(f -> Mono.fromFuture(f)); } /** @@ -233,13 +239,13 @@ public class DaprHttp implements AutoCloseable { * @param content payload to be posted. * @param headers HTTP headers. * @param context OpenTelemetry's Context. - * @return Response + * @return CompletableFuture for Response. */ - private Response doInvokeApi(String method, + private CompletableFuture doInvokeApi(String method, String[] pathSegments, Map urlParameters, byte[] content, Map headers, - Context context) throws IOException { + Context context) { final String requestId = UUID.randomUUID().toString(); RequestBody body; @@ -290,23 +296,10 @@ public class DaprHttp implements AutoCloseable { Request request = requestBuilder.build(); - try (okhttp3.Response response = this.httpClient.newCall(request).execute()) { - if (!response.isSuccessful()) { - DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response)); - if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) { - throw new DaprException(error); - } - throw new DaprException("UNKNOWN", "HTTP status code: " + response.code()); - } - - Map mapHeaders = new HashMap<>(); - byte[] result = getBodyBytesOrEmptyArray(response); - response.headers().forEach(pair -> { - mapHeaders.put(pair.getFirst(), pair.getSecond()); - }); - return new Response(result, mapHeaders, response.code()); - } + CompletableFuture future = new CompletableFuture<>(); + this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future)); + return future; } /** @@ -336,4 +329,41 @@ public class DaprHttp implements AutoCloseable { return EMPTY_BYTES; } + /** + * Converts the okhttp3 response into the response object expected internally by the SDK. + */ + private static class ResponseFutureCallback implements Callback { + private final CompletableFuture future; + + public ResponseFutureCallback(CompletableFuture future) { + this.future = future; + } + + @Override + public void onFailure(Call call, IOException e) { + future.completeExceptionally(e); + } + + @Override + public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException { + if (!response.isSuccessful()) { + DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response)); + if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) { + future.completeExceptionally(new DaprException(error)); + return; + } + + future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code())); + return; + } + + Map mapHeaders = new HashMap<>(); + byte[] result = getBodyBytesOrEmptyArray(response); + response.headers().forEach(pair -> { + mapHeaders.put(pair.getFirst(), pair.getSecond()); + }); + future.complete(new Response(result, mapHeaders, response.code())); + } + } + } \ No newline at end of file