From 49ccb31dccd9d28c5a1d7aa89c082bf21b49b9f0 Mon Sep 17 00:00:00 2001 From: Cassie Coyle Date: Mon, 18 Dec 2023 22:31:42 -0600 Subject: [PATCH] add health check to wait for sidecar and test it (#918) * add health check to wait for sidecar and test it Signed-off-by: Cassandra Coyle * split long line into 2 Signed-off-by: Cassandra Coyle * addRule to failing waitForSidecar test Signed-off-by: Cassandra Coyle * update the http retry on healthcheck and add to tests Signed-off-by: Cassandra Coyle * add success test since failure scenarios are covered Signed-off-by: Cassandra Coyle * update the grpc logic to call the http endpoint Signed-off-by: Cassandra Coyle * add endpoint for grpc to be successful Signed-off-by: Cassandra Coyle * up timeout and make return more similar to http Signed-off-by: Cassandra Coyle * up time for test again Signed-off-by: Cassandra Coyle * added comment on getState not being implemented Signed-off-by: Cassandra Coyle * add daprhttp to grpc to use and overload constructor Signed-off-by: Cassandra Coyle * shorten time in grpc test Signed-off-by: Cassandra Coyle * update grpc return to match http check Signed-off-by: Cassandra Coyle * testing if this fixes CI issue on managed channel not closing properly Signed-off-by: Cassandra Coyle * close daprHttp in teardown func Signed-off-by: Cassandra Coyle * Fix telemetry test that uses GrpcChannelFacade. Signed-off-by: Artur Souza * close daprHttp Signed-off-by: Cassandra Coyle * triggering CI again Signed-off-by: Cassandra Coyle --------- Signed-off-by: Cassandra Coyle Signed-off-by: Cassie Coyle Signed-off-by: Artur Souza Co-authored-by: Artur Souza Co-authored-by: Artur Souza --- README.md | 34 ++--- .../io/dapr/client/DaprClientBuilder.java | 2 +- .../java/io/dapr/client/DaprClientHttp.java | 28 +++- .../io/dapr/client/GrpcChannelFacade.java | 87 ++++++++--- .../client/DaprClientGrpcTelemetryTest.java | 8 +- .../io/dapr/client/DaprClientHttpTest.java | 137 +++++++++++++++++- .../java/io/dapr/client/DaprHttpTest.java | 2 +- .../io/dapr/client/GrpcChannelFacadeTest.java | 91 ++++++++++-- 8 files changed, 323 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 90368216c..e7289f896 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ This is the Dapr SDK for Java, including the following features: * An existing Java Maven or Gradle project. You may also start a new project via one of the options below: * [New Maven project in IntelliJ](https://www.jetbrains.com/help/idea/maven-support.html#create_new_maven_project) * [Maven in 5 minutes](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html) + * [Install toxiproxy-server binary](https://github.com/Shopify/toxiproxy/releases) ### Install JDK @@ -119,7 +120,7 @@ Please, refer to our [Javadoc](https://dapr.github.io/java-sdk/) website. ### Reactor API -The Java SDK for Dapr is built using [Project Reactor](https://projectreactor.io/). It provides an asynchronous API for Java. When consuming a result is consumed synchronously, as in the examples referenced above, the `block()` method is used. +The Java SDK for Dapr is built using [Project Reactor](https://projectreactor.io/). It provides an asynchronous API for Java. A result is consumed synchronously by using the `block()` method, as shown in the examples referenced above. The code below does not make any API call, it simply returns the [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) publisher object. Nothing happens until the application subscribes or blocks on the result: @@ -127,7 +128,7 @@ The code below does not make any API call, it simply returns the [Mono](https:// Mono result = daprClient.publishEvent("mytopic", "my message"); ``` -To start execution and receive the result object synchronously(`void` or `Void` becomes an empty result), use `block()`. The code below shows how to execute the call and consume an empty response: +To start execution and receive the result object synchronously (`void` or `Void` becomes an empty result), use `block()`. The code below shows how to execute the call and consume an empty response: ```java Mono result = daprClient.publishEvent("mytopic", "my message"); result.block(); @@ -135,9 +136,9 @@ result.block(); ### How to use a custom serializer -This SDK provides a basic serialization for request/response objects but also for state objects. Applications should provide their own serialization for production scenarios. +This SDK provides a basic serialization for request/response objects, and state objects. Applications should provide their own serialization for production scenarios. -1. Implement the [DaprObjectSerializer](https://dapr.github.io/java-sdk/io/dapr/serializer/DaprObjectSerializer.html) interface. See [this class](sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java) as example. +1. Implement the [DaprObjectSerializer](https://dapr.github.io/java-sdk/io/dapr/serializer/DaprObjectSerializer.html) interface. See [this class](sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java) as an example. 2. Use your serializer class in the following scenarios: * When building a new instance of [DaprClient](https://dapr.github.io/java-sdk/io/dapr/client/DaprClient.html): ```java @@ -163,13 +164,13 @@ This SDK provides a basic serialization for request/response objects but also fo ``` -### Debug Java application or Dapr's Java SDK +### Debug a Java application or Dapr's Java SDK -**In IntelliJ Community Edition, consider [debugging in IntelliJ](https://docs.dapr.io/developing-applications/ides/intellij/).** +**In IntelliJ Community Edition, consider [debugging in IntelliJ](https://docs.dapr.io/developing-applications/local-development/ides/intellij/).** -**In Visual Studio Code, consider [debugging in Visual Studio Code](https://docs.dapr.io/developing-applications/ides/vscode-debugging/).** +**In Visual Studio Code, consider [debugging in Visual Studio Code](https://docs.dapr.io/developing-applications/local-development/ides/vscode/).** -If you need to debug your Application, run Dapr sidecar separately and then start the application from your IDE (IntelliJ, for example). +If you need to debug your Application, run the Dapr sidecar separately, and then start the application from your IDE (IntelliJ or Eclipse, for example). For Linux and MacOS: ```sh @@ -178,23 +179,24 @@ dapr run --app-id testapp --app-port 3000 --dapr-http-port 3500 --dapr-grpc-port > Note: confirm the correct port that the app will listen to and that the Dapr ports above are free, changing the ports if necessary. -When running your Java application from IDE, make sure the following environment variables are set, so the Java SDK knows how to connect to Dapr's sidecar: +When running your Java application from your IDE, make sure the following environment variables are set, so the Java SDK knows how to connect to Dapr's sidecar: ``` DAPR_HTTP_PORT=3500 DAPR_GRPC_PORT=5001 ``` -Now you can go to your IDE (like Eclipse, for example) and debug your Java application, using port `3500` to call Dapr while also listening to port `3000` to expose Dapr's callback endpoint. +Now you can go to your IDE and debug your Java application, using port `3500` to call Dapr while also listening to port `3000` to expose Dapr's callback endpoint. ### Exception handling -Most exceptions thrown from the SDK are instances of `DaprException`. `DaprException` extends from `RuntimeException`, making it compatible with Project Reactor. See [example](./examples/src/main/java/io/dapr/examples/exception) for more details. +Most exceptions thrown from the SDK are instances of `DaprException`. `DaprException` extends from `RuntimeException`, making it compatible with Project Reactor. See the [exception example](./examples/src/main/java/io/dapr/examples/exception) for more details. ## Development ### Update URL to fetch proto files Change the `dapr.proto.baseurl` property below in [pom.xml](./pom.xml) to point to the URL for the desired commit hash in Git if you need to target a proto file that is not been merged into master yet. + Note: You may need to run `./mvnw clean` after changing this setting to remove any auto-generated files so that the new proto files get downloaded and compiled. ```xml @@ -212,11 +214,10 @@ Note: You may need to run `./mvnw clean` after changing this setting to remove a ``` -### Running Integration Tests - -#### Pre-Requisites for ITs -Along with the pre-requisites for [SDK](#pre-requisites) the following are needed. +### Running Integration Tests (ITs) +#### Pre-Requisites +* [Pre-Requisites for the SDK](#pre-requisites) * Docker installed * [Docker Compose](https://docs.docker.com/compose/install/) * [Docker Desktop](https://www.docker.com/products/docker-desktop) @@ -229,8 +230,7 @@ Along with the pre-requisites for [SDK](#pre-requisites) the following are neede The code for the tests are present inside the project [sdk-tests](./sdk-tests). This module alone can be imported as a separate project in IDEs. This project depends on the rest of the JARs built by the other modules in the repo like [sdk](./sdk), [sdk-springboot](./sdk-springboot) etc. -As a starting point for running Integration Tests, first run `./mvnw clean install` from the root of the repo to build the JARs for the different modules -except the `sdk-tests` module. +As a starting point for running the Integration Tests, first run `./mvnw clean install` from the root of the repo to build the JARs for the different modules, except the `sdk-tests` module. #### Run all the dependent services spun up during build diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index d8cafaf3c..dc67d019c 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -177,7 +177,7 @@ public class DaprClientBuilder { */ private DaprClient buildDaprClientGrpc() { final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); - final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); + final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, this.daprHttpBuilder.build()); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); return new DaprClientGrpc( channelFacade, diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index c5c5cc829..f9a01f984 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -53,8 +53,10 @@ import io.dapr.utils.NetworkUtils; import io.dapr.utils.TypeRef; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -139,12 +141,26 @@ public class DaprClientHttp extends AbstractDaprClient { */ @Override public Mono waitForSidecar(int timeoutInMilliseconds) { - return Mono.fromRunnable(() -> { - try { - NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), timeoutInMilliseconds); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + return Mono.defer(() -> { + String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"}; + int maxRetries = 5; + + Retry retrySpec = Retry + .fixedDelay(maxRetries, Duration.ofMillis(500)) + .doBeforeRetry(retrySignal -> { + System.out.println("Retrying component health check..."); + }); + + Mono responseMono = this.client.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, + null, "", null, null); + + return responseMono + .retryWhen(retrySpec) + .timeout(Duration.ofMillis(timeoutInMilliseconds)) + .onErrorResume(DaprException.class, e -> + Mono.error(new RuntimeException(e))) + .switchIfEmpty(DaprException.wrapMono(new RuntimeException("Health check timed out"))) + .then(); }); } diff --git a/sdk/src/main/java/io/dapr/client/GrpcChannelFacade.java b/sdk/src/main/java/io/dapr/client/GrpcChannelFacade.java index d7570f6d7..b0e13e033 100644 --- a/sdk/src/main/java/io/dapr/client/GrpcChannelFacade.java +++ b/sdk/src/main/java/io/dapr/client/GrpcChannelFacade.java @@ -13,13 +13,18 @@ limitations under the License. package io.dapr.client; +import io.dapr.config.Properties; +import io.dapr.exceptions.DaprException; import io.dapr.v1.DaprGrpc; import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; +import okhttp3.OkHttpClient; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import java.io.Closeable; import java.io.IOException; +import java.time.Duration; /** * Facade for common operations on gRPC channel. @@ -34,6 +39,11 @@ class GrpcChannelFacade implements Closeable { */ private final ManagedChannel channel; + /** + * The reference to the DaprHttp client. + */ + private final DaprHttp daprHttp; + /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder @@ -41,36 +51,75 @@ class GrpcChannelFacade implements Closeable { * @param channel A Managed GRPC channel * @see DaprClientBuilder */ - GrpcChannelFacade(ManagedChannel channel) { + GrpcChannelFacade(ManagedChannel channel, DaprHttp daprHttp) { this.channel = channel; + this.daprHttp = daprHttp; } @Override public void close() throws IOException { + if (daprHttp != null) { + daprHttp.close(); + } + if (channel != null && !channel.isShutdown()) { channel.shutdown(); } } public Mono waitForChannelReady(int timeoutInMilliseconds) { - return Mono.fromRunnable(() -> { - boolean isReady = false; - long startTime = System.currentTimeMillis(); - while (!isReady && System.currentTimeMillis() - startTime < timeoutInMilliseconds) { - isReady = this.channel.getState(true) == ConnectivityState.READY; - if (!isReady) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Waiting for gRPC channel ready interrupted.", e); - } - } - } + String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"}; + int maxRetries = 5; - if (!isReady) { - throw new RuntimeException("Timeout waiting for gRPC channel to be ready."); - } - }); + Retry retrySpec = Retry + .fixedDelay(maxRetries, Duration.ofMillis(500)) + .doBeforeRetry(retrySignal -> { + System.out.println("Retrying component health check..."); + }); + + /* + NOTE: (Cassie) Uncomment this once it actually gets implemented: + https://github.com/grpc/grpc-java/issues/4359 + + int maxChannelStateRetries = 5; + + // Retry logic for checking the channel state + Retry channelStateRetrySpec = Retry + .fixedDelay(maxChannelStateRetries, Duration.ofMillis(500)) + .doBeforeRetry(retrySignal -> { + System.out.println("Retrying channel state check..."); + }); + */ + + // Do the Dapr Http endpoint check to have parity with Dotnet + Mono responseMono = this.daprHttp.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, + null, "", null, null); + + return responseMono + .retryWhen(retrySpec) + /* + NOTE: (Cassie) Uncomment this once it actually gets implemented: + https://github.com/grpc/grpc-java/issues/4359 + .flatMap(response -> { + // Check the status code + int statusCode = response.getStatusCode(); + + // Check if the channel's state is READY + return Mono.defer(() -> { + if (this.channel.getState(true) == ConnectivityState.READY) { + // Return true if the status code is in the 2xx range + if (statusCode >= 200 && statusCode < 300) { + return Mono.empty(); // Continue with the flow + } + } + return Mono.error(new RuntimeException("Health check failed")); + }).retryWhen(channelStateRetrySpec); + }) + */ + .timeout(Duration.ofMillis(timeoutInMilliseconds)) + .onErrorResume(DaprException.class, e -> + Mono.error(new RuntimeException(e))) + .switchIfEmpty(DaprException.wrapMono(new RuntimeException("Health check timed out"))) + .then(); } } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java index 9c835a3b1..f7ec4ccef 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -36,6 +36,7 @@ import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import reactor.core.publisher.Mono; import reactor.util.context.Context; @@ -138,8 +139,9 @@ null, // Create a client channel and register for automatic graceful shutdown. ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); + DaprHttp daprHTTP = Mockito.mock(DaprHttp.class); client = new DaprClientGrpc( - new GrpcChannelFacade(channel), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + new GrpcChannelFacade(channel, daprHTTP), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); } public void setup() throws IOException { @@ -177,8 +179,10 @@ null, // Create a client channel and register for automatic graceful shutdown. ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); + + DaprHttp daprHTTP = Mockito.mock(DaprHttp.class); client = new DaprClientGrpc( - new GrpcChannelFacade(channel), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + new GrpcChannelFacade(channel, daprHTTP), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); } @ParameterizedTest diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 2e93cbc61..3eea3fa15 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -29,9 +29,11 @@ import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.client.domain.UnsubscribeConfigurationRequest; import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.config.Properties; +import io.dapr.exceptions.DaprError; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.utils.TypeRef; +import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.ResponseBody; @@ -45,11 +47,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; import reactor.util.context.Context; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -58,16 +63,16 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + import reactor.util.context.ContextView; +import uk.org.webcompere.systemstubs.stream.SystemOut; import static io.dapr.utils.TestUtils.assertThrowsDaprException; import static io.dapr.utils.TestUtils.findFreePort; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; public class DaprClientHttpTest { @@ -101,16 +106,132 @@ public class DaprClientHttpTest { } @Test - public void waitForSidecarTimeout() throws Exception { + public void waitForSidecarTimeOutHealthCheck() throws Exception { + VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + StepVerifier.setDefaultTimeout(Duration.ofSeconds(20)); + int port = findFreePort(); System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient); DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp); - assertThrows(RuntimeException.class, () -> daprClientHttp.waitForSidecar(1).block()); + + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json"))); + + StepVerifier.create(daprClientHttp.waitForSidecar(100)) + .expectSubscription() + .then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(200))) // Advance time to trigger the timeout + .expectErrorMatches(throwable -> { + if (throwable instanceof TimeoutException) { + System.out.println("TimeoutException occurred on sidecar health check."); + return true; + } + return false; + }) + .verify(); + } + + @Test + public void waitForSidecarBadHealthCheck() throws Exception { + VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + StepVerifier.setDefaultTimeout(Duration.ofSeconds(20)); + + int port = findFreePort(); + System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); + daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient); + DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp); + + addMockRulesForBadHealthCheck(); + + // retry the max allowed retries (5 times) + StepVerifier.create(daprClientHttp.waitForSidecar(5000)) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(500)) // Delay for retry + .then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500))) + .expectNoEvent(Duration.ofMillis(500)) // Delay for another retry + .then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500))) + .expectNoEvent(Duration.ofMillis(500)) // Delay for another retry + .then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500))) + .expectNoEvent(Duration.ofMillis(500)) // Delay for another retry + .then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500))) + .expectNoEvent(Duration.ofMillis(500)) // Delay for the final retry + .then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500))) + .expectErrorMatches(throwable -> { + if (throwable instanceof RuntimeException) { + return "Retries exhausted: 5/5".equals(throwable.getMessage()); + } + return false; + }) + .verify(); + } + + private void addMockRulesForBadHealthCheck() { + for (int i = 0; i < 6; i++) { + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(404, ResponseBody.create("Not Found", MediaType.get("application/json"))); + } + } + @Test + public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception { + VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + StepVerifier.setDefaultTimeout(Duration.ofSeconds(20)); + + int port = findFreePort(); + System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); + daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient); + DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp); + + // Simulate a slow response + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json"))); + + StepVerifier.create(daprClientHttp.waitForSidecar(5000)) + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(1)) // Delay for retry + .then(() -> { + // Simulate a successful response + mockInterceptor.reset(); + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(204, ResponseBody.create("No Content", MediaType.get("application/json"))); + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(1)); + }) + .expectNext() + .expectComplete() + .verify(); + } + + @Test + public void waitForSidecarOK() throws Exception { + int port = findFreePort(); + System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); + daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient); + DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp); + + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(204); + + StepVerifier.create(daprClientHttp.waitForSidecar(10000)) + .expectSubscription() + .expectComplete() + .verify(); } @Test public void waitForSidecarTimeoutOK() throws Exception { + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(204); try (ServerSocket serverSocket = new ServerSocket(0)) { final int port = serverSocket.getLocalPort(); System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java index e3e170fe4..a0ca430aa 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java @@ -258,7 +258,7 @@ public class DaprHttpTest { mockInterceptor.addRule() .get("http://127.0.0.1:3500/" + urlDeleteState) .respond(404, ResponseBody.create(MediaType.parse("application/json"), - "{\"errorCode\":\"404\",\"message\":\"State Not Fuund\"}")); + "{\"errorCode\":\"404\",\"message\":\"State Not Found\"}")); try { responseDeleted.block(); fail("Expected DaprException"); diff --git a/sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java b/sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java index 6eb5ed4b7..1e97b4ebe 100644 --- a/sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java +++ b/sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java @@ -13,16 +13,29 @@ limitations under the License. package io.dapr.client; +import io.dapr.config.Properties; +import io.dapr.utils.NetworkUtils; import io.dapr.v1.DaprGrpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.ResponseBody; +import okhttp3.mock.Behavior; +import okhttp3.mock.MockInterceptor; +import org.junit.Before; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeoutException; import static io.dapr.utils.TestUtils.findFreePort; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -33,6 +46,21 @@ public class GrpcChannelFacadeTest { public static Server server; + private MockInterceptor mockInterceptor; + + private OkHttpClient okHttpClient; + + private static DaprHttp daprHttp; + + /** + * Enable the waitForSidecar to allow the gRPC to check the http endpoint for the health check + */ + @BeforeEach + public void setUp() { + mockInterceptor = new MockInterceptor(Behavior.UNORDERED); + okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); + } + @BeforeAll public static void setup() throws IOException { port = findFreePort(); @@ -45,26 +73,65 @@ public class GrpcChannelFacadeTest { @AfterAll public static void teardown() throws InterruptedException { + if (daprHttp != null) { + daprHttp.close(); + } server.shutdown(); server.awaitTermination(); } - + private void addMockRulesForBadHealthCheck() { + for (int i = 0; i < 6; i++) { + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(404, ResponseBody.create("Not Found", MediaType.get("application/json"))); + } + } @Test - public void waitForSidecarTimeout() throws Exception { - int unusedPort = findFreePort(); - ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", unusedPort) - .usePlaintext().build(); - final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); + public void waitForSidecarTimeoutHealthCheck() throws Exception { + VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + StepVerifier.setDefaultTimeout(Duration.ofSeconds(20)); + int timeoutInMilliseconds = 1000; - assertThrows(RuntimeException.class, () -> channelFacade.waitForChannelReady(1).block()); + int unusedPort = findFreePort(); + + OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); + DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); + + ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", unusedPort) + .usePlaintext() + .build(); + final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, daprHttp); + + addMockRulesForBadHealthCheck(); + + StepVerifier.create(channelFacade.waitForChannelReady(timeoutInMilliseconds)) + .expectSubscription() + .then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(timeoutInMilliseconds + timeoutInMilliseconds))) // Advance time to trigger the timeout + .expectError(TimeoutException.class) + .verify(); } @Test public void waitForSidecarOK() { - ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port) - .usePlaintext().build(); - final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); - channelFacade.waitForChannelReady(10000).block(); - } + OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); + DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); + + ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port) + .usePlaintext().build(); + final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, daprHttp); + + // added since this is doing a check against the http health check endpoint + // for parity with dotnet + mockInterceptor.addRule() + .get() + .path("/v1.0/healthz/outbound") + .respond(204); + + StepVerifier.create(channelFacade.waitForChannelReady(10000)) + .expectSubscription() + .expectComplete() + .verify(); + } }