Replacing OkHttpClient with Java 11 HttpClient (#1218)

* Replacing OkHttpClient with Java 11 HttpClient

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Adjusted the Dapr HTTP tests

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Adjust tests to use Mockito instead of OkHttp mock interceptor

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Removing OkHTTP from SDK module

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Apparently there is Kotlin deps issue

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Add read timeout to HttpClient request

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Use HTTP 1.1

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Add file header

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Adding back the test related to multiple Monos

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

---------

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>
Co-authored-by: Artur Ciocanu <ciocanu@adobe.com>
This commit is contained in:
artur-ciocanu 2025-02-28 23:00:44 +02:00 committed by GitHub
parent 22d9874ae0
commit bd3a54d6c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 1027 additions and 569 deletions

View File

@ -134,6 +134,11 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
</dependency>
</dependencies>
<build>

View File

@ -19,12 +19,13 @@ import io.opentelemetry.context.propagation.TextMapPropagator;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;
import javax.annotation.Nullable;
import java.util.Collections;
@Component

View File

@ -1,24 +0,0 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import okhttp3.OkHttpClient;
public class DaprHttpProxy extends io.dapr.client.DaprHttp {
public DaprHttpProxy(String hostname, int port, String daprApiToken, OkHttpClient httpClient) {
super(hostname, port, daprApiToken, httpClient);
}
}

View File

@ -21,8 +21,8 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test SDK resiliency.
@ -43,7 +43,7 @@ public class WaitForSidecarIT extends BaseIT {
@BeforeAll
public static void init() throws Exception {
daprRun = startDaprApp(WaitForSidecarIT.class.getSimpleName(), 5000);
daprNotRunning = startDaprApp(WaitForSidecarIT.class.getSimpleName()+"NotRunning", 5000);
daprNotRunning = startDaprApp(WaitForSidecarIT.class.getSimpleName() + "NotRunning", 5000);
daprNotRunning.stop();
toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER);
@ -61,24 +61,30 @@ public class WaitForSidecarIT extends BaseIT {
public void waitTimeout() {
int timeoutInMillis = (int)LATENCY.minusMillis(100).toMillis();
long started = System.currentTimeMillis();
assertThrows(RuntimeException.class, () -> {
try(var client = toxiProxyRun.newDaprClientBuilder().build()) {
client.waitForSidecar(timeoutInMillis).block();
}
});
long duration = System.currentTimeMillis() - started;
assertTrue(duration >= timeoutInMillis);
assertThat(duration).isGreaterThanOrEqualTo(timeoutInMillis);
}
@Test
public void waitSlow() throws Exception {
int timeoutInMillis = (int)LATENCY.plusMillis(100).toMillis();
long started = System.currentTimeMillis();
try(var client = toxiProxyRun.newDaprClientBuilder().build()) {
client.waitForSidecar(timeoutInMillis).block();
}
long duration = System.currentTimeMillis() - started;
assertTrue(duration >= LATENCY.toMillis());
assertThat(duration).isGreaterThanOrEqualTo(LATENCY.toMillis());
}
@Test
@ -87,12 +93,15 @@ public class WaitForSidecarIT extends BaseIT {
// This has to do with a previous bug in the implementation.
int timeoutMilliseconds = 5000;
long started = System.currentTimeMillis();
assertThrows(RuntimeException.class, () -> {
try(var client = daprNotRunning.newDaprClientBuilder().build()) {
client.waitForSidecar(timeoutMilliseconds).block();
}
});
long duration = System.currentTimeMillis() - started;
assertTrue(duration >= timeoutMilliseconds);
assertThat(duration).isGreaterThanOrEqualTo(timeoutMilliseconds);
}
}

View File

@ -23,12 +23,13 @@ import io.dapr.workflows.runtime.DefaultWorkflowContext;
import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaContext;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;

View File

@ -44,17 +44,6 @@
<artifactId>reactor-core</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
<exclusions>
<exclusion>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>

View File

@ -80,7 +80,6 @@ import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@ -90,6 +89,8 @@ import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@ -441,7 +442,7 @@ public class DaprClientImpl extends AbstractDaprClient {
return buildSubscription(listener, type, request);
}
@NotNull
@Nonnull
private <T> Subscription<T> buildSubscription(
SubscriptionListener<T> listener,
TypeRef<T> type,

View File

@ -15,32 +15,28 @@ package io.dapr.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.Metadata;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprError;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.utils.Version;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@ -94,12 +90,12 @@ public class DaprHttp implements AutoCloseable {
}
public static class Response {
private byte[] body;
private Map<String, String> headers;
private int statusCode;
private final byte[] body;
private final Map<String, String> headers;
private final int statusCode;
/**
* Represents an http response.
* Represents a HTTP response.
*
* @param body The body of the http response.
* @param headers The headers of the http response.
@ -127,58 +123,65 @@ public class DaprHttp implements AutoCloseable {
/**
* Defines the standard application/json type for HTTP calls in Dapr.
*/
private static final MediaType MEDIA_TYPE_APPLICATION_JSON =
MediaType.get("application/json; charset=utf-8");
/**
* Shared object representing an empty request body in JSON.
*/
private static final RequestBody REQUEST_BODY_EMPTY_JSON =
RequestBody.Companion.create("", MEDIA_TYPE_APPLICATION_JSON);
private static final String MEDIA_TYPE_APPLICATION_JSON = "application/json; charset=utf-8";
/**
* Empty input or output.
*/
private static final byte[] EMPTY_BYTES = new byte[0];
/**
* Empty Body Publisher.
*/
private static final HttpRequest.BodyPublisher EMPTY_BODY_PUBLISHER = HttpRequest.BodyPublishers.noBody();
/**
* Endpoint used to communicate to Dapr's HTTP endpoint.
*/
private final URI uri;
/**
* Http client used for all API calls.
*/
private final OkHttpClient httpClient;
/**
* Dapr API Token required to interact with DAPR APIs.
*/
private final String daprApiToken;
/**
* Http client request read timeout.
*/
private final Duration readTimeout;
/**
* Http client used for all API calls.
*/
private final HttpClient httpClient;
/**
* Creates a new instance of {@link DaprHttp}.
*
* @param hostname Hostname for calling Dapr. (e.g. "127.0.0.1")
* @param port Port for calling Dapr. (e.g. 3500)
* @param readTimeout HTTP request read timeout
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String hostname, int port, String daprApiToken, OkHttpClient httpClient) {
DaprHttp(String hostname, int port, String daprApiToken, Duration readTimeout, HttpClient httpClient) {
this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port);
this.httpClient = httpClient;
this.daprApiToken = daprApiToken;
this.readTimeout = readTimeout;
this.httpClient = httpClient;
}
/**
* Creates a new instance of {@link DaprHttp}.
*
* @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com")
* @param uri Endpoint for calling Dapr.
* @param readTimeout HTTP request read timeout
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String uri, String daprApiToken, OkHttpClient httpClient) {
DaprHttp(String uri, String daprApiToken, Duration readTimeout, HttpClient httpClient) {
this.uri = URI.create(uri);
this.httpClient = httpClient;
this.daprApiToken = daprApiToken;
this.readTimeout = readTimeout;
this.httpClient = httpClient;
}
/**
@ -244,13 +247,13 @@ public class DaprHttp implements AutoCloseable {
Map<String, String> headers,
ContextView context) {
// fromCallable() is needed so the invocation does not happen early, causing a hot mono.
return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context))
.flatMap(f -> Mono.fromFuture(f));
return Mono.fromCallable(() -> doInvokeApi(method, headers, pathSegments, urlParameters, content, context))
.flatMap(Mono::fromFuture);
}
/**
* Shutdown call is not necessary for OkHttpClient.
* @see OkHttpClient
* Shutdown call is not necessary for HttpClient.
* @see HttpClient
*/
@Override
public void close() {
@ -268,77 +271,155 @@ public class DaprHttp implements AutoCloseable {
* @param context OpenTelemetry's Context.
* @return CompletableFuture for Response.
*/
private CompletableFuture<Response> doInvokeApi(String method,
private CompletableFuture<Response> doInvokeApi(
String method,
Map<String, String> headers,
String[] pathSegments,
Map<String, List<String>> urlParameters,
byte[] content, Map<String, String> headers,
byte[] content,
ContextView context) {
final String requestId = UUID.randomUUID().toString();
RequestBody body;
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
String contentType = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null;
MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType);
if (content == null) {
body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON)
? REQUEST_BODY_EMPTY_JSON
: RequestBody.Companion.create(new byte[0], mediaType);
} else {
body = RequestBody.Companion.create(content, mediaType);
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme(uri.getScheme())
.host(uri.getHost());
if (uri.getPort() > 0) {
urlBuilder.port(uri.getPort());
}
if (uri.getPath() != null) {
urlBuilder.addPathSegments(uri.getPath());
}
for (String pathSegment : pathSegments) {
urlBuilder.addPathSegment(pathSegment);
}
Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
.forEach(urlParameter ->
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
.forEach(urlParameterValue ->
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
requestBuilder.uri(createUri(uri, pathSegments, urlParameters));
addHeader(requestBuilder, Headers.DAPR_USER_AGENT, Version.getSdkVersion());
addHeader(requestBuilder, HEADER_DAPR_REQUEST_ID, UUID.randomUUID().toString());
addHeader(requestBuilder, "Content-Type", getContentType(headers));
addHeaders(requestBuilder, headers);
if (daprApiToken != null) {
addHeader(requestBuilder, Headers.DAPR_API_TOKEN, daprApiToken);
}
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build())
.addHeader(HEADER_DAPR_REQUEST_ID, requestId);
if (context != null) {
context.stream()
.filter(entry -> ALLOWED_CONTEXT_IN_HEADERS.contains(entry.getKey().toString().toLowerCase()))
.forEach(entry -> requestBuilder.addHeader(entry.getKey().toString(), entry.getValue().toString()));
.forEach(entry -> addHeader(requestBuilder, entry.getKey().toString(), entry.getValue().toString()));
}
HttpRequest.BodyPublisher body = getBodyPublisher(content);
if (HttpMethods.GET.name().equals(method)) {
requestBuilder.get();
requestBuilder.GET();
} else if (HttpMethods.DELETE.name().equals(method)) {
requestBuilder.delete();
requestBuilder.DELETE();
} else if (HttpMethods.HEAD.name().equals(method)) {
requestBuilder.head();
// HTTP HEAD is not exposed as a normal method
requestBuilder.method(HttpMethods.HEAD.name(), EMPTY_BODY_PUBLISHER);
} else {
requestBuilder.method(method, body);
}
if (daprApiToken != null) {
requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
}
requestBuilder.addHeader(Headers.DAPR_USER_AGENT, Version.getSdkVersion());
HttpRequest request = requestBuilder.timeout(readTimeout).build();
if (headers != null) {
Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream()
.forEach(header -> {
requestBuilder.addHeader(header.getKey(), header.getValue());
});
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.thenApply(this::createResponse);
}
private static String getContentType(Map<String, String> headers) {
String result = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null;
return result == null ? MEDIA_TYPE_APPLICATION_JSON : result;
}
private static URI createUri(URI uri, String[] pathSegments, Map<String, List<String>> urlParameters) {
String path = createPath(uri, pathSegments);
String query = createQuery(urlParameters);
try {
return new URI(uri.getScheme(), uri.getAuthority(), path, query, null);
} catch (URISyntaxException exception) {
throw new DaprException(exception);
}
}
private static String createPath(URI uri, String[] pathSegments) {
String basePath = uri.getPath();
if (pathSegments == null || pathSegments.length == 0) {
return basePath;
}
Request request = requestBuilder.build();
StringBuilder pathBuilder = new StringBuilder(basePath);
if (!basePath.endsWith("/")) { // Add a "/" if needed
pathBuilder.append("/");
}
CompletableFuture<Response> future = new CompletableFuture<>();
this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
return future;
for (String segment : pathSegments) {
pathBuilder.append(encodePathSegment(segment)).append("/"); // Encode each segment
}
pathBuilder.deleteCharAt(pathBuilder.length() - 1); // Remove the trailing "/"
return pathBuilder.toString();
}
private static String createQuery(Map<String, List<String>> urlParameters) {
if (urlParameters == null || urlParameters.isEmpty()) {
return null;
}
StringBuilder queryBuilder = new StringBuilder();
for (Map.Entry<String, List<String>> entry : urlParameters.entrySet()) {
String key = entry.getKey();
List<String> values = entry.getValue();
for (String value : values) {
if (queryBuilder.length() > 0) {
queryBuilder.append("&");
}
queryBuilder.append(encodeQueryParam(key, value)); // Encode key and value
}
}
return queryBuilder.toString();
}
private static String encodePathSegment(String segment) {
return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20"); // Encode and handle spaces
}
private static String encodeQueryParam(String key, String value) {
return URLEncoder.encode(key, StandardCharsets.UTF_8) + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
}
private static void addHeader(HttpRequest.Builder requestBuilder, String name, String value) {
requestBuilder.header(name, value);
}
private static void addHeaders(HttpRequest.Builder requestBuilder, Map<String, String> headers) {
if (headers == null || headers.isEmpty()) {
return;
}
headers.forEach((k, v) -> addHeader(requestBuilder, k, v));
}
private static HttpRequest.BodyPublisher getBodyPublisher(byte[] content) {
return HttpRequest.BodyPublishers.ofByteArray(Objects.requireNonNullElse(content, EMPTY_BYTES));
}
private Response createResponse(HttpResponse<byte[]> httpResponse) {
Optional<String> headerValue = httpResponse.headers().firstValue("Metadata.statuscode");
int httpStatusCode = parseHttpStatusCode(headerValue, httpResponse.statusCode());
byte[] body = getBodyBytesOrEmptyArray(httpResponse.body());
if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) {
DaprError error = parseDaprError(body);
if (error != null) {
throw new DaprException(error, body, httpStatusCode);
} else {
throw new DaprException("UNKNOWN", "", body, httpStatusCode);
}
}
Map<String, String> responseHeaders = new HashMap<>();
httpResponse.headers().map().forEach((k, v) -> responseHeaders.put(k, v.isEmpty() ? null : v.get(0)));
return new Response(body, responseHeaders, httpStatusCode);
}
/**
@ -360,70 +441,18 @@ public class DaprHttp implements AutoCloseable {
}
}
private static byte[] getBodyBytesOrEmptyArray(okhttp3.Response response) throws IOException {
ResponseBody body = response.body();
if (body != null) {
return body.bytes();
}
return EMPTY_BYTES;
private static byte[] getBodyBytesOrEmptyArray(byte[] body) {
return body == null ? EMPTY_BYTES : body;
}
/**
* Converts the okhttp3 response into the response object expected internally by the SDK.
*/
private static class ResponseFutureCallback implements Callback {
private final CompletableFuture<Response> future;
public ResponseFutureCallback(CompletableFuture<Response> future) {
this.future = future;
}
@Override
public void onFailure(Call call, IOException e) {
future.completeExceptionally(e);
}
@Override
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code());
if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) {
try {
byte[] payload = getBodyBytesOrEmptyArray(response);
DaprError error = parseDaprError(payload);
if (error != null) {
future.completeExceptionally(new DaprException(error, payload, httpStatusCode));
return;
}
future.completeExceptionally(
new DaprException("UNKNOWN", "", payload, httpStatusCode));
return;
} catch (DaprException e) {
future.completeExceptionally(e);
return;
}
}
Map<String, String> mapHeaders = new HashMap<>();
byte[] result = getBodyBytesOrEmptyArray(response);
response.headers().forEach(pair -> {
mapHeaders.put(pair.getFirst(), pair.getSecond());
});
future.complete(new Response(result, mapHeaders, httpStatusCode));
}
}
private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) {
if ((headerValue == null) || headerValue.isEmpty()) {
private static int parseHttpStatusCode(Optional<String> headerValue, int defaultStatusCode) {
if (headerValue.isEmpty()) {
return defaultStatusCode;
}
// Metadata used to override status code with code received from HTTP binding.
try {
int httpStatusCode = Integer.parseInt(headerValue);
int httpStatusCode = Integer.parseInt(headerValue.get());
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return httpStatusCode;
}

View File

@ -14,15 +14,13 @@ limitations under the License.
package io.dapr.client;
import io.dapr.config.Properties;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import static io.dapr.config.Properties.API_TOKEN;
import static io.dapr.config.Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS;
import static io.dapr.config.Properties.HTTP_CLIENT_MAX_REQUESTS;
import static io.dapr.config.Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS;
import static io.dapr.config.Properties.HTTP_ENDPOINT;
@ -34,24 +32,13 @@ import static io.dapr.config.Properties.SIDECAR_IP;
*/
public class DaprHttpBuilder {
/**
* Singleton OkHttpClient.
*/
private static volatile OkHttpClient OK_HTTP_CLIENT;
private static volatile HttpClient HTTP_CLIENT;
/**
* Static lock object.
*/
private static final Object LOCK = new Object();
/**
* HTTP keep alive duration in seconds.
*
* <p>Just hard code to a reasonable value.
*/
private static final int KEEP_ALIVE_DURATION = 30;
/**
* Build an instance of the Http client based on the provided setup.
* @param properties to configure the DaprHttp client
@ -68,38 +55,30 @@ public class DaprHttpBuilder {
* @return Instance of {@link DaprHttp}
*/
private DaprHttp buildDaprHttp(Properties properties) {
if (OK_HTTP_CLIENT == null) {
if (HTTP_CLIENT == null) {
synchronized (LOCK) {
if (OK_HTTP_CLIENT == null) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS));
builder.readTimeout(readTimeout);
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(properties.getValue(HTTP_CLIENT_MAX_REQUESTS));
// The maximum number of requests for each host to execute concurrently.
// Default value is 5 in okhttp which is totally UNACCEPTABLE!
// For sidecar case, set it the same as maxRequests.
dispatcher.setMaxRequestsPerHost(HTTP_CLIENT_MAX_REQUESTS.get());
builder.dispatcher(dispatcher);
ConnectionPool pool = new ConnectionPool(properties.getValue(HTTP_CLIENT_MAX_IDLE_CONNECTIONS),
KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
builder.connectionPool(pool);
OK_HTTP_CLIENT = builder.build();
if (HTTP_CLIENT == null) {
int maxRequests = properties.getValue(HTTP_CLIENT_MAX_REQUESTS);
Executor executor = Executors.newFixedThreadPool(maxRequests);
HTTP_CLIENT = HttpClient.newBuilder()
.executor(executor)
.version(HttpClient.Version.HTTP_1_1)
.build();
}
}
}
String endpoint = properties.getValue(HTTP_ENDPOINT);
String apiToken = properties.getValue(API_TOKEN);
Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS));
if ((endpoint != null) && !endpoint.isEmpty()) {
return new DaprHttp(endpoint, properties.getValue(API_TOKEN), OK_HTTP_CLIENT);
return new DaprHttp(endpoint, apiToken, readTimeout, HTTP_CLIENT);
}
return new DaprHttp(properties.getValue(SIDECAR_IP), properties.getValue(HTTP_PORT), properties.getValue(API_TOKEN),
OK_HTTP_CLIENT);
String sidecarIp = properties.getValue(SIDECAR_IP);
int port = properties.getValue(HTTP_PORT);
return new DaprHttp(sidecarIp, port, apiToken, readTimeout, HTTP_CLIENT);
}
}

View File

@ -19,9 +19,10 @@ import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -153,7 +154,7 @@ public class Subscription<T> implements Closeable {
}).onErrorReturn(SubscriptionListener.Status.RETRY);
}
@NotNull
@Nonnull
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
String id, SubscriptionListener.Status status) {
DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed =

View File

@ -14,12 +14,12 @@ limitations under the License.
package io.dapr.client.domain;
import io.dapr.client.DaprHttp;
import okhttp3.HttpUrl;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* HTTP Extension class.
@ -67,17 +67,17 @@ public final class HttpExtension {
/**
* HTTP verb.
*/
private DaprHttp.HttpMethods method;
private final DaprHttp.HttpMethods method;
/**
* HTTP query params.
*/
private Map<String, List<String>> queryParams;
private final Map<String, List<String>> queryParams;
/**
* HTTP headers.
*/
private Map<String, String> headers;
private final Map<String, String> headers;
/**
* Construct a HttpExtension object.
@ -126,18 +126,29 @@ public final class HttpExtension {
* @return Encoded HTTP query string.
*/
public String encodeQueryString() {
if ((this.queryParams == null) || (this.queryParams.isEmpty())) {
if (queryParams == null || queryParams.isEmpty()) {
return "";
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
// Setting required values but we only need query params in the end.
urlBuilder.scheme("http").host("localhost");
Optional.ofNullable(this.queryParams).orElse(Collections.emptyMap()).entrySet().stream()
.forEach(urlParameter ->
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
.forEach(urlParameterValue ->
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
return urlBuilder.build().encodedQuery();
StringBuilder queryBuilder = new StringBuilder();
for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
String key = entry.getKey();
List<String> values = entry.getValue();
for (String value : values) {
if (queryBuilder.length() > 0) {
queryBuilder.append("&");
}
queryBuilder.append(encodeQueryParam(key, value)); // Encode key and value
}
}
return queryBuilder.toString();
}
private static String encodeQueryParam(String key, String value) {
return URLEncoder.encode(key, StandardCharsets.UTF_8) + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
}
}

View File

@ -13,24 +13,16 @@ limitations under the License.
package io.dapr.client;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
import io.dapr.v1.DaprGrpc;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.ResponseBody;
import okhttp3.mock.Behavior;
import okhttp3.mock.MediaTypes;
import okhttp3.mock.MockInterceptor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ -40,13 +32,15 @@ import reactor.util.context.ContextView;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static io.dapr.utils.TestUtils.findFreePort;
@ -57,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DaprClientHttpTest {
@ -64,6 +59,12 @@ public class DaprClientHttpTest {
private final String EXPECTED_RESULT =
"{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
private static final int HTTP_NO_CONTENT = 204;
private static final int HTTP_NOT_FOUND = 404;
private static final int HTTP_SERVER_ERROR = 500;
private static final int HTTP_OK = 200;
private static final Duration READ_TIMEOUT = Duration.ofSeconds(60);
private String sidecarIp;
private String daprApiToken;
@ -72,17 +73,14 @@ public class DaprClientHttpTest {
private DaprHttp daprHttp;
private OkHttpClient okHttpClient;
private MockInterceptor mockInterceptor;
private HttpClient httpClient;
@BeforeEach
public void setUp() {
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
daprApiToken = Properties.API_TOKEN.get();
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
daprHttp = new DaprHttp(sidecarIp, 3000, daprApiToken, okHttpClient);
httpClient = mock(HttpClient.class);
daprHttp = new DaprHttp(sidecarIp, 3000, daprApiToken, READ_TIMEOUT, httpClient);
daprClientHttp = buildDaprClient(daprHttp);
}
@ -100,14 +98,16 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarTimeOutHealthCheck() throws Exception {
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, daprApiToken, okHttpClient);
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.delay(200)
.respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
Thread.sleep(200);
return mockResponse;
});
StepVerifier.create(daprClientHttp.waitForSidecar(100))
.expectSubscription()
@ -123,15 +123,20 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarBadHealthCheck() throws Exception {
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NOT_FOUND);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
int port = findFreePort();
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient);
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
AtomicInteger count = new AtomicInteger(0);
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.times(6)
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
if (count.getAndIncrement() < 6) {
return mockResponse;
}
return CompletableFuture.failedFuture(new TimeoutException());
});
// it will timeout.
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
@ -143,24 +148,25 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception {
int port = findFreePort();
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient);
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
AtomicInteger count = new AtomicInteger(0);
when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
if (count.getAndIncrement() < 2) {
Thread.sleep(1000);
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR);
return CompletableFuture.<HttpResponse<Object>>completedFuture(mockHttpResponse);
}
Thread.sleep(1000);
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
return CompletableFuture.<HttpResponse<Object>>completedFuture(mockHttpResponse);
});
// Simulate a slow response
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.delay(1000)
.times(2)
.respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json")));
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.delay(1000)
.times(1)
.respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
.expectSubscription()
.expectNext()
@ -170,14 +176,13 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarOK() throws Exception {
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
int port = findFreePort();
daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient);
daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.respond(204);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
StepVerifier.create(daprClientHttp.waitForSidecar(10000))
.expectSubscription()
@ -187,12 +192,14 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarTimeoutOK() throws Exception {
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.respond(204);
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
try (ServerSocket serverSocket = new ServerSocket(0)) {
final int port = serverSocket.getLocalPort();
int port = serverSocket.getLocalPort();
Thread t = new Thread(() -> {
try {
try (Socket socket = serverSocket.accept()) {
@ -201,7 +208,8 @@ public class DaprClientHttpTest {
}
});
t.start();
daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient);
daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
daprClientHttp.waitForSidecar(10000).block();
}
@ -209,80 +217,146 @@ public class DaprClientHttpTest {
@Test
public void invokeServiceVerbNull() {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3000/v1.0/publish/A")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
MockHttpResponse mockHttpResponse = new MockHttpResponse(EXPECTED_RESULT.getBytes(), HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
assertThrows(IllegalArgumentException.class, () ->
daprClientHttp.invokeMethod(null, "", "", null, null, (Class)null).block());
daprClientHttp.invokeMethod(
null,
"",
"",
null,
null,
(Class)null
).block());
}
@Test
public void invokeServiceIllegalArgumentException() {
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/badorder")
.respond("INVALID JSON");
byte[] content = "INVALID JSON".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
assertThrows(IllegalArgumentException.class, () -> {
// null HttpMethod
daprClientHttp.invokeMethod("1", "2", "3", new HttpExtension(null), null, (Class)null).block();
daprClientHttp.invokeMethod(
"1",
"2",
"3",
new HttpExtension(null),
null,
(Class)null
).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// null HttpExtension
daprClientHttp.invokeMethod("1", "2", "3", null, null, (Class)null).block();
daprClientHttp.invokeMethod(
"1",
"2",
"3",
null,
null,
(Class)null
).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// empty appId
daprClientHttp.invokeMethod("", "1", null, HttpExtension.GET, null, (Class)null).block();
daprClientHttp.invokeMethod(
"",
"1",
null,
HttpExtension.GET,
null,
(Class)null
).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// null appId, empty method
daprClientHttp.invokeMethod(null, "", null, HttpExtension.POST, null, (Class)null).block();
daprClientHttp.invokeMethod(
null,
"",
null,
HttpExtension.POST,
null,
(Class)null
).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// empty method
daprClientHttp.invokeMethod("1", "", null, HttpExtension.PUT, null, (Class)null).block();
daprClientHttp.invokeMethod(
"1",
"",
null,
HttpExtension.PUT,
null,
(Class)null
).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// null method
daprClientHttp.invokeMethod("1", null, null, HttpExtension.DELETE, null, (Class)null).block();
daprClientHttp.invokeMethod(
"1",
null,
null,
HttpExtension.DELETE,
null,
(Class)null
).block();
});
assertThrowsDaprException(JsonParseException.class, () -> {
// invalid JSON response
daprClientHttp.invokeMethod("41", "badorder", null, HttpExtension.GET, null, String.class).block();
daprClientHttp.invokeMethod(
"41",
"badorder",
null,
HttpExtension.GET,
null,
String.class
).block();
});
}
@Test
public void invokeServiceDaprError() {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
.respond(500,
ResponseBody.create(
"{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}",
MediaTypes.MEDIATYPE_JSON));
byte[] content = "{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
daprClientHttp.invokeMethod(
"myapp",
"mymethod",
"anything",
HttpExtension.POST
).block();
});
assertEquals("MYCODE", exception.getErrorCode());
assertEquals("MYCODE: My Message (HTTP status code: 500)", exception.getMessage());
assertEquals(500, exception.getHttpStatusCode());
assertEquals(HTTP_SERVER_ERROR, exception.getHttpStatusCode());
}
@Test
public void invokeServiceDaprErrorFromGRPC() {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
.respond(500,
ResponseBody.create(
"{ \"code\": 7 }",
MediaTypes.MEDIATYPE_JSON));
byte[] content = "{ \"code\": 7 }".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
daprClientHttp.invokeMethod(
"myapp",
"mymethod",
"anything",
HttpExtension.POST
).block();
});
assertEquals("PERMISSION_DENIED", exception.getErrorCode());
@ -291,12 +365,11 @@ public class DaprClientHttpTest {
@Test
public void invokeServiceDaprErrorUnknownJSON() {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
.respond(500,
ResponseBody.create(
"{ \"anything\": 7 }",
MediaTypes.MEDIATYPE_JSON));
byte[] content = "{ \"anything\": 7 }".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
@ -309,119 +382,203 @@ public class DaprClientHttpTest {
@Test
public void invokeServiceDaprErrorEmptyString() {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
.respond(500,
ResponseBody.create(
"",
MediaTypes.MEDIATYPE_JSON));
byte[] content = "".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
daprClientHttp.invokeMethod(
"myapp",
"mymethod",
"anything",
HttpExtension.POST
).block();
});
assertEquals("UNKNOWN", exception.getErrorCode());
assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage());
}
@Test
public void invokeServiceMethodNull() {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3000/v1.0/publish/A")
.respond(EXPECTED_RESULT);
byte[] content = EXPECTED_RESULT.getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
assertThrows(IllegalArgumentException.class, () ->
daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block());
daprClientHttp.invokeMethod(
"1",
"",
null,
HttpExtension.POST,
null,
(Class)null
).block());
}
@Test
public void invokeService() {
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.respond("\"hello world\"");
byte[] content = "\"hello world\"".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
Mono<String> mono = daprClientHttp.invokeMethod(
"41",
"neworder",
null,
HttpExtension.GET,
null,
String.class
);
Mono<String> mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class);
assertEquals("hello world", mono.block());
}
@Test
public void invokeServiceNullResponse() {
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.respond(new byte[0]);
byte[] content = new byte[0];
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
Mono<String> mono = daprClientHttp.invokeMethod(
"41",
"neworder",
null,
HttpExtension.GET,
null,
String.class
);
Mono<String> mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class);
assertNull(mono.block());
}
@Test
public void simpleInvokeService() {
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.respond(EXPECTED_RESULT);
byte[] content = EXPECTED_RESULT.getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
Mono<byte[]> mono = daprClientHttp.invokeMethod(
"41",
"neworder",
null,
HttpExtension.GET,
byte[].class
);
Mono<byte[]> mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, byte[].class);
assertEquals(new String(mono.block()), EXPECTED_RESULT);
}
@Test
public void invokeServiceWithMetadataMap() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.respond(EXPECTED_RESULT);
Map<String, String> map = Map.of();
byte[] content = EXPECTED_RESULT.getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
Mono<byte[]> mono = daprClientHttp.invokeMethod("41", "neworder", (byte[]) null, HttpExtension.GET, map);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
Mono<byte[]> mono = daprClientHttp.invokeMethod(
"41",
"neworder",
(byte[]) null,
HttpExtension.GET,
map
);
String monoString = new String(mono.block());
assertEquals(monoString, EXPECTED_RESULT);
}
@Test
public void invokeServiceWithOutRequest() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.respond(EXPECTED_RESULT);
Map<String, String> map = Map.of();
byte[] content = EXPECTED_RESULT.getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
Mono<Void> mono = daprClientHttp.invokeMethod(
"41",
"neworder",
HttpExtension.GET,
map
);
Mono<Void> mono = daprClientHttp.invokeMethod("41", "neworder", HttpExtension.GET, map);
assertNull(mono.block());
}
@Test
public void invokeServiceWithRequest() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.respond(EXPECTED_RESULT);
Map<String, String> map = Map.of();
byte[] content = EXPECTED_RESULT.getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
Mono<Void> mono = daprClientHttp.invokeMethod(
"41",
"neworder",
"",
HttpExtension.GET,
map
);
Mono<Void> mono = daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map);
assertNull(mono.block());
}
@Test
public void invokeServiceWithRequestAndQueryString() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder?param1=1&param2=a&param2=b%2Fc")
.respond(EXPECTED_RESULT);
Map<String, String> map = Map.of();
Map<String, List<String>> queryString = Map.of(
"param1", List.of("1"),
"param2", List.of("a", "b/c")
);
byte[] content = EXPECTED_RESULT.getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
Map<String, List<String>> queryString = new HashMap<>();
queryString.put("param1", Collections.singletonList("1"));
queryString.put("param2", Arrays.asList("a", "b/c"));
HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, queryString, null);
Mono<Void> mono = daprClientHttp.invokeMethod("41", "neworder", "", httpExtension, map);
Mono<Void> mono = daprClientHttp.invokeMethod(
"41",
"neworder",
"",
httpExtension,
map
);
assertNull(mono.block());
}
@Test
public void invokeServiceNoHotMono() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.respond(500);
Map<String, String> map = Map.of();
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
daprClientHttp.invokeMethod(
"41",
"neworder",
"",
HttpExtension.GET,
map
);
// No exception should be thrown because did not call block() on mono above.
}
@ -433,18 +590,27 @@ public class DaprClientHttpTest {
.put("traceparent", traceparent)
.put("tracestate", tracestate)
.put("not_added", "xyz");
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
.header("traceparent", traceparent)
.header("tracestate", tracestate)
.respond(new byte[0]);
byte[] content = new byte[0];
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder")
.setBody("request")
.setHttpExtension(HttpExtension.POST);
Mono<Void> result = daprClientHttp.invokeMethod(req, TypeRef.get(Void.class))
.contextWrite(it -> it.putAll((ContextView) context));
result.block();
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(traceparent, request.headers().firstValue("traceparent").get());
assertEquals(tracestate, request.headers().firstValue("tracestate").get());
}
@Test
@ -467,23 +633,4 @@ public class DaprClientHttpTest {
daprClientHttp.close();
}
private static class XmlSerializer implements DaprObjectSerializer {
private static final XmlMapper XML_MAPPER = new XmlMapper();
@Override
public byte[] serialize(Object o) throws IOException {
return XML_MAPPER.writeValueAsBytes(o);
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return XML_MAPPER.readValue(data, new TypeReference<T>() {});
}
@Override
public String getContentType() {
return "application/xml";
}
}
}

View File

@ -14,10 +14,10 @@ limitations under the License.
package io.dapr.client;
import io.dapr.config.Properties;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
import java.net.http.HttpClient;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@ -30,14 +30,13 @@ public class DaprHttpBuilderTest {
DaprHttp daprHttp = new DaprHttpBuilder().build(properties);
DaprHttp anotherDaprHttp = new DaprHttpBuilder().build(properties);
assertSame(getOkHttpClient(daprHttp), getOkHttpClient(anotherDaprHttp));
assertSame(getHttpClient(daprHttp), getHttpClient(anotherDaprHttp));
}
private static OkHttpClient getOkHttpClient(DaprHttp daprHttp) throws Exception {
private static HttpClient getHttpClient(DaprHttp daprHttp) throws Exception {
Field httpClientField = DaprHttp.class.getDeclaredField("httpClient");
httpClientField.setAccessible(true);
OkHttpClient okHttpClient = (OkHttpClient) httpClientField.get(daprHttp);
HttpClient okHttpClient = (HttpClient) httpClientField.get(daprHttp);
assertNotNull(okHttpClient);
return okHttpClient;
}

View File

@ -16,6 +16,7 @@ package io.dapr.client;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@ -25,6 +26,8 @@ import java.util.Map;
*/
public class DaprHttpStub extends DaprHttp {
private static final Duration READ_TIMEOUT = Duration.ofSeconds(60);
public static class ResponseStub extends DaprHttp.Response {
public ResponseStub(byte[] body, Map<String, String> headers, int statusCode) {
super(body, headers, statusCode);
@ -34,7 +37,7 @@ public class DaprHttpStub extends DaprHttp {
* Instantiates a stub for DaprHttp
*/
public DaprHttpStub() {
super(null, 3000, "stubToken", null);
super(null, 3000, "stubToken", READ_TIMEOUT, null);
}
/**

View File

@ -16,14 +16,10 @@ import io.dapr.config.Properties;
import io.dapr.exceptions.DaprErrorDetails;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.TypeRef;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.ResponseBody;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.context.Context;
@ -32,214 +28,407 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.HashMap;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static io.dapr.utils.TestUtils.formatIpAddress;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(SystemStubsExtension.class)
public class DaprHttpTest {
@SystemStub
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
private static final String STATE_PATH = DaprHttp.API_VERSION + "/state";
private static final int HTTP_OK = 200;
private static final int HTTP_SERVER_ERROR = 500;
private static final int HTTP_NO_CONTENT = 204;
private static final int HTTP_NOT_FOUND = 404;
private static final String EXPECTED_RESULT =
"{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
private static final Duration READ_TIMEOUT = Duration.ofSeconds(60);
@SystemStub
private final EnvironmentVariables environmentVariables = new EnvironmentVariables();
private String sidecarIp;
private String daprTokenApi;
private OkHttpClient okHttpClient;
private HttpClient httpClient;
private MockInterceptor mockInterceptor;
private ObjectSerializer serializer = new ObjectSerializer();
private final ObjectSerializer serializer = new ObjectSerializer();
@BeforeEach
public void setUp() {
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
daprTokenApi = Properties.API_TOKEN.get();
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
httpClient = mock(HttpClient.class);
}
@Test
public void invokeApi_daprApiToken_present() throws IOException {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.hasHeader(Headers.DAPR_API_TOKEN)
.respond(serializer.serialize(EXPECTED_RESULT));
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
environmentVariables.set(Properties.API_TOKEN.getEnvName(), "xyz");
assertEquals("xyz", Properties.API_TOKEN.get());
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, Properties.API_TOKEN.get(), okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty());
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, Properties.API_TOKEN.get(), READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
(byte[]) null,
null,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("POST", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
assertEquals("xyz", request.headers().firstValue(Headers.DAPR_API_TOKEN).get());
}
@Test
public void invokeApi_daprApiToken_absent() throws IOException {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.not()
.hasHeader(Headers.DAPR_API_TOKEN)
.respond(serializer.serialize(EXPECTED_RESULT));
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
assertNull(Properties.API_TOKEN.get());
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty());
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
(byte[]) null,
null,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("POST", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
assertFalse(request.headers().map().containsKey(Headers.DAPR_API_TOKEN));
}
@Test
public void invokeMethod() throws IOException {
Map<String, String> headers = new HashMap<>();
headers.put("content-type", "text/html");
headers.put("header1", "value1");
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty());
Map<String, String> headers = Map.of(
"content-type", "text/html",
"header1", "value1"
);
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
(byte[]) null,
headers,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("POST", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
assertEquals("text/html", request.headers().firstValue("content-type").get());
assertEquals("value1", request.headers().firstValue("header1").get());
}
@Test
public void invokeMethodIPv6() throws IOException {
sidecarIp = formatIpAddress("2001:db8:3333:4444:5555:6666:7777:8888");
Map<String, String> headers = new HashMap<>();
headers.put("content-type", "text/html");
headers.put("header1", "value1");
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty());
Map<String, String> headers = Map.of(
"content-type", "text/html",
"header1", "value1"
);
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
(byte[]) null,
headers,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("POST", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
assertEquals("text/html", request.headers().firstValue("content-type").get());
assertEquals("value1", request.headers().firstValue("header1").get());
}
@Test
public void invokePostMethod() throws IOException {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT))
.addHeader("Header", "Value");
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, "", null, Context.empty());
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
"",
null,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("POST", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
}
@Test
public void invokeDeleteMethod() throws IOException {
mockInterceptor.addRule()
.delete("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("DELETE", "v1.0/state".split("/"), null, (String) null, null, Context.empty());
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"DELETE",
"v1.0/state".split("/"),
null,
(String) null,
null,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("DELETE", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
}
@Test
public void invokeHEADMethod() throws IOException {
mockInterceptor.addRule().head("http://127.0.0.1:3500/v1.0/state").respond(HttpURLConnection.HTTP_OK);
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("HEAD", "v1.0/state".split("/"), null, (String) null, null, Context.empty());
public void invokeHeadMethod() {
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"HEAD",
"v1.0/state".split("/"),
null,
(String) null,
null,
Context.empty()
);
DaprHttp.Response response = mono.block();
assertEquals(HttpURLConnection.HTTP_OK, response.getStatusCode());
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals("HEAD", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
assertEquals(HTTP_OK, response.getStatusCode());
}
@Test
public void invokeGetMethod() throws IOException {
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3500/v1.0/get")
.respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("GET", "v1.0/get".split("/"), null, null, Context.empty());
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"GET",
"v1.0/state".split("/"),
null,
null,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("GET", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
}
@Test
public void invokeMethodWithHeaders() throws IOException {
Map<String, String> headers = new HashMap<>();
headers.put("header", "value");
headers.put("header1", "value1");
Map<String, List<String>> urlParameters = new HashMap<>();
urlParameters.put("orderId", Collections.singletonList("41"));
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41")
.respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("GET", "v1.0/state/order".split("/"), urlParameters, headers, Context.empty());
Map<String, String> headers = Map.of(
"header", "value",
"header1", "value1"
);
Map<String, List<String>> urlParameters = Map.of(
"orderId", List.of("41")
);
byte[] content = serializer.serialize(EXPECTED_RESULT);
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"GET",
"v1.0/state/order".split("/"),
urlParameters,
headers,
Context.empty()
);
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
verify(httpClient).sendAsync(requestCaptor.capture(), any());
HttpRequest request = requestCaptor.getValue();
assertEquals(EXPECTED_RESULT, body);
assertEquals("GET", request.method());
assertEquals("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41", request.uri().toString());
assertEquals("value", request.headers().firstValue("header").get());
assertEquals("value1", request.headers().firstValue("header1").get());
}
@Test
public void invokePostMethodRuntime() throws IOException {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(500);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
public void invokePostMethodRuntime() {
MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
null,
Context.empty());
StepVerifier.create(mono).expectError(RuntimeException.class).verify();
}
@Test
public void invokePostDaprError() throws IOException {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(500, ResponseBody.create(MediaType.parse("text"),
"{\"errorCode\":null,\"message\":null}"));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
public void invokePostDaprError() {
byte[] content = "{\"errorCode\":null,\"message\":null}".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
null,
Context.empty()
);
StepVerifier.create(mono).expectError(RuntimeException.class).verify();
}
@Test
public void invokePostMethodUnknownError() throws IOException {
mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(500, ResponseBody.create(MediaType.parse("application/json"),
"{\"errorCode\":\"null\",\"message\":\"null\"}"));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
public void invokePostMethodUnknownError() {
byte[] content = "{\"errorCode\":null,\"message\":null}".getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/state".split("/"),
null,
null,
Context.empty()
);
StepVerifier.create(mono).expectError(RuntimeException.class).verify();
}
@Test
public void validateExceptionParsing() {
final String payload = "{" +
String payload = "{" +
"\"errorCode\":\"ERR_PUBSUB_NOT_FOUND\"," +
"\"message\":\"pubsub abc is not found\"," +
"\"details\":[" +
@ -249,14 +438,24 @@ public class DaprHttpTest {
"\"metadata\":{}," +
"\"reason\":\"DAPR_PUBSUB_NOT_FOUND\"" +
"}]}";
mockInterceptor.addRule()
.post("http://127.0.0.1:3500/v1.0/pubsub/publish")
.respond(500, ResponseBody.create(MediaType.parse("application/json"),
payload));
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/pubsub/publish".split("/"), null, null, Context.empty());
byte[] content = payload.getBytes();
MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
CompletableFuture<HttpResponse<Object>> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi(
"POST",
"v1.0/pubsub/publish".split("/"),
null,
null,
Context.empty()
);
StepVerifier.create(mono).expectErrorMatches(e -> {
assertEquals(DaprException.class, e.getClass());
DaprException daprException = (DaprException)e;
assertEquals("ERR_PUBSUB_NOT_FOUND", daprException.getErrorCode());
assertEquals("DAPR_PUBSUB_NOT_FOUND",
@ -267,15 +466,15 @@ public class DaprHttpTest {
}
/**
* The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR
* The purpose of this test is to show that it doesn't matter when the client is called, the actual call to DAPR
* will be done when the output Mono response call the Mono.block method.
* Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state
* you just retrived but block for the delete response, when later you block for the response of the getState, you will
* not found the state.
* Like for instance if you call getState, without blocking for the response, and then call delete for the same state
* you just retrieved but block for the delete response, when later you block for the response of the getState, you will
* not find the state.
* <p>This test will execute the following flow:</p>
* <ol>
* <li>Exeucte client getState for Key=key1</li>
* <li>Block for result to the the state</li>
* <li>Execute client getState for Key=key1</li>
* <li>Block for result to the state</li>
* <li>Assert the Returned State is the expected to key1</li>
* <li>Execute client getState for Key=key2</li>
* <li>Execute client deleteState for Key=key2</li>
@ -285,35 +484,64 @@ public class DaprHttpTest {
*
* @throws IOException - Test will fail if any unexpected exception is being thrown
*/
@Test()
@Test
public void testCallbackCalledAtTheExpectedTimeTest() throws IOException {
String deletedStateKey = "deletedKey";
String existingState = "existingState";
String urlDeleteState = STATE_PATH + "/" + deletedStateKey;
String urlExistingState = STATE_PATH + "/" + existingState;
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3500/" + urlDeleteState)
.respond(200, ResponseBody.create(MediaType.parse("application/json"),
deletedStateKey));
mockInterceptor.addRule()
.delete("http://" + sidecarIp + ":3500/" + urlDeleteState)
.respond(204);
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3500/" + urlExistingState)
.respond(200, ResponseBody.create(MediaType.parse("application/json"),
serializer.serialize(existingState)));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> response = daprHttp.invokeApi("GET", urlExistingState.split("/"), null, null, Context.empty());
String urlExistingState = "v1.0/state/" + existingState;
String deletedStateKey = "deletedKey";
String urlDeleteState = "v1.0/state/" + deletedStateKey;
when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
HttpRequest request = invocation.getArgument(0);
String url = request.uri().toString();
if (request.method().equals("GET") && url.contains(urlExistingState)) {
MockHttpResponse mockHttpResponse = new MockHttpResponse(serializer.serialize(existingState), HTTP_OK);
return CompletableFuture.completedFuture(mockHttpResponse);
}
if (request.method().equals("DELETE")) {
return CompletableFuture.completedFuture(new MockHttpResponse(HTTP_NO_CONTENT));
}
if (request.method().equals("GET")) {
byte [] content = "{\"errorCode\":\"404\",\"message\":\"State Not Found\"}".getBytes();
return CompletableFuture.completedFuture(new MockHttpResponse(content, HTTP_NOT_FOUND));
}
return CompletableFuture.failedFuture(new RuntimeException("Unexpected call"));
});
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
Mono<DaprHttp.Response> response = daprHttp.invokeApi(
"GET",
urlExistingState.split("/"),
null,
null,
Context.empty()
);
assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class));
Mono<DaprHttp.Response> responseDeleted = daprHttp.invokeApi("GET", urlDeleteState.split("/"), null, null, Context.empty());
Mono<DaprHttp.Response> responseDeleteKey =
daprHttp.invokeApi("DELETE", urlDeleteState.split("/"), null, null, Context.empty());
Mono<DaprHttp.Response> responseDeleted = daprHttp.invokeApi(
"GET",
urlDeleteState.split("/"),
null,
null,
Context.empty()
);
Mono<DaprHttp.Response> responseDeleteKey = daprHttp.invokeApi(
"DELETE",
urlDeleteState.split("/"),
null,
null,
Context.empty()
);
assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class));
mockInterceptor.reset();
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3500/" + urlDeleteState)
.respond(404, ResponseBody.create(MediaType.parse("application/json"),
"{\"errorCode\":\"404\",\"message\":\"State Not Found\"}"));
try {
responseDeleted.block();
fail("Expected DaprException");
@ -321,5 +549,4 @@ public class DaprHttpTest {
assertEquals(DaprException.class, ex.getClass());
}
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import javax.net.ssl.SSLSession;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.Optional;
public class MockHttpResponse implements HttpResponse<Object> {
private final byte[] body;
private final int statusCode;
public MockHttpResponse(int statusCode) {
this.body = null;
this.statusCode = statusCode;
}
public MockHttpResponse(byte[] body, int statusCode) {
this.body = body;
this.statusCode = statusCode;
}
@Override
public int statusCode() {
return statusCode;
}
@Override
public HttpRequest request() {
return null;
}
@Override
public Optional<HttpResponse<Object>> previousResponse() {
return Optional.empty();
}
@Override
public HttpHeaders headers() {
return HttpHeaders.of(Collections.emptyMap(), (a, b) -> true);
}
@Override
public byte[] body() {
return body;
}
@Override
public Optional<SSLSession> sslSession() {
return Optional.empty();
}
@Override
public URI uri() {
return null;
}
@Override
public HttpClient.Version version() {
return null;
}
}