Fix unit tests hanging (#1006)

* Fix DaprClientHttpTest and GrpcChannelFacadeTest

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add timeout to build workflow.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Improve RetryPolicyTest.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix dependencies for gRPC mocking.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Artur Souza 2024-02-08 13:55:22 -08:00 committed by GitHub
parent a9a09ba2ac
commit 22427c9659
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 108 additions and 97 deletions

View File

@ -17,6 +17,7 @@ jobs:
build:
name: "Build jdk:${{ matrix.java }} sb:${{ matrix.spring-boot-version }} exp:${{ matrix.experimental }}"
runs-on: ubuntu-latest
timeout-minutes: 30
continue-on-error: ${{ matrix.experimental }}
strategy:
fail-fast: false
@ -148,6 +149,7 @@ jobs:
publish:
runs-on: ubuntu-latest
needs: build
timeout-minutes: 10
env:
JDK_VER: 17
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}

20
pom.xml
View File

@ -37,7 +37,7 @@
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED</argLine>
<failsafe.version>3.2.2</failsafe.version>
<surefire.version>3.2.2</surefire.version>
<junit-bom.version>5.7.2</junit-bom.version>
<junit-bom.version>5.8.2</junit-bom.version>
</properties>
<distributionManagement>
@ -88,6 +88,24 @@
<version>3.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.5.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>1.56.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
<version>1.59.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -128,6 +128,21 @@
<artifactId>junit-jupiter-migrationsupport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -107,22 +107,17 @@ public class DaprClientHttpTest {
@Test
public void waitForSidecarTimeOutHealthCheck() throws Exception {
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
int port = findFreePort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json")));
.delay(200)
.respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
StepVerifier.create(daprClientHttp.waitForSidecar(100))
.expectSubscription()
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(200))) // Advance time to trigger the timeout
.expectErrorMatches(throwable -> {
if (throwable instanceof TimeoutException) {
System.out.println("TimeoutException occurred on sidecar health check.");
@ -130,56 +125,36 @@ public class DaprClientHttpTest {
}
return false;
})
.verify();
.verify(Duration.ofSeconds(20));
}
@Test
public void waitForSidecarBadHealthCheck() throws Exception {
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
int port = findFreePort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
addMockRulesForBadHealthCheck();
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.times(6)
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
// retry the max allowed retries (5 times)
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
.expectSubscription()
.expectNoEvent(Duration.ofMillis(500)) // Delay for retry
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
.expectNoEvent(Duration.ofMillis(500)) // Delay for another retry
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
.expectNoEvent(Duration.ofMillis(500)) // Delay for another retry
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
.expectNoEvent(Duration.ofMillis(500)) // Delay for another retry
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
.expectNoEvent(Duration.ofMillis(500)) // Delay for the final retry
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500)))
.expectErrorMatches(throwable -> {
if (throwable instanceof RuntimeException) {
return "Retries exhausted: 5/5".equals(throwable.getMessage());
}
return false;
})
.verify();
.verify(Duration.ofSeconds(20));
}
private void addMockRulesForBadHealthCheck() {
for (int i = 0; i < 6; i++) {
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
}
}
@Test
public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception {
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
int port = findFreePort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
@ -189,23 +164,22 @@ public class DaprClientHttpTest {
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.delay(1000)
.times(2)
.respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json")));
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.delay(1000)
.times(1)
.respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1)) // Delay for retry
.then(() -> {
// Simulate a successful response
mockInterceptor.reset();
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.respond(204, ResponseBody.create("No Content", MediaType.get("application/json")));
virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(1));
})
.expectNext()
.expectComplete()
.verify();
.verify(Duration.ofSeconds(20));
}
@Test

View File

@ -20,6 +20,8 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.ResponseBody;
@ -79,37 +81,25 @@ public class GrpcChannelFacadeTest {
server.shutdown();
server.awaitTermination();
}
private void addMockRulesForBadHealthCheck() {
for (int i = 0; i < 6; i++) {
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
}
}
@Test
public void waitForSidecarTimeoutHealthCheck() throws Exception {
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
StepVerifier.setDefaultTimeout(Duration.ofSeconds(20));
int timeoutInMilliseconds = 1000;
int unusedPort = findFreePort();
OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient);
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", unusedPort)
.usePlaintext()
.build();
ManagedChannel channel = InProcessChannelBuilder.forName("waitForSidecarTimeoutHealthCheck").build();
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, daprHttp);
addMockRulesForBadHealthCheck();
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.times(6)
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
StepVerifier.create(channelFacade.waitForChannelReady(timeoutInMilliseconds))
StepVerifier.create(channelFacade.waitForChannelReady(1000))
.expectSubscription()
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(timeoutInMilliseconds + timeoutInMilliseconds))) // Advance time to trigger the timeout
.expectError(TimeoutException.class)
.verify();
.verify(Duration.ofSeconds(20));
}
@Test

View File

@ -19,14 +19,11 @@ import io.grpc.StatusRuntimeException;
import org.junit.jupiter.api.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class RetryPolicyTest {
@ -41,12 +38,10 @@ public class RetryPolicyTest {
RetryPolicy policy = new RetryPolicy(0);
Mono<String> action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION);
try {
policy.apply(action).block();
fail("Exception expected");
} catch (Exception e) {
assertSame(RETRYABLE_EXCEPTION, e);
}
StepVerifier
.create(policy.apply(action))
.expectError(StatusRuntimeException.class)
.verify();
assertEquals(1, callCounter.get());
}
@ -56,8 +51,25 @@ public class RetryPolicyTest {
RetryPolicy policy = new RetryPolicy(0);
Mono<String> action = createActionErrorAndReturn(callCounter, 0, RETRYABLE_EXCEPTION);
String response = policy.apply(action).block();
assertEquals(SUCCESS_MESSAGE, response);
StepVerifier
.create(policy.apply(action))
.expectNext(SUCCESS_MESSAGE)
.expectComplete()
.verify();
assertEquals(1, callCounter.get());
}
@Test
public void singleRetryPolicyWithSuccess() {
AtomicInteger callCounter = new AtomicInteger();
RetryPolicy policy = new RetryPolicy(1);
Mono<String> action = createActionErrorAndReturn(callCounter, 0, RETRYABLE_EXCEPTION);
StepVerifier
.create(policy.apply(action))
.expectNext(SUCCESS_MESSAGE)
.expectComplete()
.verify();
assertEquals(1, callCounter.get());
}
@ -67,8 +79,11 @@ public class RetryPolicyTest {
RetryPolicy policy = new RetryPolicy(3);
Mono<String> action = createActionErrorAndReturn(callCounter, 2, RETRYABLE_EXCEPTION);
String response = policy.apply(action).block();
assertEquals(SUCCESS_MESSAGE, response);
StepVerifier
.create(policy.apply(action))
.expectNext(SUCCESS_MESSAGE)
.expectComplete()
.verify();
assertEquals(3, callCounter.get());
}
@ -78,12 +93,11 @@ public class RetryPolicyTest {
RetryPolicy policy = new RetryPolicy(3);
Mono<String> action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION);
try {
policy.apply(action).block();
fail("Exception expected");
} catch (Exception e) {
assertTrue(Exceptions.isRetryExhausted(e));
}
StepVerifier
.create(policy.apply(action))
.expectErrorMatches(e -> Exceptions.isRetryExhausted(e))
.verify();
assertEquals(4, callCounter.get());
}
@ -94,24 +108,22 @@ public class RetryPolicyTest {
RetryPolicy policy = new RetryPolicy(3);
Mono<String> action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, exception);
assertThrows(ArithmeticException.class, () -> {
policy.apply(action).block();
});
StepVerifier
.create(policy.apply(action))
.expectError(ArithmeticException.class)
.verify();
assertEquals(1, callCounter.get());
}
private static Mono<String> createActionErrorAndReturn(
AtomicInteger callCounter,
int firstErrors,
RuntimeException error) {
return Mono.fromCallable(() -> {
return Mono.fromRunnable(() -> {
if (callCounter.incrementAndGet() <= firstErrors) {
throw error;
}
return SUCCESS_MESSAGE;
});
}).thenReturn(SUCCESS_MESSAGE);
}
}