Refactoring the retries handling

This commit is contained in:
Max Lambrecht 2018-06-01 11:44:44 -03:00
parent 7540427fc2
commit 218b8dc079
3 changed files with 125 additions and 32 deletions

View File

@ -4,13 +4,12 @@ import io.grpc.Status;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import spiffe.api.svid.retry.RetryHandler;
import spiffe.api.svid.retry.RetryPolicy;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import static spiffe.api.svid.Workload.*; import static spiffe.api.svid.Workload.*;
@ -24,8 +23,8 @@ public final class WorkloadAPIClient {
private SpiffeWorkloadStub spiffeWorkloadStub; private SpiffeWorkloadStub spiffeWorkloadStub;
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); private RetryHandler retryHandler;
private RetryState retryState = new RetryState(1, 60, TimeUnit.SECONDS);
/** /**
* Constructor * Constructor
@ -33,6 +32,16 @@ public final class WorkloadAPIClient {
*/ */
public WorkloadAPIClient(String spiffeEndpointAddress) { public WorkloadAPIClient(String spiffeEndpointAddress) {
spiffeWorkloadStub = new SpiffeWorkloadStub(spiffeEndpointAddress); spiffeWorkloadStub = new SpiffeWorkloadStub(spiffeEndpointAddress);
retryHandler = new RetryHandler(new RetryPolicy(1, 60, TimeUnit.SECONDS));
}
/**
* Constructor
* @param spiffeEndpointAddress
*/
public WorkloadAPIClient(String spiffeEndpointAddress, RetryPolicy retryPolicy) {
spiffeWorkloadStub = new SpiffeWorkloadStub(spiffeEndpointAddress);
retryHandler = new RetryHandler(retryPolicy);
} }
/** /**
@ -40,7 +49,8 @@ public final class WorkloadAPIClient {
* *
*/ */
public WorkloadAPIClient() { public WorkloadAPIClient() {
spiffeWorkloadStub = new SpiffeWorkloadStub(); this(null);
} }
/** /**
@ -59,8 +69,7 @@ public final class WorkloadAPIClient {
public void onError(Throwable t) { public void onError(Throwable t) {
LOGGER.error(t.getMessage()); LOGGER.error(t.getMessage());
if (isRetryableError(t)) { if (isRetryableError(t)) {
scheduledExecutorService.schedule( retryHandler.scheduleRetry(() -> fetchX509SVIDs(listener));
() -> fetchX509SVIDs(listener), retryState.delay(), retryState.timeUnit);
} }
} }
@ -87,28 +96,4 @@ public final class WorkloadAPIClient {
return X509SVIDRequest.newBuilder().build(); return X509SVIDRequest.newBuilder().build();
} }
static class RetryState {
RetryState(long delay, long maxDelay, TimeUnit timeUnit) {
if (delay < 1) {
this.delay = 1;
} else {
this.delay = delay;
}
this.maxDelay = maxDelay;
this.timeUnit = timeUnit;
}
private long delay;
private long maxDelay;
private TimeUnit timeUnit;
private Function<Long, Long> calculateDelay = (d) -> d * 2;
long delay() {
delay = calculateDelay.apply(delay);
if (delay > maxDelay) {
delay = maxDelay;
}
return delay;
}
}
} }

View File

@ -0,0 +1,36 @@
package spiffe.api.svid.retry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* Handle the retries and backoffs.
*/
public class RetryHandler {
private long nextDelay;
private RetryPolicy retryPolicy;
private ScheduledExecutorService scheduledExecutorService;
/**
* Constructor
* @param retryPolicy
*/
public RetryHandler(RetryPolicy retryPolicy) {
this.nextDelay = retryPolicy.initialDelay();
this.retryPolicy = retryPolicy;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}
/**
* Schedule to execture a Runnable, based on the retry and backoff policy
* Updates the next delay
* @param callable
*/
public void scheduleRetry(Runnable callable) {
scheduledExecutorService.schedule(callable, nextDelay, retryPolicy.timeUnit());
nextDelay = retryPolicy.nextDelay(nextDelay);
}
}

View File

@ -0,0 +1,72 @@
package spiffe.api.svid.retry;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* Configuration Parameters for the Retry behavior
* Allow configure initialDelay, maxDelay, timeUnit
* and the Function to calculate the delays
*/
public class RetryPolicy {
private long initialDelay;
private long maxDelay;
private TimeUnit timeUnit;
private Function<Long, Long> backoffFunction;
/**
* Constructor
*
* Sets default backoff function to multiply by 2
* @param initialDelay
* @param maxDelay
* @param timeUnit
*/
public RetryPolicy(long initialDelay, long maxDelay, TimeUnit timeUnit) {
if (initialDelay < 1) {
this.initialDelay = 1;
} else {
this.initialDelay = initialDelay;
}
this.maxDelay = maxDelay;
this.timeUnit = timeUnit;
this.backoffFunction = (d) -> d * 2;
}
/**
* Constructor
*
* Allow to configure the backoff function
* @param initialDelay
* @param maxDelay
* @param timeUnit
* @param backoffFunction
*/
public RetryPolicy(long initialDelay, long maxDelay, TimeUnit timeUnit, Function<Long, Long> backoffFunction) {
this.initialDelay = initialDelay;
this.maxDelay = maxDelay;
this.timeUnit = timeUnit;
this.backoffFunction = backoffFunction;
}
public long initialDelay() {
return initialDelay;
}
public TimeUnit timeUnit() {
return timeUnit;
}
/**
* Calculate the nextDelay based on a currentDelay, applying the backoff function
* If the calculated delay is greater than maxDelay, it returns maxDelay
* @param currentDelay
* @return
*/
public long nextDelay(long currentDelay) {
long next = backoffFunction.apply(currentDelay);
return next < maxDelay ? next : maxDelay;
}
}