mirror of https://github.com/dapr/java-sdk.git
Makes http call async and allows timeout in Mono. (#445)
This commit is contained in:
parent
446cd17e18
commit
81b47f85b3
|
|
@ -79,4 +79,9 @@ public class MethodInvokeController {
|
|||
public List<Person> getPersons() {
|
||||
return persons;
|
||||
}
|
||||
|
||||
@PostMapping(path = "/sleep")
|
||||
public void sleep(@RequestBody int seconds) throws InterruptedException {
|
||||
Thread.sleep(seconds * 1000);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import org.junit.Test;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collection;
|
||||
|
|
@ -18,6 +19,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.runners.Parameterized.Parameter;
|
||||
import static org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
|
|
@ -126,4 +129,18 @@ public class MethodInvokeIT extends BaseIT {
|
|||
assertEquals("Smith", resultPerson.getLastName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvokeTimeout() throws Exception {
|
||||
try (DaprClient client = new DaprClientBuilder().build()) {
|
||||
long started = System.currentTimeMillis();
|
||||
String message = assertThrows(IllegalStateException.class, () -> {
|
||||
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
|
||||
.block(Duration.ofMillis(10));
|
||||
}).getMessage();
|
||||
long delay = System.currentTimeMillis() - started;
|
||||
assertTrue(delay <= 200); // 200 ms is a reasonable delay if the request timed out.
|
||||
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,12 +14,15 @@ import io.dapr.exceptions.DaprException;
|
|||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.api.trace.propagation.HttpTraceContext;
|
||||
import io.opentelemetry.context.Context;
|
||||
import okhttp3.Call;
|
||||
import okhttp3.Callback;
|
||||
import okhttp3.HttpUrl;
|
||||
import okhttp3.MediaType;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.RequestBody;
|
||||
import okhttp3.ResponseBody;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -30,6 +33,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class DaprHttp implements AutoCloseable {
|
||||
|
||||
|
|
@ -212,7 +216,9 @@ public class DaprHttp implements AutoCloseable {
|
|||
byte[] content,
|
||||
Map<String, String> headers,
|
||||
Context context) {
|
||||
return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context));
|
||||
// fromCallable() is needed so the invocation does not happen early, causing a hot mono.
|
||||
return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context))
|
||||
.flatMap(f -> Mono.fromFuture(f));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -233,13 +239,13 @@ public class DaprHttp implements AutoCloseable {
|
|||
* @param content payload to be posted.
|
||||
* @param headers HTTP headers.
|
||||
* @param context OpenTelemetry's Context.
|
||||
* @return Response
|
||||
* @return CompletableFuture for Response.
|
||||
*/
|
||||
private Response doInvokeApi(String method,
|
||||
private CompletableFuture<Response> doInvokeApi(String method,
|
||||
String[] pathSegments,
|
||||
Map<String, String> urlParameters,
|
||||
byte[] content, Map<String, String> headers,
|
||||
Context context) throws IOException {
|
||||
Context context) {
|
||||
final String requestId = UUID.randomUUID().toString();
|
||||
RequestBody body;
|
||||
|
||||
|
|
@ -290,23 +296,10 @@ public class DaprHttp implements AutoCloseable {
|
|||
|
||||
Request request = requestBuilder.build();
|
||||
|
||||
try (okhttp3.Response response = this.httpClient.newCall(request).execute()) {
|
||||
if (!response.isSuccessful()) {
|
||||
DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response));
|
||||
if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) {
|
||||
throw new DaprException(error);
|
||||
}
|
||||
|
||||
throw new DaprException("UNKNOWN", "HTTP status code: " + response.code());
|
||||
}
|
||||
|
||||
Map<String, String> mapHeaders = new HashMap<>();
|
||||
byte[] result = getBodyBytesOrEmptyArray(response);
|
||||
response.headers().forEach(pair -> {
|
||||
mapHeaders.put(pair.getFirst(), pair.getSecond());
|
||||
});
|
||||
return new Response(result, mapHeaders, response.code());
|
||||
}
|
||||
CompletableFuture<Response> future = new CompletableFuture<>();
|
||||
this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -336,4 +329,41 @@ public class DaprHttp implements AutoCloseable {
|
|||
return EMPTY_BYTES;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the okhttp3 response into the response object expected internally by the SDK.
|
||||
*/
|
||||
private static class ResponseFutureCallback implements Callback {
|
||||
private final CompletableFuture<Response> future;
|
||||
|
||||
public ResponseFutureCallback(CompletableFuture<Response> future) {
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Call call, IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
|
||||
if (!response.isSuccessful()) {
|
||||
DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response));
|
||||
if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) {
|
||||
future.completeExceptionally(new DaprException(error));
|
||||
return;
|
||||
}
|
||||
|
||||
future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code()));
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, String> mapHeaders = new HashMap<>();
|
||||
byte[] result = getBodyBytesOrEmptyArray(response);
|
||||
response.headers().forEach(pair -> {
|
||||
mapHeaders.put(pair.getFirst(), pair.getSecond());
|
||||
});
|
||||
future.complete(new Response(result, mapHeaders, response.code()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue