diff --git a/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java index 2be14579b..f5a687350 100644 --- a/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java +++ b/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java @@ -1,11 +1,10 @@ package io.dapr.actors; - import com.fasterxml.jackson.databind.ObjectMapper; - import okhttp3.*; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.io.IOException; import java.net.URL; @@ -69,47 +68,36 @@ public abstract class AbstractDaprClient { * @return Asynchronous text */ protected final Mono invokeAPI(String method, String urlString, String json) { - return Mono.fromSupplier(() -> { - try { - return tryInvokeAPI(method, urlString, json); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); + String requestId = UUID.randomUUID().toString(); + RequestBody body = json != null ? RequestBody.create(json, MEDIA_TYPE_APPLICATION_JSON) : REQUEST_BODY_EMPTY_JSON; + + // use Mono blocking wrapper to be reactive + Mono blockingWrapper = Mono.fromCallable(() -> { + Request request = new Request.Builder() + .url(new URL(this.baseUrl + urlString)) + .method(method, body) + .addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId) + .build(); + + + try(Response response = this.httpClient.newCall(request).execute()) + { + if (!response.isSuccessful()) { + DaprError error = parseDaprError(response.body().string()); + if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) { + throw new DaprException(error); + } + + throw new DaprException("UNKNOWN", String.format("Dapr's Actor API %s failed with return code %d %s", urlString, response.code())); + } + + return response.body().string(); } }); + + return blockingWrapper.subscribeOn(Schedulers.boundedElastic()); } - /** - * Invokes an API synchronously and returns a text payload. - * @param method HTTP method. - * @param urlString url as String. - * @param json JSON payload or null. - * @return text - */ - protected final String tryInvokeAPI(String method, String urlString, String json) throws IOException { - String requestId = UUID.randomUUID().toString(); - RequestBody body = json != null ? RequestBody.create(MEDIA_TYPE_APPLICATION_JSON, json) : REQUEST_BODY_EMPTY_JSON; - - Request request = new Request.Builder() - .url(new URL(this.baseUrl + urlString)) - .method(method, body) - .addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId) - .build(); - - // TODO: make this call async as well. - Response response = this.httpClient.newCall(request).execute(); - if (!response.isSuccessful()) - { - DaprError error = parseDaprError(response.body().string()); - if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) { - throw new DaprException(error); - } - - throw new DaprException("UNKNOWN", String.format("Dapr's Actor API %s failed with return code %d %s", urlString, response.code())); - } - - return response.body().string(); - } /** * Tries to parse an error from Dapr response body.