Implement retry and timeout policy for gRPC client. (#889)

* Implement retry and timeout policy for gRPC client.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix invoke actor after aborted flow.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Artur Souza 2023-08-15 22:39:16 -07:00 committed by GitHub
parent 6d659913b1
commit cf8040dd6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1023 additions and 116 deletions

View File

@ -48,6 +48,7 @@ jobs:
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
@ -101,14 +102,20 @@ jobs:
docker stop dapr_placement
cd dapr
./dist/linux_amd64/release/placement &
- name: Install Local kafka using docker-compose
- name: Install local Kafka using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d
docker ps
- name: Install Local mongo database using docker-compose
- name: Install local Mongo database using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-mongo.yml up -d
docker ps
- name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar
run: |
mkdir -p /home/runner/.local/bin
wget -q ${{ env.TOXIPROXY_URL }} -O /home/runner/.local/bin/toxiproxy-server
chmod +x /home/runner/.local/bin/toxiproxy-server
/home/runner/.local/bin/toxiproxy-server --version
- name: Clean up files
run: mvn clean -B
- name: Build sdk

View File

@ -15,6 +15,7 @@ package io.dapr.actors.client;
import io.dapr.client.DaprApiProtocol;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
@ -46,26 +47,41 @@ public class ActorClient implements AutoCloseable {
* Instantiates a new channel for Dapr sidecar communication.
*/
public ActorClient() {
this(Properties.API_PROTOCOL.get());
this(null);
}
/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(ResiliencyOptions resiliencyOptions) {
this(Properties.API_PROTOCOL.get(), resiliencyOptions);
}
/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol) {
this(apiProtocol, buildManagedChannel(apiProtocol));
private ActorClient(DaprApiProtocol apiProtocol, ResiliencyOptions resiliencyOptions) {
this(apiProtocol, buildManagedChannel(apiProtocol), resiliencyOptions);
}
/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param grpcManagedChannel gRPC channel.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel) {
private ActorClient(
DaprApiProtocol apiProtocol,
ManagedChannel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel);
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel, resiliencyOptions);
}
/**
@ -119,9 +135,12 @@ public class ActorClient implements AutoCloseable {
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) {
private static DaprClient buildDaprClient(
DaprApiProtocol apiProtocol,
Channel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
switch (apiProtocol) {
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel));
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions);
case HTTP: {
LOGGER.warn("HTTP client protocol is deprecated and will be removed in Dapr's Java SDK version 1.10.");
return new DaprHttpClient(new DaprHttpBuilder().build());

View File

@ -14,9 +14,12 @@ limitations under the License.
package io.dapr.actors.client;
import com.google.protobuf.ByteString;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.CallOptions;
@ -39,18 +42,33 @@ import java.util.function.Consumer;
*/
class DaprGrpcClient implements DaprClient {
/**
* Timeout policy for SDK calls to Dapr API.
*/
private final TimeoutPolicy timeoutPolicy;
/**
* Retry policy for SDK calls to Dapr API.
*/
private final RetryPolicy retryPolicy;
/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub client;
private final DaprGrpc.DaprStub client;
/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param resiliencyOptions Client resiliency options (optional)
*/
DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
DaprGrpcClient(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) {
this.client = intercept(grpcClient);
this.timeoutPolicy = new TimeoutPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
}
/**
@ -78,14 +96,14 @@ class DaprGrpcClient implements DaprClient {
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
@ -114,7 +132,8 @@ class DaprGrpcClient implements DaprClient {
}
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {

View File

@ -20,6 +20,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Represents the base class for actors.
@ -28,8 +29,6 @@ import java.util.UUID;
*/
public abstract class AbstractActor {
private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer();
/**
* Type of tracing messages.
*/
@ -58,7 +57,7 @@ public abstract class AbstractActor {
/**
* Internal control to assert method invocation on start and finish in this SDK.
*/
private boolean started;
private final AtomicBoolean started;
/**
* Instantiates a new Actor.
@ -74,7 +73,7 @@ public abstract class AbstractActor {
runtimeContext.getActorTypeInformation().getName(),
id);
this.actorTrace = runtimeContext.getActorTrace();
this.started = false;
this.started = new AtomicBoolean(false);
}
/**
@ -250,14 +249,16 @@ public abstract class AbstractActor {
/**
* Resets the cached state of this Actor.
*
* @param force Forces the rollback, even if not in a call.
*/
void rollback() {
if (!this.started) {
void rollback(boolean force) {
if (!force && !this.started.get()) {
throw new IllegalStateException("Cannot reset state before starting call.");
}
this.resetState();
this.started = false;
this.started.set(false);
}
/**
@ -302,11 +303,12 @@ public abstract class AbstractActor {
*/
Mono<Void> onPreActorMethodInternal(ActorMethodContext actorMethodContext) {
return Mono.fromRunnable(() -> {
if (this.started) {
throw new IllegalStateException("Cannot invoke a method before completing previous call.");
if (this.started.get()) {
throw new IllegalStateException(
"Cannot invoke a method before completing previous call. " + getId().toString());
}
this.started = true;
this.started.set(true);
}).then(this.onPreActorMethod(actorMethodContext));
}
@ -318,14 +320,13 @@ public abstract class AbstractActor {
*/
Mono<Void> onPostActorMethodInternal(ActorMethodContext actorMethodContext) {
return Mono.fromRunnable(() -> {
if (!this.started) {
if (!this.started.get()) {
throw new IllegalStateException("Cannot complete a method before starting a call.");
}
}).then(this.onPostActorMethod(actorMethodContext))
.then(this.saveState())
.then(Mono.fromRunnable(() -> {
this.started = false;
}));
})
.then(this.onPostActorMethod(actorMethodContext))
.then(this.saveState())
.then(Mono.fromRunnable(() -> this.started.set(false)));
}
/**

View File

@ -306,15 +306,16 @@ class ActorManager<T extends AbstractActor> {
this.runtimeContext.getActorTypeInformation().getName()));
}
return actor.onPreActorMethodInternal(context)
return Mono.fromRunnable(() -> actor.rollback(true))
.onErrorMap(throwable -> {
actor.rollback(false);
return throwable;
})
.then(actor.onPreActorMethodInternal(context))
.then((Mono<Object>) func.apply(actor))
.switchIfEmpty(
actor.onPostActorMethodInternal(context))
.flatMap(r -> actor.onPostActorMethodInternal(context).thenReturn(r))
.onErrorMap(throwable -> {
actor.rollback();
return throwable;
})
.map(o -> (T) o);
} catch (Exception e) {
return Mono.error(e);

View File

@ -105,7 +105,7 @@ public class DaprGrpcClientTest {
InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
client = new DaprGrpcClient(DaprGrpc.newStub(channel), null);
}
@Test

View File

@ -11,4 +11,7 @@ spec:
- name: databaseName
value: local
- name: collectionName
value: testCollection
value: testCollection
scopes:
- grpcstateclientit
- httpstateclientit

View File

@ -146,6 +146,11 @@
<version>1.3.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>eu.rekawek.toxiproxy</groupId>
<artifactId>toxiproxy-java</artifactId>
<version>2.1.7</version>
</dependency>
</dependencies>
<build>

View File

@ -15,6 +15,7 @@ package io.dapr.it;
import io.dapr.actors.client.ActorClient;
import io.dapr.client.DaprApiProtocol;
import io.dapr.client.resiliency.ResiliencyOptions;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.AfterClass;
@ -194,8 +195,12 @@ public abstract class BaseIT {
}
}
protected ActorClient newActorClient() {
ActorClient client = new ActorClient();
protected static ActorClient newActorClient() {
return newActorClient(null);
}
protected static ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
ActorClient client = new ActorClient(resiliencyOptions);
TO_BE_CLOSED.add(client);
return client;
}

View File

@ -83,7 +83,25 @@ public class Command {
}
});
final Thread stderrReader = new Thread(() -> {
try {
try (InputStream stderr = this.process.getErrorStream()) {
try (InputStreamReader isr = new InputStreamReader(stderr)) {
try (BufferedReader br = new BufferedReader(isr)) {
String line;
while ((line = br.readLine()) != null) {
System.err.println(line);
}
}
}
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
stdoutReader.start();
stderrReader.start();
// Waits for success to happen within 1 minute.
finished.tryAcquire(SUCCESS_WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!success.get()) {

View File

@ -13,6 +13,8 @@ limitations under the License.
package io.dapr.it;
import io.dapr.config.Properties;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
@ -46,6 +48,20 @@ public class DaprPorts {
}
}
public void use() {
if (this.httpPort != null) {
System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.httpPort));
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.httpPort);
}
if (this.grpcPort != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.grpcPort));
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.grpcPort);
}
}
public Integer getGrpcPort() {
return grpcPort;
}

View File

@ -30,7 +30,7 @@ public class DaprRun implements Stoppable {
private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " +
"--config ./configurations/configuration.yaml " +
"--components-path ./components";
"--resources-path ./components";
// the arg in -Dexec.args is the app's port
private static final String DAPR_COMMAND =
@ -130,20 +130,11 @@ public class DaprRun implements Stoppable {
}
public void use() {
if (this.ports.getHttpPort() != null) {
System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.ports.getHttpPort()));
}
if (this.ports.getGrpcPort() != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort()));
}
this.ports.use();
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort());
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort());
}
public void switchToGRPC() {
@ -165,15 +156,15 @@ public class DaprRun implements Stoppable {
System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name());
}
public int getGrpcPort() {
public Integer getGrpcPort() {
return ports.getGrpcPort();
}
public int getHttpPort() {
public Integer getHttpPort() {
return ports.getHttpPort();
}
public int getAppPort() {
public Integer getAppPort() {
return ports.getAppPort();
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
import java.time.Duration;
public class ToxiProxyRun implements Stoppable {
private final DaprRun daprRun;
private final Duration latency;
private final Duration jitter;
private Command toxiProxyServer;
private ToxiproxyClient toxiproxyClient;
private Proxy grpcProxy;
private Proxy httpProxy;
private DaprPorts toxiProxyPorts;
public ToxiProxyRun(DaprRun run, Duration latency, Duration jitter) {
this.daprRun = run;
this.latency = latency;
this.jitter = jitter;
this.toxiProxyPorts = DaprPorts.build(true, true, true);
// artursouza: we use the "appPort" for the ToxiProxy server.
this.toxiProxyServer = new Command(
"Starting HTTP server on endpoint",
"toxiproxy-server --port "
+ this.toxiProxyPorts.getAppPort());
}
public void start() throws IOException, InterruptedException {
this.toxiProxyServer.run();
this.toxiproxyClient = new ToxiproxyClient("127.0.0.1", this.toxiProxyPorts.getAppPort());
if (this.daprRun.getGrpcPort() != null) {
this.grpcProxy = toxiproxyClient.createProxy(
"daprd_grpc",
"127.0.0.1:" + this.toxiProxyPorts.getGrpcPort(),
"127.0.0.1:" + this.daprRun.getGrpcPort());
this.grpcProxy.toxics()
.latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis())
.setJitter(this.jitter.toMillis());
}
if (this.daprRun.getHttpPort() != null) {
this.httpProxy = toxiproxyClient.createProxy(
"daprd_http",
"127.0.0.1:" + this.toxiProxyPorts.getHttpPort(),
"127.0.0.1:" + this.daprRun.getHttpPort());
this.httpProxy.toxics()
.latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis())
.setJitter(this.jitter.toMillis());
}
}
public void use() {
this.toxiProxyPorts.use();
}
@Override
public void stop() throws InterruptedException, IOException {
this.toxiProxyServer.stop();
this.toxiproxyClient = null;
this.grpcProxy = null;
this.httpProxy = null;
}
}

View File

@ -0,0 +1,141 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.actors;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.ToxiProxyRun;
import io.dapr.it.actors.services.springboot.DemoActor;
import io.dapr.it.actors.services.springboot.DemoActorService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test SDK resiliency.
*/
public class ActorSdkResiliencytIT extends BaseIT {
private static final ActorId ACTOR_ID = new ActorId(UUID.randomUUID().toString());
private static final int NUM_ITERATIONS = 20;
private static final Duration TIMEOUT = Duration.ofMillis(1000);
private static final Duration LATENCY = TIMEOUT.dividedBy(2);
private static final Duration JITTER = TIMEOUT.multipliedBy(2);
private static final int MAX_RETRIES = -1; // Infinity
private static DaprRun daprRun;
private static DaprClient daprClient;
private static DemoActor demoActor;
private static ToxiProxyRun toxiProxyRun;
private static DemoActor toxiDemoActor;
private static DemoActor resilientDemoActor;
private static DemoActor oneRetryDemoActor;
@BeforeClass
public static void init() throws Exception {
daprRun = startDaprApp(
ActorSdkResiliencytIT.class.getSimpleName(),
DemoActorService.SUCCESS_MESSAGE,
DemoActorService.class,
true,
60000);
ActorId actorId = new ActorId(UUID.randomUUID().toString());
// HTTP client is deprecated, so SDK resiliency is for gRPC client only.
daprRun.switchToGRPC();
demoActor = buildDemoActorProxy(null);
daprClient = new DaprClientBuilder().build();
toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER);
toxiProxyRun.start();
toxiProxyRun.use();
toxiDemoActor = buildDemoActorProxy(new ResiliencyOptions().setTimeout(TIMEOUT));
resilientDemoActor = buildDemoActorProxy(
new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(MAX_RETRIES));
oneRetryDemoActor = buildDemoActorProxy(
new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(1));
}
private static DemoActor buildDemoActorProxy(ResiliencyOptions resiliencyOptions) {
ActorProxyBuilder<DemoActor> builder =
new ActorProxyBuilder(DemoActor.class, newActorClient(resiliencyOptions));
return builder.build(ACTOR_ID);
}
@AfterClass
public static void tearDown() throws Exception {
if (toxiProxyRun != null) {
toxiProxyRun.stop();
}
}
@Test
@Ignore("Flaky when running on GitHub actions")
public void retryAndTimeout() {
AtomicInteger toxiClientErrorCount = new AtomicInteger();
AtomicInteger retryOneClientErrorCount = new AtomicInteger();
String message = "hello world";
for (int i = 0; i < NUM_ITERATIONS; i++) {
try {
toxiDemoActor.writeMessage(message);
} catch (Exception e) {
// This call should fail sometimes. So, we count.
toxiClientErrorCount.incrementAndGet();
}
try {
oneRetryDemoActor.writeMessage(message);
} catch (Exception e) {
// This call should fail sometimes. So, we count.
retryOneClientErrorCount.incrementAndGet();
}
// We retry forever so that the call below should always work.
resilientDemoActor.writeMessage(message);
// Makes sure the value was actually saved.
String savedValue = demoActor.readMessage();
assertEquals(message, savedValue);
}
// This assertion makes sure that toxicity is on
assertTrue(toxiClientErrorCount.get() > 0);
assertTrue(retryOneClientErrorCount.get() > 0);
// A client without retries should have more errors than a client with one retry.
assertTrue(toxiClientErrorCount.get() > retryOneClientErrorCount.get());
}
}

View File

@ -0,0 +1,146 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.resiliency;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprClientGrpc;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.ToxiProxyRun;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test SDK resiliency.
*/
public class SdkResiliencytIT extends BaseIT {
private static final int NUM_ITERATIONS = 20;
private static final Duration TIMEOUT = Duration.ofMillis(100);
private static final Duration LATENCY = TIMEOUT.dividedBy(2);
private static final Duration JITTER = TIMEOUT.multipliedBy(2);
private static final int MAX_RETRIES = -1; // Infinity
private static DaprRun daprRun;
private static DaprClient daprClient;
private static ToxiProxyRun toxiProxyRun;
private static DaprClient daprToxiClient;
private static DaprClient daprResilientClient;
private static DaprClient daprRetriesOnceClient;
private final String randomStateKeyPrefix = UUID.randomUUID().toString();
@BeforeClass
public static void init() throws Exception {
daprRun = startDaprApp(SdkResiliencytIT.class.getSimpleName(), 5000);
// HTTP client is deprecated, so SDK resiliency is for gRPC client only.
daprRun.switchToGRPC();
daprClient = new DaprClientBuilder().build();
toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER);
toxiProxyRun.start();
toxiProxyRun.use();
daprToxiClient = new DaprClientBuilder()
.withResiliencyOptions(
new ResiliencyOptions().setTimeout(TIMEOUT))
.build();
daprResilientClient = new DaprClientBuilder()
.withResiliencyOptions(
new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(MAX_RETRIES))
.build();
daprRetriesOnceClient = new DaprClientBuilder()
.withResiliencyOptions(
new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(1))
.build();
assertTrue(daprClient instanceof DaprClientGrpc);
assertTrue(daprToxiClient instanceof DaprClientGrpc);
assertTrue(daprResilientClient instanceof DaprClientGrpc);
assertTrue(daprRetriesOnceClient instanceof DaprClientGrpc);
}
@AfterClass
public static void tearDown() throws Exception {
if (daprClient != null) {
daprClient.close();
}
if (daprToxiClient != null) {
daprToxiClient.close();
}
if (daprResilientClient != null) {
daprResilientClient.close();
}
if (daprRetriesOnceClient != null) {
daprRetriesOnceClient.close();
}
if (toxiProxyRun != null) {
toxiProxyRun.stop();
}
}
@Test
public void retryAndTimeout() {
AtomicInteger toxiClientErrorCount = new AtomicInteger();
AtomicInteger retryOneClientErrorCount = new AtomicInteger();
for (int i = 0; i < NUM_ITERATIONS; i++) {
String key = randomStateKeyPrefix + "_" + i;
String value = Base64.getEncoder().encodeToString(key.getBytes(StandardCharsets.UTF_8));
try {
daprToxiClient.saveState(STATE_STORE_NAME, key, value).block();
} catch (Exception e) {
// This call should fail sometimes. So, we count.
toxiClientErrorCount.incrementAndGet();
}
try {
daprRetriesOnceClient.saveState(STATE_STORE_NAME, key, value).block();
} catch (Exception e) {
// This call should fail sometimes. So, we count.
retryOneClientErrorCount.incrementAndGet();
}
// We retry forever so that the call below should always work.
daprResilientClient.saveState(STATE_STORE_NAME, key, value).block();
// Makes sure the value was actually saved.
String savedValue = daprClient.getState(STATE_STORE_NAME, key, String.class).block().getValue();
assertEquals(value, savedValue);
}
// This assertion makes sure that toxicity is on
assertTrue(toxiClientErrorCount.get() > 0);
assertTrue(retryOneClientErrorCount.get() > 0);
// A client without retries should have more errors than a client with one retry.
assertTrue(toxiClientErrorCount.get() > retryOneClientErrorCount.get());
}
}

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.client;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
@ -55,6 +56,11 @@ public class DaprClientBuilder {
*/
private DaprObjectSerializer stateSerializer;
/**
* Resiliency configuration for DaprClient.
*/
private ResiliencyOptions resiliencyOptions;
/**
* Creates a constructor for DaprClient.
*
@ -105,6 +111,17 @@ public class DaprClientBuilder {
return this;
}
/**
* Sets the resiliency options for DaprClient.
*
* @param options Serializer for objects to be persisted.
* @return This instance.
*/
public DaprClientBuilder withResiliencyOptions(ResiliencyOptions options) {
this.resiliencyOptions = options;
return this;
}
/**
* Build an instance of the Client based on the provided setup.
*
@ -162,7 +179,12 @@ public class DaprClientBuilder {
final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel();
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer);
return new DaprClientGrpc(
channelFacade,
asyncStub,
this.objectSerializer,
this.stateSerializer,
this.resiliencyOptions);
}
/**

View File

@ -44,9 +44,12 @@ import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
@ -92,18 +95,28 @@ public class DaprClientGrpc extends AbstractDaprClient {
*/
private final GrpcChannelFacade channel;
/**
* The timeout policy.
*/
private final TimeoutPolicy timeoutPolicy;
/**
* The retry policy.
*/
private final RetryPolicy retryPolicy;
/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub asyncStub;
private final DaprGrpc.DaprStub asyncStub;
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @see DaprClientBuilder
*/
DaprClientGrpc(
@ -111,9 +124,32 @@ public class DaprClientGrpc extends AbstractDaprClient {
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
this(channel, asyncStub, objectSerializer, stateSerializer, null);
}
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @param resiliencyOptions Client-level override for resiliency options.
* @see DaprClientBuilder
*/
DaprClientGrpc(
GrpcChannelFacade channel,
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer,
ResiliencyOptions resiliencyOptions) {
super(objectSerializer, stateSerializer);
this.channel = channel;
this.asyncStub = intercept(asyncStub);
this.timeoutPolicy = new TimeoutPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
}
private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
@ -994,14 +1030,14 @@ public class DaprClientGrpc extends AbstractDaprClient {
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
@ -1009,7 +1045,6 @@ public class DaprClientGrpc extends AbstractDaprClient {
if (daprApiToken != null) {
metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken);
}
super.start(responseListener, metadata);
}
};
@ -1030,11 +1065,13 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}
private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {

View File

@ -0,0 +1,44 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.resiliency;
import java.time.Duration;
/**
* Resiliency policy for SDK communication to Dapr API.
*/
public final class ResiliencyOptions {
private Duration timeout;
private Integer maxRetries;
public Duration getTimeout() {
return timeout;
}
public ResiliencyOptions setTimeout(Duration timeout) {
this.timeout = timeout;
return this;
}
public Integer getMaxRetries() {
return maxRetries;
}
public ResiliencyOptions setMaxRetries(Integer maxRetries) {
this.maxRetries = maxRetries;
return this;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.config;
import java.time.Duration;
/**
* Integer configuration property.
*/
public class MillisecondsDurationProperty extends Property<Duration> {
/**
* {@inheritDoc}
*/
MillisecondsDurationProperty(String name, String envName, Duration defaultValue) {
super(name, envName, defaultValue);
}
/**
* {@inheritDoc}
*/
@Override
protected Duration parse(String value) {
long longValue = Long.parseLong(value);
return Duration.ofMillis(longValue);
}
}

View File

@ -17,6 +17,7 @@ import io.dapr.client.DaprApiProtocol;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/**
* Global properties for Dapr's SDK, using Supplier so they are dynamically resolved.
@ -38,6 +39,16 @@ public class Properties {
*/
private static final Integer DEFAULT_GRPC_PORT = 50001;
/**
* Dapr's default max retries.
*/
private static final Integer DEFAULT_API_MAX_RETRIES = 0;
/**
* Dapr's default timeout in seconds.
*/
private static final Duration DEFAULT_API_TIMEOUT = Duration.ofMillis(0L);
/**
* Dapr's default use of gRPC or HTTP.
*/
@ -115,6 +126,22 @@ public class Properties {
"DAPR_HTTP_ENDPOINT",
null);
/**
* Maximum number of retries for retriable exceptions.
*/
public static final Property<Integer> MAX_RETRIES = new IntegerProperty(
"dapr.api.maxRetries",
"DAPR_API_MAX_RETRIES",
DEFAULT_API_MAX_RETRIES);
/**
* Timeout for API calls.
*/
public static final Property<Duration> TIMEOUT = new MillisecondsDurationProperty(
"dapr.api.timeoutMilliseconds",
"DAPR_API_TIMEOUT_MILLISECONDS",
DEFAULT_API_TIMEOUT);
/**
* Determines if Dapr client will use gRPC or HTTP to talk to Dapr's side car.
* @deprecated This attribute will be deleted at SDK version 1.10.

View File

@ -0,0 +1,127 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.internal.resiliency;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
/**
* Retry policy for SDK communication to Dapr API.
*/
public final class RetryPolicy {
private static final int MIN_BACKOFF_MILLIS = 500;
private static final int MAX_BACKOFF_SECONDS = 5;
private final Retry retrySpec;
public RetryPolicy() {
this(null);
}
public RetryPolicy(Integer maxRetries) {
this.retrySpec = buildRetrySpec(maxRetries != null ? maxRetries : Properties.MAX_RETRIES.get());
}
/**
* Applies the retry policy to an expected Mono action.
* @param response Response
* @param <T> Type expected for the action's response
* @return action with retry
*/
public <T> Mono<T> apply(Mono<T> response) {
if (this.retrySpec == null) {
return response;
}
return response.retryWhen(retrySpec)
.onErrorMap(throwable -> findDaprException(throwable));
}
/**
* Applies the retry policy to an expected Flux action.
* @param response Response
* @param <T> Type expected for the action's response
* @return action with retry
*/
public <T> Flux<T> apply(Flux<T> response) {
if (this.retrySpec == null) {
return response;
}
return response.retryWhen(retrySpec)
.onErrorMap(throwable -> findDaprException(throwable));
}
private static Retry buildRetrySpec(int maxRetries) {
if (maxRetries == 0) {
return null;
}
if (maxRetries < 0) {
return Retry.indefinitely()
.filter(throwable -> isRetryableGrpcError(throwable));
}
return Retry.backoff(maxRetries, Duration.ofMillis(MIN_BACKOFF_MILLIS))
.maxBackoff(Duration.ofSeconds(MAX_BACKOFF_SECONDS))
.filter(throwable -> isRetryableGrpcError(throwable));
}
private static boolean isRetryableGrpcError(Throwable throwable) {
Status grpcStatus = findGrpcStatusCode(throwable);
if (grpcStatus == null) {
return false;
}
switch (grpcStatus.getCode()) {
case DEADLINE_EXCEEDED:
case UNAVAILABLE:
return true;
default:
return false;
}
}
private static Status findGrpcStatusCode(Throwable throwable) {
while (throwable != null) {
if (throwable instanceof StatusRuntimeException) {
return ((StatusRuntimeException) throwable).getStatus();
}
throwable = throwable.getCause();
}
return null;
}
private static Throwable findDaprException(Throwable throwable) {
Throwable original = throwable;
while (throwable != null) {
if (throwable instanceof DaprException) {
return throwable;
}
throwable = throwable.getCause();
}
return original;
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.internal.resiliency;
import io.dapr.config.Properties;
import io.grpc.CallOptions;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
* Timeout policy for SDK communication to Dapr API.
*/
public final class TimeoutPolicy {
private final Duration timeout;
/**
* Instantiates a new timeout policy with override value.
* @param timeout Override timeout value.
*/
public TimeoutPolicy(Duration timeout) {
this.timeout = timeout != null ? timeout : Properties.TIMEOUT.get();
}
/**
* Instantiates a new timeout policy with default value.
*/
public TimeoutPolicy() {
this(null);
}
/**
* Applies the timeout policy to a gRPC call options.
* @param options Call options
* @return Call options with retry policy applied
*/
public CallOptions apply(CallOptions options) {
if (this.timeout.isZero() || this.timeout.isNegative()) {
return options;
}
return options.withDeadlineAfter(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
}
}

View File

@ -27,6 +27,8 @@ import java.net.URI;
*/
public final class NetworkUtils {
private static final long RETRY_WAIT_MILLISECONDS = 1000;
private NetworkUtils() {
}
@ -39,7 +41,7 @@ public final class NetworkUtils {
*/
public static void waitForSocket(String host, int port, int timeoutInMilliseconds) throws InterruptedException {
long started = System.currentTimeMillis();
Retry.callWithRetry(() -> {
callWithRetry(() -> {
try {
try (Socket socket = new Socket()) {
// timeout cannot be negative.
@ -78,4 +80,31 @@ public final class NetworkUtils {
}
return builder.build();
}
private static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException {
long started = System.currentTimeMillis();
while (true) {
Throwable exception;
try {
function.run();
return;
} catch (Exception e) {
exception = e;
} catch (AssertionError e) {
exception = e;
}
long elapsed = System.currentTimeMillis() - started;
if (elapsed >= retryTimeoutMilliseconds) {
if (exception instanceof RuntimeException) {
throw (RuntimeException)exception;
}
throw new RuntimeException(exception);
}
long remaining = retryTimeoutMilliseconds - elapsed;
Thread.sleep(Math.min(remaining, RETRY_WAIT_MILLISECONDS));
}
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.utils;
class Retry {
private static final long RETRY_WAIT_MILLISECONDS = 1000;
private Retry() {
}
static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException {
long started = System.currentTimeMillis();
while (true) {
Throwable exception;
try {
function.run();
return;
} catch (Exception e) {
exception = e;
} catch (AssertionError e) {
exception = e;
}
long elapsed = System.currentTimeMillis() - started;
if (elapsed >= retryTimeoutMilliseconds) {
if (exception instanceof RuntimeException) {
throw (RuntimeException)exception;
}
throw new RuntimeException(exception);
}
long remaining = retryTimeoutMilliseconds - elapsed;
Thread.sleep(Math.min(remaining, RETRY_WAIT_MILLISECONDS));
}
}
}

View File

@ -0,0 +1,113 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.resiliency;
import io.dapr.internal.resiliency.RetryPolicy;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.junit.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
public class RetryPolicyTest {
private static final String SUCCESS_MESSAGE = "It worked!";
private static final RuntimeException RETRYABLE_EXCEPTION =
new StatusRuntimeException(Status.DEADLINE_EXCEEDED);
@Test
public void zeroRetriesThenError() {
AtomicInteger callCounter = new AtomicInteger();
RetryPolicy policy = new RetryPolicy(0);
Mono<String> action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION);
try {
policy.apply(action).block();
fail("Exception expected");
} catch (Exception e) {
assertSame(RETRYABLE_EXCEPTION, e);
}
assertEquals(1, callCounter.get());
}
@Test
public void zeroRetriesThenSuccess() {
AtomicInteger callCounter = new AtomicInteger();
RetryPolicy policy = new RetryPolicy(0);
Mono<String> action = createActionErrorAndReturn(callCounter, 0, RETRYABLE_EXCEPTION);
String response = policy.apply(action).block();
assertEquals(SUCCESS_MESSAGE, response);
assertEquals(1, callCounter.get());
}
@Test
public void twoRetriesThenSuccess() {
AtomicInteger callCounter = new AtomicInteger();
RetryPolicy policy = new RetryPolicy(3);
Mono<String> action = createActionErrorAndReturn(callCounter, 2, RETRYABLE_EXCEPTION);
String response = policy.apply(action).block();
assertEquals(SUCCESS_MESSAGE, response);
assertEquals(3, callCounter.get());
}
@Test
public void threeRetriesThenError() {
AtomicInteger callCounter = new AtomicInteger();
RetryPolicy policy = new RetryPolicy(3);
Mono<String> action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION);
try {
policy.apply(action).block();
fail("Exception expected");
} catch (Exception e) {
assertTrue(Exceptions.isRetryExhausted(e));
}
assertEquals(4, callCounter.get());
}
@Test
public void notRetryableException() {
AtomicInteger callCounter = new AtomicInteger();
RuntimeException exception = new ArithmeticException();
RetryPolicy policy = new RetryPolicy(3);
Mono<String> action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, exception);
assertThrows(ArithmeticException.class, () -> {
policy.apply(action).block();
});
assertEquals(1, callCounter.get());
}
private static Mono<String> createActionErrorAndReturn(
AtomicInteger callCounter,
int firstErrors,
RuntimeException error) {
return Mono.fromCallable(() -> {
if (callCounter.incrementAndGet() <= firstErrors) {
throw error;
}
return SUCCESS_MESSAGE;
});
}
}