mirror of https://github.com/dapr/java-sdk.git
add health check to wait for sidecar and test it (#918)
* add health check to wait for sidecar and test it Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * split long line into 2 Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * addRule to failing waitForSidecar test Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * update the http retry on healthcheck and add to tests Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * add success test since failure scenarios are covered Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * update the grpc logic to call the http endpoint Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * add endpoint for grpc to be successful Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * up timeout and make return more similar to http Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * up time for test again Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * added comment on getState not being implemented Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * add daprhttp to grpc to use and overload constructor Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * shorten time in grpc test Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * update grpc return to match http check Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * testing if this fixes CI issue on managed channel not closing properly Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * close daprHttp in teardown func Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * Fix telemetry test that uses GrpcChannelFacade. Signed-off-by: Artur Souza <asouza.pro@gmail.com> * close daprHttp Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * triggering CI again Signed-off-by: Cassandra Coyle <cassie@diagrid.io> --------- Signed-off-by: Cassandra Coyle <cassie@diagrid.io> Signed-off-by: Cassie Coyle <cassie@diagrid.io> Signed-off-by: Artur Souza <asouza.pro@gmail.com> Co-authored-by: Artur Souza <artursouza.ms@outlook.com> Co-authored-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
parent
0c7828ee55
commit
49ccb31dcc
34
README.md
34
README.md
|
@ -37,6 +37,7 @@ This is the Dapr SDK for Java, including the following features:
|
||||||
* An existing Java Maven or Gradle project. You may also start a new project via one of the options below:
|
* An existing Java Maven or Gradle project. You may also start a new project via one of the options below:
|
||||||
* [New Maven project in IntelliJ](https://www.jetbrains.com/help/idea/maven-support.html#create_new_maven_project)
|
* [New Maven project in IntelliJ](https://www.jetbrains.com/help/idea/maven-support.html#create_new_maven_project)
|
||||||
* [Maven in 5 minutes](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html)
|
* [Maven in 5 minutes](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html)
|
||||||
|
* [Install toxiproxy-server binary](https://github.com/Shopify/toxiproxy/releases)
|
||||||
|
|
||||||
### Install JDK
|
### Install JDK
|
||||||
|
|
||||||
|
@ -119,7 +120,7 @@ Please, refer to our [Javadoc](https://dapr.github.io/java-sdk/) website.
|
||||||
|
|
||||||
### Reactor API
|
### Reactor API
|
||||||
|
|
||||||
The Java SDK for Dapr is built using [Project Reactor](https://projectreactor.io/). It provides an asynchronous API for Java. When consuming a result is consumed synchronously, as in the examples referenced above, the `block()` method is used.
|
The Java SDK for Dapr is built using [Project Reactor](https://projectreactor.io/). It provides an asynchronous API for Java. A result is consumed synchronously by using the `block()` method, as shown in the examples referenced above.
|
||||||
|
|
||||||
The code below does not make any API call, it simply returns the [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) publisher object. Nothing happens until the application subscribes or blocks on the result:
|
The code below does not make any API call, it simply returns the [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) publisher object. Nothing happens until the application subscribes or blocks on the result:
|
||||||
|
|
||||||
|
@ -127,7 +128,7 @@ The code below does not make any API call, it simply returns the [Mono](https://
|
||||||
Mono<Void> result = daprClient.publishEvent("mytopic", "my message");
|
Mono<Void> result = daprClient.publishEvent("mytopic", "my message");
|
||||||
```
|
```
|
||||||
|
|
||||||
To start execution and receive the result object synchronously(`void` or `Void` becomes an empty result), use `block()`. The code below shows how to execute the call and consume an empty response:
|
To start execution and receive the result object synchronously (`void` or `Void` becomes an empty result), use `block()`. The code below shows how to execute the call and consume an empty response:
|
||||||
```java
|
```java
|
||||||
Mono<Void> result = daprClient.publishEvent("mytopic", "my message");
|
Mono<Void> result = daprClient.publishEvent("mytopic", "my message");
|
||||||
result.block();
|
result.block();
|
||||||
|
@ -135,9 +136,9 @@ result.block();
|
||||||
|
|
||||||
### How to use a custom serializer
|
### How to use a custom serializer
|
||||||
|
|
||||||
This SDK provides a basic serialization for request/response objects but also for state objects. Applications should provide their own serialization for production scenarios.
|
This SDK provides a basic serialization for request/response objects, and state objects. Applications should provide their own serialization for production scenarios.
|
||||||
|
|
||||||
1. Implement the [DaprObjectSerializer](https://dapr.github.io/java-sdk/io/dapr/serializer/DaprObjectSerializer.html) interface. See [this class](sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java) as example.
|
1. Implement the [DaprObjectSerializer](https://dapr.github.io/java-sdk/io/dapr/serializer/DaprObjectSerializer.html) interface. See [this class](sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java) as an example.
|
||||||
2. Use your serializer class in the following scenarios:
|
2. Use your serializer class in the following scenarios:
|
||||||
* When building a new instance of [DaprClient](https://dapr.github.io/java-sdk/io/dapr/client/DaprClient.html):
|
* When building a new instance of [DaprClient](https://dapr.github.io/java-sdk/io/dapr/client/DaprClient.html):
|
||||||
```java
|
```java
|
||||||
|
@ -163,13 +164,13 @@ This SDK provides a basic serialization for request/response objects but also fo
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
### Debug Java application or Dapr's Java SDK
|
### Debug a Java application or Dapr's Java SDK
|
||||||
|
|
||||||
**In IntelliJ Community Edition, consider [debugging in IntelliJ](https://docs.dapr.io/developing-applications/ides/intellij/).**
|
**In IntelliJ Community Edition, consider [debugging in IntelliJ](https://docs.dapr.io/developing-applications/local-development/ides/intellij/).**
|
||||||
|
|
||||||
**In Visual Studio Code, consider [debugging in Visual Studio Code](https://docs.dapr.io/developing-applications/ides/vscode-debugging/).**
|
**In Visual Studio Code, consider [debugging in Visual Studio Code](https://docs.dapr.io/developing-applications/local-development/ides/vscode/).**
|
||||||
|
|
||||||
If you need to debug your Application, run Dapr sidecar separately and then start the application from your IDE (IntelliJ, for example).
|
If you need to debug your Application, run the Dapr sidecar separately, and then start the application from your IDE (IntelliJ or Eclipse, for example).
|
||||||
For Linux and MacOS:
|
For Linux and MacOS:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
|
@ -178,23 +179,24 @@ dapr run --app-id testapp --app-port 3000 --dapr-http-port 3500 --dapr-grpc-port
|
||||||
|
|
||||||
> Note: confirm the correct port that the app will listen to and that the Dapr ports above are free, changing the ports if necessary.
|
> Note: confirm the correct port that the app will listen to and that the Dapr ports above are free, changing the ports if necessary.
|
||||||
|
|
||||||
When running your Java application from IDE, make sure the following environment variables are set, so the Java SDK knows how to connect to Dapr's sidecar:
|
When running your Java application from your IDE, make sure the following environment variables are set, so the Java SDK knows how to connect to Dapr's sidecar:
|
||||||
```
|
```
|
||||||
DAPR_HTTP_PORT=3500
|
DAPR_HTTP_PORT=3500
|
||||||
DAPR_GRPC_PORT=5001
|
DAPR_GRPC_PORT=5001
|
||||||
```
|
```
|
||||||
|
|
||||||
Now you can go to your IDE (like Eclipse, for example) and debug your Java application, using port `3500` to call Dapr while also listening to port `3000` to expose Dapr's callback endpoint.
|
Now you can go to your IDE and debug your Java application, using port `3500` to call Dapr while also listening to port `3000` to expose Dapr's callback endpoint.
|
||||||
|
|
||||||
### Exception handling
|
### Exception handling
|
||||||
|
|
||||||
Most exceptions thrown from the SDK are instances of `DaprException`. `DaprException` extends from `RuntimeException`, making it compatible with Project Reactor. See [example](./examples/src/main/java/io/dapr/examples/exception) for more details.
|
Most exceptions thrown from the SDK are instances of `DaprException`. `DaprException` extends from `RuntimeException`, making it compatible with Project Reactor. See the [exception example](./examples/src/main/java/io/dapr/examples/exception) for more details.
|
||||||
|
|
||||||
## Development
|
## Development
|
||||||
|
|
||||||
### Update URL to fetch proto files
|
### Update URL to fetch proto files
|
||||||
|
|
||||||
Change the `dapr.proto.baseurl` property below in [pom.xml](./pom.xml) to point to the URL for the desired commit hash in Git if you need to target a proto file that is not been merged into master yet.
|
Change the `dapr.proto.baseurl` property below in [pom.xml](./pom.xml) to point to the URL for the desired commit hash in Git if you need to target a proto file that is not been merged into master yet.
|
||||||
|
|
||||||
Note: You may need to run `./mvnw clean` after changing this setting to remove any auto-generated files so that the new proto files get downloaded and compiled.
|
Note: You may need to run `./mvnw clean` after changing this setting to remove any auto-generated files so that the new proto files get downloaded and compiled.
|
||||||
|
|
||||||
```xml
|
```xml
|
||||||
|
@ -212,11 +214,10 @@ Note: You may need to run `./mvnw clean` after changing this setting to remove a
|
||||||
</project>
|
</project>
|
||||||
```
|
```
|
||||||
|
|
||||||
### Running Integration Tests
|
### Running Integration Tests (ITs)
|
||||||
|
|
||||||
#### Pre-Requisites for ITs
|
|
||||||
Along with the pre-requisites for [SDK](#pre-requisites) the following are needed.
|
|
||||||
|
|
||||||
|
#### Pre-Requisites
|
||||||
|
* [Pre-Requisites for the SDK](#pre-requisites)
|
||||||
* Docker installed
|
* Docker installed
|
||||||
* [Docker Compose](https://docs.docker.com/compose/install/)
|
* [Docker Compose](https://docs.docker.com/compose/install/)
|
||||||
* [Docker Desktop](https://www.docker.com/products/docker-desktop)
|
* [Docker Desktop](https://www.docker.com/products/docker-desktop)
|
||||||
|
@ -229,8 +230,7 @@ Along with the pre-requisites for [SDK](#pre-requisites) the following are neede
|
||||||
The code for the tests are present inside the project [sdk-tests](./sdk-tests). This module alone can be imported as a separate project in IDEs.
|
The code for the tests are present inside the project [sdk-tests](./sdk-tests). This module alone can be imported as a separate project in IDEs.
|
||||||
This project depends on the rest of the JARs built by the other modules in the repo like [sdk](./sdk), [sdk-springboot](./sdk-springboot) etc.
|
This project depends on the rest of the JARs built by the other modules in the repo like [sdk](./sdk), [sdk-springboot](./sdk-springboot) etc.
|
||||||
|
|
||||||
As a starting point for running Integration Tests, first run `./mvnw clean install` from the root of the repo to build the JARs for the different modules
|
As a starting point for running the Integration Tests, first run `./mvnw clean install` from the root of the repo to build the JARs for the different modules, except the `sdk-tests` module.
|
||||||
except the `sdk-tests` module.
|
|
||||||
|
|
||||||
#### Run all the dependent services spun up during build
|
#### Run all the dependent services spun up during build
|
||||||
|
|
||||||
|
|
|
@ -177,7 +177,7 @@ public class DaprClientBuilder {
|
||||||
*/
|
*/
|
||||||
private DaprClient buildDaprClientGrpc() {
|
private DaprClient buildDaprClientGrpc() {
|
||||||
final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel();
|
final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel();
|
||||||
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
|
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, this.daprHttpBuilder.build());
|
||||||
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
|
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
|
||||||
return new DaprClientGrpc(
|
return new DaprClientGrpc(
|
||||||
channelFacade,
|
channelFacade,
|
||||||
|
|
|
@ -53,8 +53,10 @@ import io.dapr.utils.NetworkUtils;
|
||||||
import io.dapr.utils.TypeRef;
|
import io.dapr.utils.TypeRef;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.util.retry.Retry;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -139,12 +141,26 @@ public class DaprClientHttp extends AbstractDaprClient {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
|
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
|
||||||
return Mono.fromRunnable(() -> {
|
return Mono.defer(() -> {
|
||||||
try {
|
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"};
|
||||||
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), timeoutInMilliseconds);
|
int maxRetries = 5;
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new RuntimeException(e);
|
Retry retrySpec = Retry
|
||||||
}
|
.fixedDelay(maxRetries, Duration.ofMillis(500))
|
||||||
|
.doBeforeRetry(retrySignal -> {
|
||||||
|
System.out.println("Retrying component health check...");
|
||||||
|
});
|
||||||
|
|
||||||
|
Mono<DaprHttp.Response> responseMono = this.client.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments,
|
||||||
|
null, "", null, null);
|
||||||
|
|
||||||
|
return responseMono
|
||||||
|
.retryWhen(retrySpec)
|
||||||
|
.timeout(Duration.ofMillis(timeoutInMilliseconds))
|
||||||
|
.onErrorResume(DaprException.class, e ->
|
||||||
|
Mono.error(new RuntimeException(e)))
|
||||||
|
.switchIfEmpty(DaprException.wrapMono(new RuntimeException("Health check timed out")))
|
||||||
|
.then();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,13 +13,18 @@ limitations under the License.
|
||||||
|
|
||||||
package io.dapr.client;
|
package io.dapr.client;
|
||||||
|
|
||||||
|
import io.dapr.config.Properties;
|
||||||
|
import io.dapr.exceptions.DaprException;
|
||||||
import io.dapr.v1.DaprGrpc;
|
import io.dapr.v1.DaprGrpc;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
|
import okhttp3.OkHttpClient;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.util.retry.Retry;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Facade for common operations on gRPC channel.
|
* Facade for common operations on gRPC channel.
|
||||||
|
@ -34,6 +39,11 @@ class GrpcChannelFacade implements Closeable {
|
||||||
*/
|
*/
|
||||||
private final ManagedChannel channel;
|
private final ManagedChannel channel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The reference to the DaprHttp client.
|
||||||
|
*/
|
||||||
|
private final DaprHttp daprHttp;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
|
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
|
||||||
|
@ -41,36 +51,75 @@ class GrpcChannelFacade implements Closeable {
|
||||||
* @param channel A Managed GRPC channel
|
* @param channel A Managed GRPC channel
|
||||||
* @see DaprClientBuilder
|
* @see DaprClientBuilder
|
||||||
*/
|
*/
|
||||||
GrpcChannelFacade(ManagedChannel channel) {
|
GrpcChannelFacade(ManagedChannel channel, DaprHttp daprHttp) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
|
this.daprHttp = daprHttp;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
if (daprHttp != null) {
|
||||||
|
daprHttp.close();
|
||||||
|
}
|
||||||
|
|
||||||
if (channel != null && !channel.isShutdown()) {
|
if (channel != null && !channel.isShutdown()) {
|
||||||
channel.shutdown();
|
channel.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Mono<Void> waitForChannelReady(int timeoutInMilliseconds) {
|
public Mono<Void> waitForChannelReady(int timeoutInMilliseconds) {
|
||||||
return Mono.fromRunnable(() -> {
|
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"};
|
||||||
boolean isReady = false;
|
int maxRetries = 5;
|
||||||
long startTime = System.currentTimeMillis();
|
|
||||||
while (!isReady && System.currentTimeMillis() - startTime < timeoutInMilliseconds) {
|
|
||||||
isReady = this.channel.getState(true) == ConnectivityState.READY;
|
|
||||||
if (!isReady) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(500);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("Waiting for gRPC channel ready interrupted.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isReady) {
|
Retry retrySpec = Retry
|
||||||
throw new RuntimeException("Timeout waiting for gRPC channel to be ready.");
|
.fixedDelay(maxRetries, Duration.ofMillis(500))
|
||||||
}
|
.doBeforeRetry(retrySignal -> {
|
||||||
|
System.out.println("Retrying component health check...");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/*
|
||||||
|
NOTE: (Cassie) Uncomment this once it actually gets implemented:
|
||||||
|
https://github.com/grpc/grpc-java/issues/4359
|
||||||
|
|
||||||
|
int maxChannelStateRetries = 5;
|
||||||
|
|
||||||
|
// Retry logic for checking the channel state
|
||||||
|
Retry channelStateRetrySpec = Retry
|
||||||
|
.fixedDelay(maxChannelStateRetries, Duration.ofMillis(500))
|
||||||
|
.doBeforeRetry(retrySignal -> {
|
||||||
|
System.out.println("Retrying channel state check...");
|
||||||
|
});
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Do the Dapr Http endpoint check to have parity with Dotnet
|
||||||
|
Mono<DaprHttp.Response> responseMono = this.daprHttp.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments,
|
||||||
|
null, "", null, null);
|
||||||
|
|
||||||
|
return responseMono
|
||||||
|
.retryWhen(retrySpec)
|
||||||
|
/*
|
||||||
|
NOTE: (Cassie) Uncomment this once it actually gets implemented:
|
||||||
|
https://github.com/grpc/grpc-java/issues/4359
|
||||||
|
.flatMap(response -> {
|
||||||
|
// Check the status code
|
||||||
|
int statusCode = response.getStatusCode();
|
||||||
|
|
||||||
|
// Check if the channel's state is READY
|
||||||
|
return Mono.defer(() -> {
|
||||||
|
if (this.channel.getState(true) == ConnectivityState.READY) {
|
||||||
|
// Return true if the status code is in the 2xx range
|
||||||
|
if (statusCode >= 200 && statusCode < 300) {
|
||||||
|
return Mono.empty(); // Continue with the flow
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Mono.error(new RuntimeException("Health check failed"));
|
||||||
|
}).retryWhen(channelStateRetrySpec);
|
||||||
|
})
|
||||||
|
*/
|
||||||
|
.timeout(Duration.ofMillis(timeoutInMilliseconds))
|
||||||
|
.onErrorResume(DaprException.class, e ->
|
||||||
|
Mono.error(new RuntimeException(e)))
|
||||||
|
.switchIfEmpty(DaprException.wrapMono(new RuntimeException("Health check timed out")))
|
||||||
|
.then();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.Arguments;
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
import org.junit.jupiter.params.provider.MethodSource;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.util.context.Context;
|
import reactor.util.context.Context;
|
||||||
|
|
||||||
|
@ -138,8 +139,9 @@ null,
|
||||||
// Create a client channel and register for automatic graceful shutdown.
|
// Create a client channel and register for automatic graceful shutdown.
|
||||||
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
|
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
|
||||||
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
|
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
|
||||||
|
DaprHttp daprHTTP = Mockito.mock(DaprHttp.class);
|
||||||
client = new DaprClientGrpc(
|
client = new DaprClientGrpc(
|
||||||
new GrpcChannelFacade(channel), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
|
new GrpcChannelFacade(channel, daprHTTP), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
|
@ -177,8 +179,10 @@ null,
|
||||||
// Create a client channel and register for automatic graceful shutdown.
|
// Create a client channel and register for automatic graceful shutdown.
|
||||||
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
|
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
|
||||||
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
|
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
|
||||||
|
|
||||||
|
DaprHttp daprHTTP = Mockito.mock(DaprHttp.class);
|
||||||
client = new DaprClientGrpc(
|
client = new DaprClientGrpc(
|
||||||
new GrpcChannelFacade(channel), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
|
new GrpcChannelFacade(channel, daprHTTP), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
|
|
@ -29,9 +29,11 @@ import io.dapr.client.domain.TransactionalStateOperation;
|
||||||
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
|
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
|
||||||
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
|
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
|
||||||
import io.dapr.config.Properties;
|
import io.dapr.config.Properties;
|
||||||
|
import io.dapr.exceptions.DaprError;
|
||||||
import io.dapr.exceptions.DaprException;
|
import io.dapr.exceptions.DaprException;
|
||||||
import io.dapr.serializer.DaprObjectSerializer;
|
import io.dapr.serializer.DaprObjectSerializer;
|
||||||
import io.dapr.utils.TypeRef;
|
import io.dapr.utils.TypeRef;
|
||||||
|
import okhttp3.MediaType;
|
||||||
import okhttp3.OkHttpClient;
|
import okhttp3.OkHttpClient;
|
||||||
import okhttp3.Request;
|
import okhttp3.Request;
|
||||||
import okhttp3.ResponseBody;
|
import okhttp3.ResponseBody;
|
||||||
|
@ -45,11 +47,14 @@ import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.test.StepVerifier;
|
||||||
|
import reactor.test.scheduler.VirtualTimeScheduler;
|
||||||
import reactor.util.context.Context;
|
import reactor.util.context.Context;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
|
@ -58,16 +63,16 @@ import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import reactor.util.context.ContextView;
|
import reactor.util.context.ContextView;
|
||||||
|
import uk.org.webcompere.systemstubs.stream.SystemOut;
|
||||||
|
|
||||||
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
|
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
|
||||||
import static io.dapr.utils.TestUtils.findFreePort;
|
import static io.dapr.utils.TestUtils.findFreePort;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
public class DaprClientHttpTest {
|
public class DaprClientHttpTest {
|
||||||
|
@ -101,16 +106,132 @@ public class DaprClientHttpTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void waitForSidecarTimeout() throws Exception {
|
public void waitForSidecarTimeOutHealthCheck() throws Exception {
|
||||||
|
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
|
||||||
|
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
|
||||||
|
|
||||||
int port = findFreePort();
|
int port = findFreePort();
|
||||||
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
|
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
|
||||||
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
|
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
|
||||||
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
|
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
|
||||||
assertThrows(RuntimeException.class, () -> daprClientHttp.waitForSidecar(1).block());
|
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json")));
|
||||||
|
|
||||||
|
StepVerifier.create(daprClientHttp.waitForSidecar(100))
|
||||||
|
.expectSubscription()
|
||||||
|
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(200))) // Advance time to trigger the timeout
|
||||||
|
.expectErrorMatches(throwable -> {
|
||||||
|
if (throwable instanceof TimeoutException) {
|
||||||
|
System.out.println("TimeoutException occurred on sidecar health check.");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
})
|
||||||
|
.verify();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void waitForSidecarBadHealthCheck() throws Exception {
|
||||||
|
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
|
||||||
|
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
|
||||||
|
|
||||||
|
int port = findFreePort();
|
||||||
|
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
|
||||||
|
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
|
||||||
|
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
|
||||||
|
|
||||||
|
addMockRulesForBadHealthCheck();
|
||||||
|
|
||||||
|
// retry the max allowed retries (5 times)
|
||||||
|
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
|
||||||
|
.expectSubscription()
|
||||||
|
.expectNoEvent(Duration.ofMillis(500)) // Delay for retry
|
||||||
|
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
|
||||||
|
.expectNoEvent(Duration.ofMillis(500)) // Delay for another retry
|
||||||
|
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
|
||||||
|
.expectNoEvent(Duration.ofMillis(500)) // Delay for another retry
|
||||||
|
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
|
||||||
|
.expectNoEvent(Duration.ofMillis(500)) // Delay for another retry
|
||||||
|
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
|
||||||
|
.expectNoEvent(Duration.ofMillis(500)) // Delay for the final retry
|
||||||
|
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
|
||||||
|
.expectErrorMatches(throwable -> {
|
||||||
|
if (throwable instanceof RuntimeException) {
|
||||||
|
return "Retries exhausted: 5/5".equals(throwable.getMessage());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
})
|
||||||
|
.verify();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addMockRulesForBadHealthCheck() {
|
||||||
|
for (int i = 0; i < 6; i++) {
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception {
|
||||||
|
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
|
||||||
|
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
|
||||||
|
|
||||||
|
int port = findFreePort();
|
||||||
|
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
|
||||||
|
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
|
||||||
|
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
|
||||||
|
|
||||||
|
// Simulate a slow response
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json")));
|
||||||
|
|
||||||
|
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
|
||||||
|
.expectSubscription()
|
||||||
|
.expectNoEvent(Duration.ofSeconds(1)) // Delay for retry
|
||||||
|
.then(() -> {
|
||||||
|
// Simulate a successful response
|
||||||
|
mockInterceptor.reset();
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
|
||||||
|
virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(1));
|
||||||
|
})
|
||||||
|
.expectNext()
|
||||||
|
.expectComplete()
|
||||||
|
.verify();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void waitForSidecarOK() throws Exception {
|
||||||
|
int port = findFreePort();
|
||||||
|
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
|
||||||
|
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
|
||||||
|
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
|
||||||
|
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(204);
|
||||||
|
|
||||||
|
StepVerifier.create(daprClientHttp.waitForSidecar(10000))
|
||||||
|
.expectSubscription()
|
||||||
|
.expectComplete()
|
||||||
|
.verify();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void waitForSidecarTimeoutOK() throws Exception {
|
public void waitForSidecarTimeoutOK() throws Exception {
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(204);
|
||||||
try (ServerSocket serverSocket = new ServerSocket(0)) {
|
try (ServerSocket serverSocket = new ServerSocket(0)) {
|
||||||
final int port = serverSocket.getLocalPort();
|
final int port = serverSocket.getLocalPort();
|
||||||
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
|
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
|
||||||
|
|
|
@ -258,7 +258,7 @@ public class DaprHttpTest {
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.get("http://127.0.0.1:3500/" + urlDeleteState)
|
.get("http://127.0.0.1:3500/" + urlDeleteState)
|
||||||
.respond(404, ResponseBody.create(MediaType.parse("application/json"),
|
.respond(404, ResponseBody.create(MediaType.parse("application/json"),
|
||||||
"{\"errorCode\":\"404\",\"message\":\"State Not Fuund\"}"));
|
"{\"errorCode\":\"404\",\"message\":\"State Not Found\"}"));
|
||||||
try {
|
try {
|
||||||
responseDeleted.block();
|
responseDeleted.block();
|
||||||
fail("Expected DaprException");
|
fail("Expected DaprException");
|
||||||
|
|
|
@ -13,16 +13,29 @@ limitations under the License.
|
||||||
|
|
||||||
package io.dapr.client;
|
package io.dapr.client;
|
||||||
|
|
||||||
|
import io.dapr.config.Properties;
|
||||||
|
import io.dapr.utils.NetworkUtils;
|
||||||
import io.dapr.v1.DaprGrpc;
|
import io.dapr.v1.DaprGrpc;
|
||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
import io.grpc.ManagedChannelBuilder;
|
import io.grpc.ManagedChannelBuilder;
|
||||||
import io.grpc.Server;
|
import io.grpc.Server;
|
||||||
import io.grpc.ServerBuilder;
|
import io.grpc.ServerBuilder;
|
||||||
|
import okhttp3.MediaType;
|
||||||
|
import okhttp3.OkHttpClient;
|
||||||
|
import okhttp3.ResponseBody;
|
||||||
|
import okhttp3.mock.Behavior;
|
||||||
|
import okhttp3.mock.MockInterceptor;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.jupiter.api.AfterAll;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import reactor.test.StepVerifier;
|
||||||
|
import reactor.test.scheduler.VirtualTimeScheduler;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static io.dapr.utils.TestUtils.findFreePort;
|
import static io.dapr.utils.TestUtils.findFreePort;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
@ -33,6 +46,21 @@ public class GrpcChannelFacadeTest {
|
||||||
|
|
||||||
public static Server server;
|
public static Server server;
|
||||||
|
|
||||||
|
private MockInterceptor mockInterceptor;
|
||||||
|
|
||||||
|
private OkHttpClient okHttpClient;
|
||||||
|
|
||||||
|
private static DaprHttp daprHttp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable the waitForSidecar to allow the gRPC to check the http endpoint for the health check
|
||||||
|
*/
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() {
|
||||||
|
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
|
||||||
|
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
|
||||||
|
}
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void setup() throws IOException {
|
public static void setup() throws IOException {
|
||||||
port = findFreePort();
|
port = findFreePort();
|
||||||
|
@ -45,26 +73,65 @@ public class GrpcChannelFacadeTest {
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
public static void teardown() throws InterruptedException {
|
public static void teardown() throws InterruptedException {
|
||||||
|
if (daprHttp != null) {
|
||||||
|
daprHttp.close();
|
||||||
|
}
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
server.awaitTermination();
|
server.awaitTermination();
|
||||||
}
|
}
|
||||||
|
private void addMockRulesForBadHealthCheck() {
|
||||||
|
for (int i = 0; i < 6; i++) {
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
|
||||||
|
}
|
||||||
|
}
|
||||||
@Test
|
@Test
|
||||||
public void waitForSidecarTimeout() throws Exception {
|
public void waitForSidecarTimeoutHealthCheck() throws Exception {
|
||||||
int unusedPort = findFreePort();
|
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
|
||||||
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", unusedPort)
|
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
|
||||||
.usePlaintext().build();
|
int timeoutInMilliseconds = 1000;
|
||||||
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
|
|
||||||
|
|
||||||
assertThrows(RuntimeException.class, () -> channelFacade.waitForChannelReady(1).block());
|
int unusedPort = findFreePort();
|
||||||
|
|
||||||
|
OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
|
||||||
|
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient);
|
||||||
|
|
||||||
|
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", unusedPort)
|
||||||
|
.usePlaintext()
|
||||||
|
.build();
|
||||||
|
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, daprHttp);
|
||||||
|
|
||||||
|
addMockRulesForBadHealthCheck();
|
||||||
|
|
||||||
|
StepVerifier.create(channelFacade.waitForChannelReady(timeoutInMilliseconds))
|
||||||
|
.expectSubscription()
|
||||||
|
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(timeoutInMilliseconds + timeoutInMilliseconds))) // Advance time to trigger the timeout
|
||||||
|
.expectError(TimeoutException.class)
|
||||||
|
.verify();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void waitForSidecarOK() {
|
public void waitForSidecarOK() {
|
||||||
|
OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
|
||||||
|
|
||||||
|
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient);
|
||||||
|
|
||||||
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port)
|
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port)
|
||||||
.usePlaintext().build();
|
.usePlaintext().build();
|
||||||
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
|
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, daprHttp);
|
||||||
channelFacade.waitForChannelReady(10000).block();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// added since this is doing a check against the http health check endpoint
|
||||||
|
// for parity with dotnet
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.get()
|
||||||
|
.path("/v1.0/healthz/outbound")
|
||||||
|
.respond(204);
|
||||||
|
|
||||||
|
StepVerifier.create(channelFacade.waitForChannelReady(10000))
|
||||||
|
.expectSubscription()
|
||||||
|
.expectComplete()
|
||||||
|
.verify();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue