headerValue, int defaultStatusCode) {
+ if (headerValue.isEmpty()) {
return defaultStatusCode;
}
// Metadata used to override status code with code received from HTTP binding.
try {
- int httpStatusCode = Integer.parseInt(headerValue);
+ int httpStatusCode = Integer.parseInt(headerValue.get());
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return httpStatusCode;
}
diff --git a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java
index 8ef163dd9..2de7fe631 100644
--- a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java
+++ b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java
@@ -14,15 +14,13 @@ limitations under the License.
package io.dapr.client;
import io.dapr.config.Properties;
-import okhttp3.ConnectionPool;
-import okhttp3.Dispatcher;
-import okhttp3.OkHttpClient;
+import java.net.http.HttpClient;
import java.time.Duration;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import static io.dapr.config.Properties.API_TOKEN;
-import static io.dapr.config.Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS;
import static io.dapr.config.Properties.HTTP_CLIENT_MAX_REQUESTS;
import static io.dapr.config.Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS;
import static io.dapr.config.Properties.HTTP_ENDPOINT;
@@ -34,24 +32,13 @@ import static io.dapr.config.Properties.SIDECAR_IP;
*/
public class DaprHttpBuilder {
- /**
- * Singleton OkHttpClient.
- */
- private static volatile OkHttpClient OK_HTTP_CLIENT;
+ private static volatile HttpClient HTTP_CLIENT;
/**
* Static lock object.
*/
private static final Object LOCK = new Object();
- /**
- * HTTP keep alive duration in seconds.
- *
- * Just hard code to a reasonable value.
- */
- private static final int KEEP_ALIVE_DURATION = 30;
-
-
/**
* Build an instance of the Http client based on the provided setup.
* @param properties to configure the DaprHttp client
@@ -68,38 +55,30 @@ public class DaprHttpBuilder {
* @return Instance of {@link DaprHttp}
*/
private DaprHttp buildDaprHttp(Properties properties) {
- if (OK_HTTP_CLIENT == null) {
+ if (HTTP_CLIENT == null) {
synchronized (LOCK) {
- if (OK_HTTP_CLIENT == null) {
- OkHttpClient.Builder builder = new OkHttpClient.Builder();
- Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS));
- builder.readTimeout(readTimeout);
-
- Dispatcher dispatcher = new Dispatcher();
- dispatcher.setMaxRequests(properties.getValue(HTTP_CLIENT_MAX_REQUESTS));
- // The maximum number of requests for each host to execute concurrently.
- // Default value is 5 in okhttp which is totally UNACCEPTABLE!
- // For sidecar case, set it the same as maxRequests.
- dispatcher.setMaxRequestsPerHost(HTTP_CLIENT_MAX_REQUESTS.get());
- builder.dispatcher(dispatcher);
-
- ConnectionPool pool = new ConnectionPool(properties.getValue(HTTP_CLIENT_MAX_IDLE_CONNECTIONS),
- KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
- builder.connectionPool(pool);
-
- OK_HTTP_CLIENT = builder.build();
+ if (HTTP_CLIENT == null) {
+ int maxRequests = properties.getValue(HTTP_CLIENT_MAX_REQUESTS);
+ Executor executor = Executors.newFixedThreadPool(maxRequests);
+ HTTP_CLIENT = HttpClient.newBuilder()
+ .executor(executor)
+ .version(HttpClient.Version.HTTP_1_1)
+ .build();
}
}
}
String endpoint = properties.getValue(HTTP_ENDPOINT);
+ String apiToken = properties.getValue(API_TOKEN);
+ Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS));
+
if ((endpoint != null) && !endpoint.isEmpty()) {
- return new DaprHttp(endpoint, properties.getValue(API_TOKEN), OK_HTTP_CLIENT);
+ return new DaprHttp(endpoint, apiToken, readTimeout, HTTP_CLIENT);
}
- return new DaprHttp(properties.getValue(SIDECAR_IP), properties.getValue(HTTP_PORT), properties.getValue(API_TOKEN),
- OK_HTTP_CLIENT);
-
+ String sidecarIp = properties.getValue(SIDECAR_IP);
+ int port = properties.getValue(HTTP_PORT);
+ return new DaprHttp(sidecarIp, port, apiToken, readTimeout, HTTP_CLIENT);
}
}
diff --git a/sdk/src/main/java/io/dapr/client/Subscription.java b/sdk/src/main/java/io/dapr/client/Subscription.java
index 9d1a2a503..53e89e845 100644
--- a/sdk/src/main/java/io/dapr/client/Subscription.java
+++ b/sdk/src/main/java/io/dapr/client/Subscription.java
@@ -19,9 +19,10 @@ import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
-import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
+import javax.annotation.Nonnull;
+
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -153,7 +154,7 @@ public class Subscription implements Closeable {
}).onErrorReturn(SubscriptionListener.Status.RETRY);
}
- @NotNull
+ @Nonnull
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
String id, SubscriptionListener.Status status) {
DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed =
diff --git a/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java b/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java
index 5b7d546e7..7f763c05a 100644
--- a/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java
+++ b/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java
@@ -14,12 +14,12 @@ limitations under the License.
package io.dapr.client.domain;
import io.dapr.client.DaprHttp;
-import okhttp3.HttpUrl;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
/**
* HTTP Extension class.
@@ -67,17 +67,17 @@ public final class HttpExtension {
/**
* HTTP verb.
*/
- private DaprHttp.HttpMethods method;
+ private final DaprHttp.HttpMethods method;
/**
* HTTP query params.
*/
- private Map> queryParams;
+ private final Map> queryParams;
/**
* HTTP headers.
*/
- private Map headers;
+ private final Map headers;
/**
* Construct a HttpExtension object.
@@ -126,18 +126,29 @@ public final class HttpExtension {
* @return Encoded HTTP query string.
*/
public String encodeQueryString() {
- if ((this.queryParams == null) || (this.queryParams.isEmpty())) {
+ if (queryParams == null || queryParams.isEmpty()) {
return "";
}
- HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
- // Setting required values but we only need query params in the end.
- urlBuilder.scheme("http").host("localhost");
- Optional.ofNullable(this.queryParams).orElse(Collections.emptyMap()).entrySet().stream()
- .forEach(urlParameter ->
- Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
- .forEach(urlParameterValue ->
- urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
- return urlBuilder.build().encodedQuery();
+ StringBuilder queryBuilder = new StringBuilder();
+
+ for (Map.Entry> entry : queryParams.entrySet()) {
+ String key = entry.getKey();
+ List values = entry.getValue();
+
+ for (String value : values) {
+ if (queryBuilder.length() > 0) {
+ queryBuilder.append("&");
+ }
+
+ queryBuilder.append(encodeQueryParam(key, value)); // Encode key and value
+ }
+ }
+
+ return queryBuilder.toString();
+ }
+
+ private static String encodeQueryParam(String key, String value) {
+ return URLEncoder.encode(key, StandardCharsets.UTF_8) + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
}
}
diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
index 44e36d06d..6864a1ee0 100644
--- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
@@ -13,24 +13,16 @@ limitations under the License.
package io.dapr.client;
import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
-import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
import io.dapr.v1.DaprGrpc;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.ResponseBody;
-import okhttp3.mock.Behavior;
-import okhttp3.mock.MediaTypes;
-import okhttp3.mock.MockInterceptor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -40,13 +32,15 @@ import reactor.util.context.ContextView;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static io.dapr.utils.TestUtils.findFreePort;
@@ -57,12 +51,19 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DaprClientHttpTest {
private final String EXPECTED_RESULT =
"{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
+
+ private static final int HTTP_NO_CONTENT = 204;
+ private static final int HTTP_NOT_FOUND = 404;
+ private static final int HTTP_SERVER_ERROR = 500;
+ private static final int HTTP_OK = 200;
+ private static final Duration READ_TIMEOUT = Duration.ofSeconds(60);
private String sidecarIp;
@@ -72,17 +73,14 @@ public class DaprClientHttpTest {
private DaprHttp daprHttp;
- private OkHttpClient okHttpClient;
-
- private MockInterceptor mockInterceptor;
+ private HttpClient httpClient;
@BeforeEach
public void setUp() {
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
daprApiToken = Properties.API_TOKEN.get();
- mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
- okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
- daprHttp = new DaprHttp(sidecarIp, 3000, daprApiToken, okHttpClient);
+ httpClient = mock(HttpClient.class);
+ daprHttp = new DaprHttp(sidecarIp, 3000, daprApiToken, READ_TIMEOUT, httpClient);
daprClientHttp = buildDaprClient(daprHttp);
}
@@ -100,14 +98,16 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarTimeOutHealthCheck() throws Exception {
- daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, daprApiToken, okHttpClient);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
- mockInterceptor.addRule()
- .get()
- .path("/v1.0/healthz/outbound")
- .delay(200)
- .respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
+ when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
+ Thread.sleep(200);
+
+ return mockResponse;
+ });
StepVerifier.create(daprClientHttp.waitForSidecar(100))
.expectSubscription()
@@ -123,15 +123,20 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarBadHealthCheck() throws Exception {
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NOT_FOUND);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
int port = findFreePort();
- daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient);
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
+ AtomicInteger count = new AtomicInteger(0);
- mockInterceptor.addRule()
- .get()
- .path("/v1.0/healthz/outbound")
- .times(6)
- .respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
+ when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
+ if (count.getAndIncrement() < 6) {
+ return mockResponse;
+ }
+
+ return CompletableFuture.failedFuture(new TimeoutException());
+ });
// it will timeout.
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
@@ -143,24 +148,25 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception {
int port = findFreePort();
- daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient);
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
+ AtomicInteger count = new AtomicInteger(0);
+
+ when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
+ if (count.getAndIncrement() < 2) {
+ Thread.sleep(1000);
+
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR);
+ return CompletableFuture.>completedFuture(mockHttpResponse);
+ }
+
+ Thread.sleep(1000);
+
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
+ return CompletableFuture.>completedFuture(mockHttpResponse);
+ });
// Simulate a slow response
- mockInterceptor.addRule()
- .get()
- .path("/v1.0/healthz/outbound")
- .delay(1000)
- .times(2)
- .respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json")));
-
- mockInterceptor.addRule()
- .get()
- .path("/v1.0/healthz/outbound")
- .delay(1000)
- .times(1)
- .respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
-
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
.expectSubscription()
.expectNext()
@@ -170,14 +176,13 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarOK() throws Exception {
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
int port = findFreePort();
- daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient);
+ daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
- mockInterceptor.addRule()
- .get()
- .path("/v1.0/healthz/outbound")
- .respond(204);
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
StepVerifier.create(daprClientHttp.waitForSidecar(10000))
.expectSubscription()
@@ -187,12 +192,14 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarTimeoutOK() throws Exception {
- mockInterceptor.addRule()
- .get()
- .path("/v1.0/healthz/outbound")
- .respond(204);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
try (ServerSocket serverSocket = new ServerSocket(0)) {
- final int port = serverSocket.getLocalPort();
+ int port = serverSocket.getLocalPort();
+
Thread t = new Thread(() -> {
try {
try (Socket socket = serverSocket.accept()) {
@@ -201,7 +208,8 @@ public class DaprClientHttpTest {
}
});
t.start();
- daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient);
+
+ daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, READ_TIMEOUT, httpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp);
daprClientHttp.waitForSidecar(10000).block();
}
@@ -209,80 +217,146 @@ public class DaprClientHttpTest {
@Test
public void invokeServiceVerbNull() {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3000/v1.0/publish/A")
- .respond(EXPECTED_RESULT);
- String event = "{ \"message\": \"This is a test\" }";
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(EXPECTED_RESULT.getBytes(), HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
assertThrows(IllegalArgumentException.class, () ->
- daprClientHttp.invokeMethod(null, "", "", null, null, (Class)null).block());
+ daprClientHttp.invokeMethod(
+ null,
+ "",
+ "",
+ null,
+ null,
+ (Class)null
+ ).block());
}
@Test
public void invokeServiceIllegalArgumentException() {
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/badorder")
- .respond("INVALID JSON");
+ byte[] content = "INVALID JSON".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
assertThrows(IllegalArgumentException.class, () -> {
// null HttpMethod
- daprClientHttp.invokeMethod("1", "2", "3", new HttpExtension(null), null, (Class)null).block();
+ daprClientHttp.invokeMethod(
+ "1",
+ "2",
+ "3",
+ new HttpExtension(null),
+ null,
+ (Class)null
+ ).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// null HttpExtension
- daprClientHttp.invokeMethod("1", "2", "3", null, null, (Class)null).block();
+ daprClientHttp.invokeMethod(
+ "1",
+ "2",
+ "3",
+ null,
+ null,
+ (Class)null
+ ).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// empty appId
- daprClientHttp.invokeMethod("", "1", null, HttpExtension.GET, null, (Class)null).block();
+ daprClientHttp.invokeMethod(
+ "",
+ "1",
+ null,
+ HttpExtension.GET,
+ null,
+ (Class)null
+ ).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// null appId, empty method
- daprClientHttp.invokeMethod(null, "", null, HttpExtension.POST, null, (Class)null).block();
+ daprClientHttp.invokeMethod(
+ null,
+ "",
+ null,
+ HttpExtension.POST,
+ null,
+ (Class)null
+ ).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// empty method
- daprClientHttp.invokeMethod("1", "", null, HttpExtension.PUT, null, (Class)null).block();
+ daprClientHttp.invokeMethod(
+ "1",
+ "",
+ null,
+ HttpExtension.PUT,
+ null,
+ (Class)null
+ ).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// null method
- daprClientHttp.invokeMethod("1", null, null, HttpExtension.DELETE, null, (Class)null).block();
+ daprClientHttp.invokeMethod(
+ "1",
+ null,
+ null,
+ HttpExtension.DELETE,
+ null,
+ (Class)null
+ ).block();
});
assertThrowsDaprException(JsonParseException.class, () -> {
// invalid JSON response
- daprClientHttp.invokeMethod("41", "badorder", null, HttpExtension.GET, null, String.class).block();
+ daprClientHttp.invokeMethod(
+ "41",
+ "badorder",
+ null,
+ HttpExtension.GET,
+ null,
+ String.class
+ ).block();
});
}
@Test
public void invokeServiceDaprError() {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
- .respond(500,
- ResponseBody.create(
- "{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}",
- MediaTypes.MEDIATYPE_JSON));
+ byte[] content = "{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
- daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
+ daprClientHttp.invokeMethod(
+ "myapp",
+ "mymethod",
+ "anything",
+ HttpExtension.POST
+ ).block();
});
assertEquals("MYCODE", exception.getErrorCode());
assertEquals("MYCODE: My Message (HTTP status code: 500)", exception.getMessage());
- assertEquals(500, exception.getHttpStatusCode());
+ assertEquals(HTTP_SERVER_ERROR, exception.getHttpStatusCode());
}
@Test
public void invokeServiceDaprErrorFromGRPC() {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
- .respond(500,
- ResponseBody.create(
- "{ \"code\": 7 }",
- MediaTypes.MEDIATYPE_JSON));
+ byte[] content = "{ \"code\": 7 }".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
- daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
+ daprClientHttp.invokeMethod(
+ "myapp",
+ "mymethod",
+ "anything",
+ HttpExtension.POST
+ ).block();
});
assertEquals("PERMISSION_DENIED", exception.getErrorCode());
@@ -291,12 +365,11 @@ public class DaprClientHttpTest {
@Test
public void invokeServiceDaprErrorUnknownJSON() {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
- .respond(500,
- ResponseBody.create(
- "{ \"anything\": 7 }",
- MediaTypes.MEDIATYPE_JSON));
+ byte[] content = "{ \"anything\": 7 }".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
@@ -309,119 +382,203 @@ public class DaprClientHttpTest {
@Test
public void invokeServiceDaprErrorEmptyString() {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod")
- .respond(500,
- ResponseBody.create(
- "",
- MediaTypes.MEDIATYPE_JSON));
+ byte[] content = "".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
DaprException exception = assertThrows(DaprException.class, () -> {
- daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
+ daprClientHttp.invokeMethod(
+ "myapp",
+ "mymethod",
+ "anything",
+ HttpExtension.POST
+ ).block();
});
assertEquals("UNKNOWN", exception.getErrorCode());
assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage());
}
-
@Test
public void invokeServiceMethodNull() {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3000/v1.0/publish/A")
- .respond(EXPECTED_RESULT);
+ byte[] content = EXPECTED_RESULT.getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
assertThrows(IllegalArgumentException.class, () ->
- daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block());
+ daprClientHttp.invokeMethod(
+ "1",
+ "",
+ null,
+ HttpExtension.POST,
+ null,
+ (Class)null
+ ).block());
}
@Test
public void invokeService() {
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .respond("\"hello world\"");
+ byte[] content = "\"hello world\"".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ Mono mono = daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ null,
+ HttpExtension.GET,
+ null,
+ String.class
+ );
- Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class);
assertEquals("hello world", mono.block());
}
@Test
public void invokeServiceNullResponse() {
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .respond(new byte[0]);
+ byte[] content = new byte[0];
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ Mono mono = daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ null,
+ HttpExtension.GET,
+ null,
+ String.class
+ );
- Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class);
assertNull(mono.block());
}
@Test
public void simpleInvokeService() {
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .respond(EXPECTED_RESULT);
+ byte[] content = EXPECTED_RESULT.getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ Mono mono = daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ null,
+ HttpExtension.GET,
+ byte[].class
+ );
- Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, byte[].class);
assertEquals(new String(mono.block()), EXPECTED_RESULT);
}
@Test
public void invokeServiceWithMetadataMap() {
- Map map = new HashMap<>();
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .respond(EXPECTED_RESULT);
+ Map map = Map.of();
+ byte[] content = EXPECTED_RESULT.getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
- Mono mono = daprClientHttp.invokeMethod("41", "neworder", (byte[]) null, HttpExtension.GET, map);
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ Mono mono = daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ (byte[]) null,
+ HttpExtension.GET,
+ map
+ );
String monoString = new String(mono.block());
+
assertEquals(monoString, EXPECTED_RESULT);
}
@Test
public void invokeServiceWithOutRequest() {
- Map map = new HashMap<>();
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .respond(EXPECTED_RESULT);
+ Map map = Map.of();
+ byte[] content = EXPECTED_RESULT.getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ Mono mono = daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ HttpExtension.GET,
+ map
+ );
- Mono mono = daprClientHttp.invokeMethod("41", "neworder", HttpExtension.GET, map);
assertNull(mono.block());
}
@Test
public void invokeServiceWithRequest() {
- Map map = new HashMap<>();
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .respond(EXPECTED_RESULT);
+ Map map = Map.of();
+ byte[] content = EXPECTED_RESULT.getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ Mono mono = daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ "",
+ HttpExtension.GET,
+ map
+ );
- Mono mono = daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map);
assertNull(mono.block());
}
@Test
public void invokeServiceWithRequestAndQueryString() {
- Map map = new HashMap<>();
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder?param1=1¶m2=a¶m2=b%2Fc")
- .respond(EXPECTED_RESULT);
+ Map map = Map.of();
+ Map> queryString = Map.of(
+ "param1", List.of("1"),
+ "param2", List.of("a", "b/c")
+ );
+ byte[] content = EXPECTED_RESULT.getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
- Map> queryString = new HashMap<>();
- queryString.put("param1", Collections.singletonList("1"));
- queryString.put("param2", Arrays.asList("a", "b/c"));
HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, queryString, null);
- Mono mono = daprClientHttp.invokeMethod("41", "neworder", "", httpExtension, map);
+ Mono mono = daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ "",
+ httpExtension,
+ map
+ );
+
assertNull(mono.block());
}
@Test
public void invokeServiceNoHotMono() {
- Map map = new HashMap<>();
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .respond(500);
+ Map map = Map.of();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
- daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map);
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ daprClientHttp.invokeMethod(
+ "41",
+ "neworder",
+ "",
+ HttpExtension.GET,
+ map
+ );
// No exception should be thrown because did not call block() on mono above.
}
@@ -433,18 +590,27 @@ public class DaprClientHttpTest {
.put("traceparent", traceparent)
.put("tracestate", tracestate)
.put("not_added", "xyz");
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder")
- .header("traceparent", traceparent)
- .header("tracestate", tracestate)
- .respond(new byte[0]);
+ byte[] content = new byte[0];
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder")
.setBody("request")
.setHttpExtension(HttpExtension.POST);
Mono result = daprClientHttp.invokeMethod(req, TypeRef.get(Void.class))
.contextWrite(it -> it.putAll((ContextView) context));
+
result.block();
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
+ assertEquals(traceparent, request.headers().firstValue("traceparent").get());
+ assertEquals(tracestate, request.headers().firstValue("tracestate").get());
}
@Test
@@ -467,23 +633,4 @@ public class DaprClientHttpTest {
daprClientHttp.close();
}
- private static class XmlSerializer implements DaprObjectSerializer {
-
- private static final XmlMapper XML_MAPPER = new XmlMapper();
-
- @Override
- public byte[] serialize(Object o) throws IOException {
- return XML_MAPPER.writeValueAsBytes(o);
- }
-
- @Override
- public T deserialize(byte[] data, TypeRef type) throws IOException {
- return XML_MAPPER.readValue(data, new TypeReference() {});
- }
-
- @Override
- public String getContentType() {
- return "application/xml";
- }
- }
-}
\ No newline at end of file
+}
diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java
index 78470719c..85863c046 100644
--- a/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java
@@ -14,10 +14,10 @@ limitations under the License.
package io.dapr.client;
import io.dapr.config.Properties;
-import okhttp3.OkHttpClient;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
+import java.net.http.HttpClient;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -30,14 +30,13 @@ public class DaprHttpBuilderTest {
DaprHttp daprHttp = new DaprHttpBuilder().build(properties);
DaprHttp anotherDaprHttp = new DaprHttpBuilder().build(properties);
- assertSame(getOkHttpClient(daprHttp), getOkHttpClient(anotherDaprHttp));
+ assertSame(getHttpClient(daprHttp), getHttpClient(anotherDaprHttp));
}
-
- private static OkHttpClient getOkHttpClient(DaprHttp daprHttp) throws Exception {
+ private static HttpClient getHttpClient(DaprHttp daprHttp) throws Exception {
Field httpClientField = DaprHttp.class.getDeclaredField("httpClient");
httpClientField.setAccessible(true);
- OkHttpClient okHttpClient = (OkHttpClient) httpClientField.get(daprHttp);
+ HttpClient okHttpClient = (HttpClient) httpClientField.get(daprHttp);
assertNotNull(okHttpClient);
return okHttpClient;
}
diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java
index 2b38d7809..17d9205c2 100644
--- a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java
+++ b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java
@@ -16,6 +16,7 @@ package io.dapr.client;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -25,6 +26,8 @@ import java.util.Map;
*/
public class DaprHttpStub extends DaprHttp {
+ private static final Duration READ_TIMEOUT = Duration.ofSeconds(60);
+
public static class ResponseStub extends DaprHttp.Response {
public ResponseStub(byte[] body, Map headers, int statusCode) {
super(body, headers, statusCode);
@@ -34,7 +37,7 @@ public class DaprHttpStub extends DaprHttp {
* Instantiates a stub for DaprHttp
*/
public DaprHttpStub() {
- super(null, 3000, "stubToken", null);
+ super(null, 3000, "stubToken", READ_TIMEOUT, null);
}
/**
diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java
index bf21e6419..ad6753479 100644
--- a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java
@@ -16,14 +16,10 @@ import io.dapr.config.Properties;
import io.dapr.exceptions.DaprErrorDetails;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.TypeRef;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.ResponseBody;
-import okhttp3.mock.Behavior;
-import okhttp3.mock.MockInterceptor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.context.Context;
@@ -32,214 +28,407 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.util.Collections;
-import java.util.HashMap;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static io.dapr.utils.TestUtils.formatIpAddress;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@ExtendWith(SystemStubsExtension.class)
public class DaprHttpTest {
- @SystemStub
- public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
-
- private static final String STATE_PATH = DaprHttp.API_VERSION + "/state";
-
+ private static final int HTTP_OK = 200;
+ private static final int HTTP_SERVER_ERROR = 500;
+ private static final int HTTP_NO_CONTENT = 204;
+ private static final int HTTP_NOT_FOUND = 404;
private static final String EXPECTED_RESULT =
"{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
-
+ private static final Duration READ_TIMEOUT = Duration.ofSeconds(60);
+
+ @SystemStub
+ private final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
private String sidecarIp;
private String daprTokenApi;
- private OkHttpClient okHttpClient;
+ private HttpClient httpClient;
- private MockInterceptor mockInterceptor;
-
- private ObjectSerializer serializer = new ObjectSerializer();
+ private final ObjectSerializer serializer = new ObjectSerializer();
@BeforeEach
public void setUp() {
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
daprTokenApi = Properties.API_TOKEN.get();
- mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
- okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
+ httpClient = mock(HttpClient.class);
}
@Test
public void invokeApi_daprApiToken_present() throws IOException {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .hasHeader(Headers.DAPR_API_TOKEN)
- .respond(serializer.serialize(EXPECTED_RESULT));
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
environmentVariables.set(Properties.API_TOKEN.getEnvName(), "xyz");
assertEquals("xyz", Properties.API_TOKEN.get());
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, Properties.API_TOKEN.get(), okHttpClient);
- Mono mono =
- daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty());
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, Properties.API_TOKEN.get(), READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ (byte[]) null,
+ null,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("POST", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
+ assertEquals("xyz", request.headers().firstValue(Headers.DAPR_API_TOKEN).get());
}
@Test
public void invokeApi_daprApiToken_absent() throws IOException {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .not()
- .hasHeader(Headers.DAPR_API_TOKEN)
- .respond(serializer.serialize(EXPECTED_RESULT));
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
assertNull(Properties.API_TOKEN.get());
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty());
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ (byte[]) null,
+ null,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("POST", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
+ assertFalse(request.headers().map().containsKey(Headers.DAPR_API_TOKEN));
}
@Test
public void invokeMethod() throws IOException {
- Map headers = new HashMap<>();
- headers.put("content-type", "text/html");
- headers.put("header1", "value1");
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .respond(serializer.serialize(EXPECTED_RESULT));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty());
+ Map headers = Map.of(
+ "content-type", "text/html",
+ "header1", "value1"
+ );
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ (byte[]) null,
+ headers,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("POST", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
+ assertEquals("text/html", request.headers().firstValue("content-type").get());
+ assertEquals("value1", request.headers().firstValue("header1").get());
}
@Test
public void invokeMethodIPv6() throws IOException {
sidecarIp = formatIpAddress("2001:db8:3333:4444:5555:6666:7777:8888");
- Map headers = new HashMap<>();
- headers.put("content-type", "text/html");
- headers.put("header1", "value1");
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .respond(serializer.serialize(EXPECTED_RESULT));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty());
+ Map headers = Map.of(
+ "content-type", "text/html",
+ "header1", "value1"
+ );
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ (byte[]) null,
+ headers,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("POST", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
+ assertEquals("text/html", request.headers().firstValue("content-type").get());
+ assertEquals("value1", request.headers().firstValue("header1").get());
}
@Test
public void invokePostMethod() throws IOException {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .respond(serializer.serialize(EXPECTED_RESULT))
- .addHeader("Header", "Value");
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, "", null, Context.empty());
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ "",
+ null,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("POST", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
}
@Test
public void invokeDeleteMethod() throws IOException {
- mockInterceptor.addRule()
- .delete("http://" + sidecarIp + ":3500/v1.0/state")
- .respond(serializer.serialize(EXPECTED_RESULT));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("DELETE", "v1.0/state".split("/"), null, (String) null, null, Context.empty());
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "DELETE",
+ "v1.0/state".split("/"),
+ null,
+ (String) null,
+ null,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("DELETE", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
}
@Test
- public void invokeHEADMethod() throws IOException {
- mockInterceptor.addRule().head("http://127.0.0.1:3500/v1.0/state").respond(HttpURLConnection.HTTP_OK);
- DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("HEAD", "v1.0/state".split("/"), null, (String) null, null, Context.empty());
+ public void invokeHeadMethod() {
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "HEAD",
+ "v1.0/state".split("/"),
+ null,
+ (String) null,
+ null,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
- assertEquals(HttpURLConnection.HTTP_OK, response.getStatusCode());
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
+ assertEquals("HEAD", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
+ assertEquals(HTTP_OK, response.getStatusCode());
}
-
+
@Test
public void invokeGetMethod() throws IOException {
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3500/v1.0/get")
- .respond(serializer.serialize(EXPECTED_RESULT));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono = daprHttp.invokeApi("GET", "v1.0/get".split("/"), null, null, Context.empty());
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "GET",
+ "v1.0/state".split("/"),
+ null,
+ null,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("GET", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString());
}
@Test
public void invokeMethodWithHeaders() throws IOException {
- Map headers = new HashMap<>();
- headers.put("header", "value");
- headers.put("header1", "value1");
- Map> urlParameters = new HashMap<>();
- urlParameters.put("orderId", Collections.singletonList("41"));
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41")
- .respond(serializer.serialize(EXPECTED_RESULT));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("GET", "v1.0/state/order".split("/"), urlParameters, headers, Context.empty());
+ Map headers = Map.of(
+ "header", "value",
+ "header1", "value1"
+ );
+ Map> urlParameters = Map.of(
+ "orderId", List.of("41")
+ );
+ byte[] content = serializer.serialize(EXPECTED_RESULT);
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "GET",
+ "v1.0/state/order".split("/"),
+ urlParameters,
+ headers,
+ Context.empty()
+ );
DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class);
+
+ verify(httpClient).sendAsync(requestCaptor.capture(), any());
+
+ HttpRequest request = requestCaptor.getValue();
+
assertEquals(EXPECTED_RESULT, body);
+ assertEquals("GET", request.method());
+ assertEquals("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41", request.uri().toString());
+ assertEquals("value", request.headers().firstValue("header").get());
+ assertEquals("value1", request.headers().firstValue("header1").get());
}
@Test
- public void invokePostMethodRuntime() throws IOException {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .respond(500);
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono =
- daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
+ public void invokePostMethodRuntime() {
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ null,
+ Context.empty());
+
StepVerifier.create(mono).expectError(RuntimeException.class).verify();
}
@Test
- public void invokePostDaprError() throws IOException {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .respond(500, ResponseBody.create(MediaType.parse("text"),
- "{\"errorCode\":null,\"message\":null}"));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
+ public void invokePostDaprError() {
+ byte[] content = "{\"errorCode\":null,\"message\":null}".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ null,
+ Context.empty()
+ );
+
StepVerifier.create(mono).expectError(RuntimeException.class).verify();
}
@Test
- public void invokePostMethodUnknownError() throws IOException {
- mockInterceptor.addRule()
- .post("http://" + sidecarIp + ":3500/v1.0/state")
- .respond(500, ResponseBody.create(MediaType.parse("application/json"),
- "{\"errorCode\":\"null\",\"message\":\"null\"}"));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
+ public void invokePostMethodUnknownError() {
+ byte[] content = "{\"errorCode\":null,\"message\":null}".getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/state".split("/"),
+ null,
+ null,
+ Context.empty()
+ );
+
StepVerifier.create(mono).expectError(RuntimeException.class).verify();
}
@Test
public void validateExceptionParsing() {
- final String payload = "{" +
+ String payload = "{" +
"\"errorCode\":\"ERR_PUBSUB_NOT_FOUND\"," +
"\"message\":\"pubsub abc is not found\"," +
"\"details\":[" +
@@ -249,14 +438,24 @@ public class DaprHttpTest {
"\"metadata\":{}," +
"\"reason\":\"DAPR_PUBSUB_NOT_FOUND\"" +
"}]}";
- mockInterceptor.addRule()
- .post("http://127.0.0.1:3500/v1.0/pubsub/publish")
- .respond(500, ResponseBody.create(MediaType.parse("application/json"),
- payload));
- DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient);
- Mono mono = daprHttp.invokeApi("POST", "v1.0/pubsub/publish".split("/"), null, null, Context.empty());
+ byte[] content = payload.getBytes();
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR);
+ CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse);
+
+ when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse);
+
+ DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono mono = daprHttp.invokeApi(
+ "POST",
+ "v1.0/pubsub/publish".split("/"),
+ null,
+ null,
+ Context.empty()
+ );
+
StepVerifier.create(mono).expectErrorMatches(e -> {
assertEquals(DaprException.class, e.getClass());
+
DaprException daprException = (DaprException)e;
assertEquals("ERR_PUBSUB_NOT_FOUND", daprException.getErrorCode());
assertEquals("DAPR_PUBSUB_NOT_FOUND",
@@ -267,15 +466,15 @@ public class DaprHttpTest {
}
/**
- * The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR
+ * The purpose of this test is to show that it doesn't matter when the client is called, the actual call to DAPR
* will be done when the output Mono response call the Mono.block method.
- * Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state
- * you just retrived but block for the delete response, when later you block for the response of the getState, you will
- * not found the state.
+ * Like for instance if you call getState, without blocking for the response, and then call delete for the same state
+ * you just retrieved but block for the delete response, when later you block for the response of the getState, you will
+ * not find the state.
* This test will execute the following flow:
*
- * - Exeucte client getState for Key=key1
- * - Block for result to the the state
+ * - Execute client getState for Key=key1
+ * - Block for result to the state
* - Assert the Returned State is the expected to key1
* - Execute client getState for Key=key2
* - Execute client deleteState for Key=key2
@@ -285,35 +484,64 @@ public class DaprHttpTest {
*
* @throws IOException - Test will fail if any unexpected exception is being thrown
*/
- @Test()
+ @Test
public void testCallbackCalledAtTheExpectedTimeTest() throws IOException {
- String deletedStateKey = "deletedKey";
String existingState = "existingState";
- String urlDeleteState = STATE_PATH + "/" + deletedStateKey;
- String urlExistingState = STATE_PATH + "/" + existingState;
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3500/" + urlDeleteState)
- .respond(200, ResponseBody.create(MediaType.parse("application/json"),
- deletedStateKey));
- mockInterceptor.addRule()
- .delete("http://" + sidecarIp + ":3500/" + urlDeleteState)
- .respond(204);
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3500/" + urlExistingState)
- .respond(200, ResponseBody.create(MediaType.parse("application/json"),
- serializer.serialize(existingState)));
- DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
- Mono response = daprHttp.invokeApi("GET", urlExistingState.split("/"), null, null, Context.empty());
+ String urlExistingState = "v1.0/state/" + existingState;
+ String deletedStateKey = "deletedKey";
+ String urlDeleteState = "v1.0/state/" + deletedStateKey;
+
+ when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
+ HttpRequest request = invocation.getArgument(0);
+ String url = request.uri().toString();
+
+ if (request.method().equals("GET") && url.contains(urlExistingState)) {
+ MockHttpResponse mockHttpResponse = new MockHttpResponse(serializer.serialize(existingState), HTTP_OK);
+
+ return CompletableFuture.completedFuture(mockHttpResponse);
+ }
+
+ if (request.method().equals("DELETE")) {
+ return CompletableFuture.completedFuture(new MockHttpResponse(HTTP_NO_CONTENT));
+ }
+
+ if (request.method().equals("GET")) {
+ byte [] content = "{\"errorCode\":\"404\",\"message\":\"State Not Found\"}".getBytes();
+
+ return CompletableFuture.completedFuture(new MockHttpResponse(content, HTTP_NOT_FOUND));
+ }
+
+ return CompletableFuture.failedFuture(new RuntimeException("Unexpected call"));
+ });
+
+ DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient);
+ Mono response = daprHttp.invokeApi(
+ "GET",
+ urlExistingState.split("/"),
+ null,
+ null,
+ Context.empty()
+ );
+
assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class));
- Mono responseDeleted = daprHttp.invokeApi("GET", urlDeleteState.split("/"), null, null, Context.empty());
- Mono responseDeleteKey =
- daprHttp.invokeApi("DELETE", urlDeleteState.split("/"), null, null, Context.empty());
+
+ Mono responseDeleted = daprHttp.invokeApi(
+ "GET",
+ urlDeleteState.split("/"),
+ null,
+ null,
+ Context.empty()
+ );
+ Mono responseDeleteKey = daprHttp.invokeApi(
+ "DELETE",
+ urlDeleteState.split("/"),
+ null,
+ null,
+ Context.empty()
+ );
+
assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class));
- mockInterceptor.reset();
- mockInterceptor.addRule()
- .get("http://" + sidecarIp + ":3500/" + urlDeleteState)
- .respond(404, ResponseBody.create(MediaType.parse("application/json"),
- "{\"errorCode\":\"404\",\"message\":\"State Not Found\"}"));
+
try {
responseDeleted.block();
fail("Expected DaprException");
@@ -321,5 +549,4 @@ public class DaprHttpTest {
assertEquals(DaprException.class, ex.getClass());
}
}
-
}
diff --git a/sdk/src/test/java/io/dapr/client/MockHttpResponse.java b/sdk/src/test/java/io/dapr/client/MockHttpResponse.java
new file mode 100644
index 000000000..b5f0510a0
--- /dev/null
+++ b/sdk/src/test/java/io/dapr/client/MockHttpResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2025 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.client;
+
+import javax.net.ssl.SSLSession;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collections;
+import java.util.Optional;
+
+public class MockHttpResponse implements HttpResponse