Adding a basic implementation of the retry when there is an error on the async call

This commit is contained in:
Max Lambrecht 2018-05-31 17:11:24 -03:00
parent 3f4d446529
commit 932bf7876f
3 changed files with 51 additions and 82 deletions

View File

@ -26,7 +26,6 @@ class SpiffeWorkloadStub {
workloadAPIAsyncStub= SpiffeWorkloadAPIGrpc
.newStub(managedChannel)
.withWaitForReady()
.withInterceptors(new SecurityHeaderInterceptor());
}
@ -35,7 +34,7 @@ class SpiffeWorkloadStub {
* As no 'spiffeEndpointAddress' is provided, the channel builder will resolve it through the Environment
*/
SpiffeWorkloadStub() {
this("");
this(null);
}
/**

View File

@ -1,13 +1,17 @@
package spiffe.api.svid;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static spiffe.api.svid.Workload.*;
/**
@ -20,6 +24,9 @@ public final class WorkloadAPIClient {
private SpiffeWorkloadStub spiffeWorkloadStub;
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private RetryState retryState = new RetryState(1, 60, TimeUnit.SECONDS);
/**
* Constructor
* @param spiffeEndpointAddress
@ -30,18 +37,15 @@ public final class WorkloadAPIClient {
/**
* Default constructor
* The WorkloadAPI Address will be resolved by an Environment Variable
*
*/
public WorkloadAPIClient() {
spiffeWorkloadStub = new SpiffeWorkloadStub(EMPTY);
spiffeWorkloadStub = new SpiffeWorkloadStub();
}
/**
* Fetch the SVIDs from the Workload API on a asynchronous fashion
*
* TODO: Use a Exponential Backoff to handle the errors and retries
*
*/
public void fetchX509SVIDs(Consumer<List<X509SVID>> listener) {
@ -53,6 +57,11 @@ public final class WorkloadAPIClient {
@Override
public void onError(Throwable t) {
LOGGER.error(t.getMessage());
if (isRetryableError(t)) {
scheduledExecutorService.schedule(
() -> fetchX509SVIDs(listener), retryState.delay(), retryState.timeUnit);
}
}
@Override
@ -60,10 +69,46 @@ public final class WorkloadAPIClient {
}
};
LOGGER.info("Calling fetchX509SVIDs");
spiffeWorkloadStub.fetchX509SVIDs(newRequest(), observer);
}
/**
* Checks that the error is retryable. The only error that is not retryable is 'InvalidArgument',
* that occurs when the security header is not present
* @param t
* @return
*/
private boolean isRetryableError(Throwable t) {
return !"InvalidArgument".equalsIgnoreCase(Status.fromThrowable(t).getCode().name());
}
private X509SVIDRequest newRequest() {
return X509SVIDRequest.newBuilder().build();
}
static class RetryState {
RetryState(long delay, long maxDelay, TimeUnit timeUnit) {
if (delay < 1) {
this.delay = 1;
} else {
this.delay = delay;
}
this.maxDelay = maxDelay;
this.timeUnit = timeUnit;
}
private long delay;
private long maxDelay;
private TimeUnit timeUnit;
private Function<Long, Long> calculateDelay = (d) -> d * 2;
long delay() {
delay = calculateDelay.apply(delay);
if (delay > maxDelay) {
delay = maxDelay;
}
return delay;
}
}
}

View File

@ -1,75 +0,0 @@
package spiffe.api.svid.util;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
/**
* Implements a basic Exponential Backoff Policy
* Retries based on a List of Errors
*
*/
public class ExponentialBackOff {
private static Logger LOGGER = LoggerFactory.getLogger(ExponentialBackOff.class);
static int MAX_RETRIES = 5;
static int BASE = 2;
/**
* The list of Errors that can produce the WorkloadApi and must cause a Retry
*/
private static List<String> RETRYABLE_ERRORS = Arrays.asList("UNAVAILABLE", "PERMISSIONDENIED");
private ExponentialBackOff() {
}
/**
* Execute a Function given as parameter
* @param fn The Function to execute
* @param <T>
* @return
*/
public static <T> T execute(Supplier<T> fn) {
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
LOGGER.info("Attempt no. " + attempt);
return fn.get();
} catch (StatusRuntimeException e) {
handleError(e, attempt);
}
}
throw new RuntimeException("Failed to communicate with the Workload API");
}
private static void handleError(StatusRuntimeException e, int attempt) {
LOGGER.error("Error " + e.getMessage());
if (isRetryableError(e)) {
sleep(backoffSequenceGenerator(attempt));
} else {
throw new RuntimeException("Not retryable error occurred. ", e);
}
}
private static boolean isRetryableError(StatusRuntimeException e) {
return RETRYABLE_ERRORS.contains(e.getStatus().getCode().name());
}
private static void sleep(long millis) {
try {
LOGGER.info("Sleeping for " + millis + "ms");
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static long backoffSequenceGenerator(int attempt) {
return (long) Math.pow(BASE, attempt) * 1000;
}
}