diff --git a/src/main/java/spiffe/api/svid/SpiffeWorkloadStub.java b/src/main/java/spiffe/api/svid/SpiffeWorkloadStub.java index 758e029..d615d31 100644 --- a/src/main/java/spiffe/api/svid/SpiffeWorkloadStub.java +++ b/src/main/java/spiffe/api/svid/SpiffeWorkloadStub.java @@ -26,7 +26,6 @@ class SpiffeWorkloadStub { workloadAPIAsyncStub= SpiffeWorkloadAPIGrpc .newStub(managedChannel) - .withWaitForReady() .withInterceptors(new SecurityHeaderInterceptor()); } @@ -35,7 +34,7 @@ class SpiffeWorkloadStub { * As no 'spiffeEndpointAddress' is provided, the channel builder will resolve it through the Environment */ SpiffeWorkloadStub() { - this(""); + this(null); } /** diff --git a/src/main/java/spiffe/api/svid/WorkloadAPIClient.java b/src/main/java/spiffe/api/svid/WorkloadAPIClient.java index a7a602d..f94cfc4 100644 --- a/src/main/java/spiffe/api/svid/WorkloadAPIClient.java +++ b/src/main/java/spiffe/api/svid/WorkloadAPIClient.java @@ -1,13 +1,17 @@ package spiffe.api.svid; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; -import static org.apache.commons.lang3.StringUtils.EMPTY; import static spiffe.api.svid.Workload.*; /** @@ -20,6 +24,9 @@ public final class WorkloadAPIClient { private SpiffeWorkloadStub spiffeWorkloadStub; + private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + private RetryState retryState = new RetryState(1, 60, TimeUnit.SECONDS); + /** * Constructor * @param spiffeEndpointAddress @@ -30,18 +37,15 @@ public final class WorkloadAPIClient { /** * Default constructor - * The WorkloadAPI Address will be resolved by an Environment Variable * */ public WorkloadAPIClient() { - spiffeWorkloadStub = new SpiffeWorkloadStub(EMPTY); + spiffeWorkloadStub = new SpiffeWorkloadStub(); } /** * Fetch the SVIDs from the Workload API on a asynchronous fashion * - * TODO: Use a Exponential Backoff to handle the errors and retries - * */ public void fetchX509SVIDs(Consumer> listener) { @@ -53,6 +57,11 @@ public final class WorkloadAPIClient { @Override public void onError(Throwable t) { + LOGGER.error(t.getMessage()); + if (isRetryableError(t)) { + scheduledExecutorService.schedule( + () -> fetchX509SVIDs(listener), retryState.delay(), retryState.timeUnit); + } } @Override @@ -60,10 +69,46 @@ public final class WorkloadAPIClient { } }; + LOGGER.info("Calling fetchX509SVIDs"); spiffeWorkloadStub.fetchX509SVIDs(newRequest(), observer); } + /** + * Checks that the error is retryable. The only error that is not retryable is 'InvalidArgument', + * that occurs when the security header is not present + * @param t + * @return + */ + private boolean isRetryableError(Throwable t) { + return !"InvalidArgument".equalsIgnoreCase(Status.fromThrowable(t).getCode().name()); + } + private X509SVIDRequest newRequest() { return X509SVIDRequest.newBuilder().build(); } + + static class RetryState { + RetryState(long delay, long maxDelay, TimeUnit timeUnit) { + if (delay < 1) { + this.delay = 1; + } else { + this.delay = delay; + } + this.maxDelay = maxDelay; + this.timeUnit = timeUnit; + } + + private long delay; + private long maxDelay; + private TimeUnit timeUnit; + private Function calculateDelay = (d) -> d * 2; + + long delay() { + delay = calculateDelay.apply(delay); + if (delay > maxDelay) { + delay = maxDelay; + } + return delay; + } + } } diff --git a/src/main/java/spiffe/api/svid/util/ExponentialBackOff.java b/src/main/java/spiffe/api/svid/util/ExponentialBackOff.java deleted file mode 100644 index db751df..0000000 --- a/src/main/java/spiffe/api/svid/util/ExponentialBackOff.java +++ /dev/null @@ -1,75 +0,0 @@ -package spiffe.api.svid.util; - -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.function.Supplier; - -/** - * Implements a basic Exponential Backoff Policy - * Retries based on a List of Errors - * - */ -public class ExponentialBackOff { - - private static Logger LOGGER = LoggerFactory.getLogger(ExponentialBackOff.class); - - static int MAX_RETRIES = 5; - static int BASE = 2; - - /** - * The list of Errors that can produce the WorkloadApi and must cause a Retry - */ - private static List RETRYABLE_ERRORS = Arrays.asList("UNAVAILABLE", "PERMISSIONDENIED"); - - private ExponentialBackOff() { - } - - /** - * Execute a Function given as parameter - * @param fn The Function to execute - * @param - * @return - */ - public static T execute(Supplier fn) { - for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) { - try { - LOGGER.info("Attempt no. " + attempt); - return fn.get(); - } catch (StatusRuntimeException e) { - handleError(e, attempt); - } - } - throw new RuntimeException("Failed to communicate with the Workload API"); - } - - private static void handleError(StatusRuntimeException e, int attempt) { - LOGGER.error("Error " + e.getMessage()); - if (isRetryableError(e)) { - sleep(backoffSequenceGenerator(attempt)); - } else { - throw new RuntimeException("Not retryable error occurred. ", e); - } - } - - private static boolean isRetryableError(StatusRuntimeException e) { - return RETRYABLE_ERRORS.contains(e.getStatus().getCode().name()); - } - - private static void sleep(long millis) { - try { - LOGGER.info("Sleeping for " + millis + "ms"); - Thread.sleep(millis); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private static long backoffSequenceGenerator(int attempt) { - return (long) Math.pow(BASE, attempt) * 1000; - } -}