mirror of https://github.com/dapr/java-sdk.git
Fix waitForSidecar to respect timeout. (#1146)
* Fix waitForSidecar to respect timeout. Signed-off-by: Artur Souza <asouza.pro@gmail.com> * Bring back 500ms interval for retries and log for waitForSidecar Signed-off-by: Artur Souza <asouza.pro@gmail.com> * Fix flaky test for ConfigurationClientIT Signed-off-by: Artur Souza <asouza.pro@gmail.com> --------- Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
parent
b1196d3777
commit
b683fcec55
|
@ -69,6 +69,7 @@ public class ConfigurationClientIT extends BaseIT {
|
||||||
public static void init() throws Exception {
|
public static void init() throws Exception {
|
||||||
daprRun = startDaprApp(ConfigurationClientIT.class.getSimpleName(), 5000);
|
daprRun = startDaprApp(ConfigurationClientIT.class.getSimpleName(), 5000);
|
||||||
daprClient = daprRun.newDaprClientBuilder().build();
|
daprClient = daprRun.newDaprClientBuilder().build();
|
||||||
|
daprClient.waitForSidecar(10000).block();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2024 The Dapr Authors
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.dapr.it.resiliency;
|
||||||
|
|
||||||
|
import io.dapr.it.BaseIT;
|
||||||
|
import io.dapr.it.DaprRun;
|
||||||
|
import io.dapr.it.ToxiProxyRun;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test SDK resiliency.
|
||||||
|
*/
|
||||||
|
public class WaitForSidecarIT extends BaseIT {
|
||||||
|
|
||||||
|
// Use a number large enough to make sure it will respect the entire timeout.
|
||||||
|
private static final Duration LATENCY = Duration.ofSeconds(5);
|
||||||
|
|
||||||
|
private static final Duration JITTER = Duration.ofSeconds(0);
|
||||||
|
|
||||||
|
private static DaprRun daprRun;
|
||||||
|
|
||||||
|
private static ToxiProxyRun toxiProxyRun;
|
||||||
|
|
||||||
|
private static DaprRun daprNotRunning;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void init() throws Exception {
|
||||||
|
daprRun = startDaprApp(WaitForSidecarIT.class.getSimpleName(), 5000);
|
||||||
|
daprNotRunning = startDaprApp(WaitForSidecarIT.class.getSimpleName()+"NotRunning", 5000);
|
||||||
|
daprNotRunning.stop();
|
||||||
|
|
||||||
|
toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER);
|
||||||
|
toxiProxyRun.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void waitSucceeds() throws Exception {
|
||||||
|
try(var client = daprRun.newDaprClient()) {
|
||||||
|
client.waitForSidecar(5000).block();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void waitTimeout() {
|
||||||
|
int timeoutInMillis = (int)LATENCY.minusMillis(100).toMillis();
|
||||||
|
long started = System.currentTimeMillis();
|
||||||
|
assertThrows(RuntimeException.class, () -> {
|
||||||
|
try(var client = toxiProxyRun.newDaprClientBuilder().build()) {
|
||||||
|
client.waitForSidecar(timeoutInMillis).block();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
long duration = System.currentTimeMillis() - started;
|
||||||
|
assertTrue(duration >= timeoutInMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void waitSlow() throws Exception {
|
||||||
|
int timeoutInMillis = (int)LATENCY.plusMillis(100).toMillis();
|
||||||
|
long started = System.currentTimeMillis();
|
||||||
|
try(var client = toxiProxyRun.newDaprClientBuilder().build()) {
|
||||||
|
client.waitForSidecar(timeoutInMillis).block();
|
||||||
|
}
|
||||||
|
long duration = System.currentTimeMillis() - started;
|
||||||
|
assertTrue(duration >= LATENCY.toMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void waitNotRunningTimeout() {
|
||||||
|
// Does not make this number too smaller since bug does not repro when <= 2.5s.
|
||||||
|
// This has to do with a previous bug in the implementation.
|
||||||
|
int timeoutMilliseconds = 5000;
|
||||||
|
long started = System.currentTimeMillis();
|
||||||
|
assertThrows(RuntimeException.class, () -> {
|
||||||
|
try(var client = daprNotRunning.newDaprClientBuilder().build()) {
|
||||||
|
client.waitForSidecar(timeoutMilliseconds).block();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
long duration = System.currentTimeMillis() - started;
|
||||||
|
assertTrue(duration >= timeoutMilliseconds);
|
||||||
|
}
|
||||||
|
}
|
|
@ -81,6 +81,8 @@ import io.grpc.Metadata;
|
||||||
import io.grpc.stub.AbstractStub;
|
import io.grpc.stub.AbstractStub;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.FluxSink;
|
import reactor.core.publisher.FluxSink;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
@ -97,7 +99,6 @@ import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -114,6 +115,8 @@ import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;
|
||||||
*/
|
*/
|
||||||
public class DaprClientImpl extends AbstractDaprClient {
|
public class DaprClientImpl extends AbstractDaprClient {
|
||||||
|
|
||||||
|
private final Logger logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The GRPC managed channel to be used.
|
* The GRPC managed channel to be used.
|
||||||
*/
|
*/
|
||||||
|
@ -235,6 +238,7 @@ public class DaprClientImpl extends AbstractDaprClient {
|
||||||
this.httpClient = httpClient;
|
this.httpClient = httpClient;
|
||||||
this.retryPolicy = retryPolicy;
|
this.retryPolicy = retryPolicy;
|
||||||
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy);
|
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy);
|
||||||
|
this.logger = LoggerFactory.getLogger(DaprClientImpl.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
|
private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
|
||||||
|
@ -273,53 +277,21 @@ public class DaprClientImpl extends AbstractDaprClient {
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
|
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
|
||||||
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"};
|
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"};
|
||||||
int maxRetries = 5;
|
|
||||||
|
|
||||||
Retry retrySpec = Retry
|
|
||||||
.fixedDelay(maxRetries, Duration.ofMillis(500))
|
|
||||||
.doBeforeRetry(retrySignal -> {
|
|
||||||
System.out.println("Retrying component health check...");
|
|
||||||
});
|
|
||||||
|
|
||||||
/*
|
|
||||||
NOTE: (Cassie) Uncomment this once it actually gets implemented:
|
|
||||||
https://github.com/grpc/grpc-java/issues/4359
|
|
||||||
|
|
||||||
int maxChannelStateRetries = 5;
|
|
||||||
|
|
||||||
// Retry logic for checking the channel state
|
|
||||||
Retry channelStateRetrySpec = Retry
|
|
||||||
.fixedDelay(maxChannelStateRetries, Duration.ofMillis(500))
|
|
||||||
.doBeforeRetry(retrySignal -> {
|
|
||||||
System.out.println("Retrying channel state check...");
|
|
||||||
});
|
|
||||||
*/
|
|
||||||
|
|
||||||
// Do the Dapr Http endpoint check to have parity with Dotnet
|
// Do the Dapr Http endpoint check to have parity with Dotnet
|
||||||
Mono<DaprHttp.Response> responseMono = this.httpClient.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments,
|
Mono<DaprHttp.Response> responseMono = this.httpClient.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments,
|
||||||
null, "", null, null);
|
null, "", null, null);
|
||||||
|
|
||||||
return responseMono
|
return responseMono
|
||||||
.retryWhen(retrySpec)
|
// No method to "retry forever every 500ms", so we make it practically forever.
|
||||||
/*
|
// 9223372036854775807 * 500 ms = 1.46235604 x 10^11 years
|
||||||
NOTE: (Cassie) Uncomment this once it actually gets implemented:
|
// If anyone needs to wait for the sidecar for longer than that, sorry.
|
||||||
https://github.com/grpc/grpc-java/issues/4359
|
.retryWhen(
|
||||||
.flatMap(response -> {
|
Retry
|
||||||
// Check the status code
|
.fixedDelay(Long.MAX_VALUE, Duration.ofMillis(500))
|
||||||
int statusCode = response.getStatusCode();
|
.doBeforeRetry(s -> {
|
||||||
|
this.logger.info("Retrying sidecar health check ...");
|
||||||
// Check if the channel's state is READY
|
}))
|
||||||
return Mono.defer(() -> {
|
|
||||||
if (this.channel.getState(true) == ConnectivityState.READY) {
|
|
||||||
// Return true if the status code is in the 2xx range
|
|
||||||
if (statusCode >= 200 && statusCode < 300) {
|
|
||||||
return Mono.empty(); // Continue with the flow
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Mono.error(new RuntimeException("Health check failed"));
|
|
||||||
}).retryWhen(channelStateRetrySpec);
|
|
||||||
})
|
|
||||||
*/
|
|
||||||
.timeout(Duration.ofMillis(timeoutInMilliseconds))
|
.timeout(Duration.ofMillis(timeoutInMilliseconds))
|
||||||
.onErrorResume(DaprException.class, e ->
|
.onErrorResume(DaprException.class, e ->
|
||||||
Mono.error(new RuntimeException(e)))
|
Mono.error(new RuntimeException(e)))
|
||||||
|
|
|
@ -133,16 +133,11 @@ public class DaprClientHttpTest {
|
||||||
.times(6)
|
.times(6)
|
||||||
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
|
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
|
||||||
|
|
||||||
// retry the max allowed retries (5 times)
|
// it will timeout.
|
||||||
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
|
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
|
||||||
.expectSubscription()
|
.expectSubscription()
|
||||||
.expectErrorMatches(throwable -> {
|
.expectError()
|
||||||
if (throwable instanceof RuntimeException) {
|
.verify(Duration.ofMillis(6000));
|
||||||
return "Retries exhausted: 5/5".equals(throwable.getMessage());
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
})
|
|
||||||
.verify(Duration.ofSeconds(20));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue