Adding backoff retry to watchX509Context.
Changing X509ContextException and X509ContextException to make them checked. Address multiple PR comments. Adding tests to Address and TrustDomain. Signed-off-by: Max Lambrecht <maxlambrecht@gmail.com>
This commit is contained in:
parent
8e64bb63a0
commit
7d12743fb5
|
|
@ -6,4 +6,10 @@ Core functionality to fetch X509 and JWT SVIDs from the Workload API.
|
|||
|
||||
```
|
||||
TBD
|
||||
```
|
||||
```
|
||||
|
||||
## Netty Event Loop thread number configuration
|
||||
|
||||
Use the variable `io.netty.eventLoopThreads` to configure the number of threads for the Netty Event Loop Group.
|
||||
|
||||
By default, it is `availableProcessors * 2`.
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ buildscript {
|
|||
}
|
||||
|
||||
ext {
|
||||
grpcVersion = '1.28.0'
|
||||
nettyVersion = '4.1.33.Final'
|
||||
grpcVersion = '1.29.0'
|
||||
nettyVersion = '4.1.49.Final'
|
||||
protobufProtocVersion = '3.11.4'
|
||||
}
|
||||
|
||||
|
|
@ -49,6 +49,7 @@ dependencies {
|
|||
implementation group: 'io.grpc', name: 'grpc-netty', version: "${grpcVersion}"
|
||||
implementation group: 'io.grpc', name: 'grpc-protobuf', version: "${grpcVersion}"
|
||||
implementation group: 'io.grpc', name: 'grpc-stub', version: "${grpcVersion}"
|
||||
|
||||
implementation group: 'io.netty', name: 'netty-transport-native-epoll', version: "${nettyVersion}", classifier: 'linux-x86_64'
|
||||
implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: "${nettyVersion}", classifier: 'osx-x86_64'
|
||||
compileOnly group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
package spiffe.exception;
|
||||
|
||||
/**
|
||||
* Unchecked exception thrown when a there was an error retrieving
|
||||
* Checked exception thrown when a there was an error retrieving
|
||||
* or processing a X509Context.
|
||||
*/
|
||||
public class X509ContextException extends RuntimeException {
|
||||
public class X509ContextException extends Exception {
|
||||
public X509ContextException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
package spiffe.exception;
|
||||
|
||||
/**
|
||||
* Unchecked thrown when there is an error creating or initializing a X.509 source
|
||||
* Checked thrown when there is an error creating or initializing a X.509 source
|
||||
*/
|
||||
public class X509SourceException extends RuntimeException {
|
||||
public class X509SourceException extends Exception {
|
||||
public X509SourceException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import java.util.stream.Collectors;
|
|||
@Value
|
||||
public class SpiffeId {
|
||||
|
||||
private static final String SPIFFE_SCHEMA = "spiffe";
|
||||
public static final String SPIFFE_SCHEME = "spiffe";
|
||||
|
||||
TrustDomain trustDomain;
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ public class SpiffeId {
|
|||
|
||||
val uri = URI.create(spiffeIdAsString);
|
||||
|
||||
if (!SPIFFE_SCHEMA.equals(uri.getScheme())) {
|
||||
if (!SPIFFE_SCHEME.equals(uri.getScheme())) {
|
||||
throw new IllegalArgumentException("Invalid SPIFFE schema");
|
||||
}
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ public class SpiffeId {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s://%s%s", SPIFFE_SCHEMA, this.trustDomain.toString(), this.path);
|
||||
return String.format("%s://%s%s", SPIFFE_SCHEME, this.trustDomain.toString(), this.path);
|
||||
}
|
||||
|
||||
private static String normalize(String s) {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static spiffe.spiffeid.SpiffeId.SPIFFE_SCHEME;
|
||||
|
||||
/**
|
||||
* A <code>TrustDomain</code> represents a normalized SPIFFE trust domain (e.g. domain.test).
|
||||
|
|
@ -33,15 +33,47 @@ public class TrustDomain {
|
|||
*/
|
||||
public static TrustDomain of(@NonNull String trustDomain) {
|
||||
if (StringUtils.isBlank(trustDomain)) {
|
||||
throw new IllegalArgumentException("Trust Domain cannot be empty");
|
||||
throw new IllegalArgumentException("Trust domain cannot be empty");
|
||||
}
|
||||
|
||||
URI uri;
|
||||
try {
|
||||
val uri = new URI(normalize(trustDomain));
|
||||
val host = getHost(uri);
|
||||
return new TrustDomain(host);
|
||||
val normalized = normalize(trustDomain);
|
||||
uri = new URI(normalized);
|
||||
validateUri(uri);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(format("Unable to parse: %s", trustDomain), e);
|
||||
throw new IllegalArgumentException(e.getMessage());
|
||||
}
|
||||
|
||||
val host = uri.getHost();
|
||||
validateHost(host);
|
||||
return new TrustDomain(host);
|
||||
}
|
||||
|
||||
private static void validateHost(String host) {
|
||||
if (StringUtils.isBlank(host)) {
|
||||
throw new IllegalArgumentException("Trust domain cannot be empty");
|
||||
}
|
||||
}
|
||||
|
||||
private static void validateUri(URI uri) {
|
||||
val scheme = uri.getScheme();
|
||||
if (StringUtils.isNotBlank(scheme) && !SPIFFE_SCHEME.equals(scheme)) {
|
||||
throw new IllegalArgumentException("Invalid scheme");
|
||||
}
|
||||
|
||||
val port = uri.getPort();
|
||||
if (port != -1) {
|
||||
throw new IllegalArgumentException("Port is not allowed");
|
||||
}
|
||||
}
|
||||
|
||||
private static String normalize(String s) {
|
||||
s = s.toLowerCase().trim();
|
||||
if (!s.contains("://")) {
|
||||
s = SPIFFE_SCHEME.concat("://").concat(s);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -53,15 +85,4 @@ public class TrustDomain {
|
|||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
|
||||
private static String normalize(String s) {
|
||||
return s.toLowerCase().trim();
|
||||
}
|
||||
|
||||
private static String getHost(URI uri) {
|
||||
if (StringUtils.isBlank(uri.getHost())) {
|
||||
return uri.getPath();
|
||||
}
|
||||
return uri.getHost();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package spiffe.workloadapi;
|
|||
|
||||
import io.grpc.Context;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
|
@ -22,6 +23,8 @@ import spiffe.workloadapi.internal.SecurityHeaderInterceptor;
|
|||
import spiffe.workloadapi.internal.SpiffeWorkloadAPIGrpc;
|
||||
import spiffe.workloadapi.internal.SpiffeWorkloadAPIGrpc.SpiffeWorkloadAPIBlockingStub;
|
||||
import spiffe.workloadapi.internal.SpiffeWorkloadAPIGrpc.SpiffeWorkloadAPIStub;
|
||||
import spiffe.workloadapi.retry.BackoffPolicy;
|
||||
import spiffe.workloadapi.retry.RetryHandler;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.security.cert.CertificateException;
|
||||
|
|
@ -29,34 +32,55 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.logging.Level;
|
||||
|
||||
import static spiffe.workloadapi.internal.Workload.X509SVIDRequest;
|
||||
import static spiffe.workloadapi.internal.Workload.X509SVIDResponse;
|
||||
|
||||
/**
|
||||
* A <code>WorkloadApiClient</code> represents a client to interact with the Workload API.
|
||||
* Supports one-shot calls and watch updates for X.509 and JWT SVIDS and bundles.
|
||||
* <p>
|
||||
* Supports one-shot calls and watch updates for X.509 and JWT SVIDs and bundles.
|
||||
* <p>
|
||||
* The watch for updates methods support retries using an exponential backoff policy to reestablish
|
||||
* the stream connection to the Workload API.
|
||||
*/
|
||||
@Log
|
||||
public class WorkloadApiClient implements Closeable {
|
||||
|
||||
private static final String INVALID_ARGUMENT = "INVALID_ARGUMENT";
|
||||
|
||||
private final SpiffeWorkloadAPIStub workloadApiAsyncStub;
|
||||
private final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub;
|
||||
private final ManagedChannel managedChannel;
|
||||
private final List<Context.CancellableContext> cancellableContexts;
|
||||
private final BackoffPolicy backoffPolicy;
|
||||
|
||||
private WorkloadApiClient(SpiffeWorkloadAPIStub workloadApiAsyncStub, SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub, ManagedChannel managedChannel) {
|
||||
// using a scheduled executor service to be able to schedule retries
|
||||
// it is injected in each of the retryHandlers in the watch methods
|
||||
private final ScheduledExecutorService retryExecutor;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
private WorkloadApiClient(SpiffeWorkloadAPIStub workloadApiAsyncStub,
|
||||
SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub,
|
||||
ManagedChannel managedChannel,
|
||||
BackoffPolicy backoffPolicy, ScheduledExecutorService retryExecutor) {
|
||||
this.workloadApiAsyncStub = workloadApiAsyncStub;
|
||||
this.workloadApiBlockingStub = workloadApiBlockingStub;
|
||||
this.managedChannel = managedChannel;
|
||||
this.cancellableContexts = Collections.synchronizedList(new ArrayList<>());
|
||||
this.backoffPolicy = backoffPolicy;
|
||||
this.retryExecutor = retryExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new Workload API client using the default socket endpoint address.
|
||||
* @see Address#getDefaultAddress()
|
||||
*
|
||||
* @return a {@link WorkloadApiClient}
|
||||
* @see Address#getDefaultAddress()
|
||||
*/
|
||||
public static WorkloadApiClient newClient() throws SocketEndpointAddressException {
|
||||
val options = ClientOptions.builder().build();
|
||||
|
|
@ -79,9 +103,12 @@ public class WorkloadApiClient implements Closeable {
|
|||
spiffeSocketPath = Address.getDefaultAddress();
|
||||
}
|
||||
|
||||
if (options.backoffPolicy == null) {
|
||||
options.backoffPolicy = new BackoffPolicy();
|
||||
}
|
||||
|
||||
val socketEndpointAddress = Address.parseAddress(spiffeSocketPath);
|
||||
val managedChannel = GrpcManagedChannelFactory.newChannel(socketEndpointAddress);
|
||||
|
||||
val workloadAPIAsyncStub = SpiffeWorkloadAPIGrpc
|
||||
.newStub(managedChannel)
|
||||
.withInterceptors(new SecurityHeaderInterceptor());
|
||||
|
|
@ -90,7 +117,14 @@ public class WorkloadApiClient implements Closeable {
|
|||
.newBlockingStub(managedChannel)
|
||||
.withInterceptors(new SecurityHeaderInterceptor());
|
||||
|
||||
return new WorkloadApiClient(workloadAPIAsyncStub, workloadAPIBlockingStub, managedChannel);
|
||||
val retryExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
return new WorkloadApiClient(
|
||||
workloadAPIAsyncStub,
|
||||
workloadAPIBlockingStub,
|
||||
managedChannel,
|
||||
options.backoffPolicy,
|
||||
retryExecutor);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -98,18 +132,12 @@ public class WorkloadApiClient implements Closeable {
|
|||
*
|
||||
* @throws X509ContextException if there is an error fetching or processing the X.509 context
|
||||
*/
|
||||
public X509Context fetchX509Context() {
|
||||
Context.CancellableContext cancellableContext;
|
||||
cancellableContext = Context.current().withCancellation();
|
||||
X509Context result;
|
||||
try {
|
||||
result = cancellableContext.call(this::processX509Context);
|
||||
public X509Context fetchX509Context() throws X509ContextException {
|
||||
try (val cancellableContext = Context.current().withCancellation()) {
|
||||
return cancellableContext.call(this::processX509Context);
|
||||
} catch (Exception e) {
|
||||
throw new X509ContextException("Error fetching X509Context", e);
|
||||
}
|
||||
// close connection
|
||||
cancellableContext.close();
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -118,32 +146,49 @@ public class WorkloadApiClient implements Closeable {
|
|||
* @param watcher an instance that implements a {@link Watcher}.
|
||||
*/
|
||||
public void watchX509Context(Watcher<X509Context> watcher) {
|
||||
StreamObserver<X509SVIDResponse> streamObserver = new StreamObserver<X509SVIDResponse>() {
|
||||
val retryHandler = new RetryHandler(backoffPolicy, retryExecutor);
|
||||
val cancellableContext = Context.current().withCancellation();
|
||||
|
||||
val streamObserver = getX509ContextStreamObserver(watcher, retryHandler, cancellableContext);
|
||||
|
||||
cancellableContext.run(() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), streamObserver));
|
||||
this.cancellableContexts.add(cancellableContext);
|
||||
}
|
||||
|
||||
private StreamObserver<X509SVIDResponse> getX509ContextStreamObserver(Watcher<X509Context> watcher, RetryHandler retryHandler, Context.CancellableContext cancellableContext) {
|
||||
return new StreamObserver<X509SVIDResponse>() {
|
||||
@Override
|
||||
public void onNext(X509SVIDResponse value) {
|
||||
X509Context x509Context = null;
|
||||
try {
|
||||
x509Context = GrpcConversionUtils.toX509Context(value);
|
||||
X509Context x509Context = GrpcConversionUtils.toX509Context(value);
|
||||
watcher.onUpdate(x509Context);
|
||||
retryHandler.reset();
|
||||
} catch (CertificateException | X509SvidException e) {
|
||||
watcher.onError(new X509ContextException("Error processing X509 Context update", e));
|
||||
}
|
||||
watcher.onUpdate(x509Context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
watcher.onError(new X509ContextException("Error getting X509Context", t));
|
||||
handleWatchX509ContextError(t);
|
||||
}
|
||||
|
||||
private void handleWatchX509ContextError(Throwable t) {
|
||||
if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) {
|
||||
watcher.onError(new X509ContextException("Canceling X509 Context watch", t));
|
||||
} else {
|
||||
log.log(Level.INFO, "Retrying connecting to Workload API to register X509 context watcher");
|
||||
retryHandler.scheduleRetry(() ->
|
||||
cancellableContext.run(() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), this)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
cancellableContext.close();
|
||||
watcher.onError(new X509ContextException("Unexpected completed stream"));
|
||||
}
|
||||
};
|
||||
Context.CancellableContext cancellableContext;
|
||||
cancellableContext = Context.current().withCancellation();
|
||||
cancellableContext.run(() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), streamObserver));
|
||||
this.cancellableContexts.add(cancellableContext);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -152,9 +197,7 @@ public class WorkloadApiClient implements Closeable {
|
|||
* @param subject a SPIFFE ID
|
||||
* @param audience the audience of the JWT-SVID
|
||||
* @param extraAudience the extra audience for the JWT_SVID
|
||||
*
|
||||
* @return an instance of a {@link JwtSvid}
|
||||
*
|
||||
* @throws //TODO: declare thrown exceptions
|
||||
*/
|
||||
public JwtSvid fetchJwtSvid(SpiffeId subject, String audience, String... extraAudience) {
|
||||
|
|
@ -178,7 +221,6 @@ public class WorkloadApiClient implements Closeable {
|
|||
* @param token JWT token
|
||||
* @param audience audience of the JWT
|
||||
* @return the {@link JwtSvid} if the token and audience could be validated.
|
||||
*
|
||||
* @throws //TODO: declare thrown exceptions
|
||||
*/
|
||||
public JwtSvid validateJwtSvid(String token, String audience) {
|
||||
|
|
@ -199,42 +241,49 @@ public class WorkloadApiClient implements Closeable {
|
|||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
log.info("Closing WorkloadAPI client");
|
||||
log.log(Level.FINE, "Closing WorkloadAPI client");
|
||||
synchronized (this) {
|
||||
for (val context : cancellableContexts) {
|
||||
context.close();
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
for (val context : cancellableContexts) {
|
||||
context.close();
|
||||
}
|
||||
managedChannel.shutdown();
|
||||
retryExecutor.shutdown();
|
||||
}
|
||||
log.info("Shutting down Managed Channel");
|
||||
managedChannel.shutdown();
|
||||
}
|
||||
log.log(Level.FINE, "WorkloadAPI client is closed");
|
||||
}
|
||||
|
||||
private X509SVIDRequest newX509SvidRequest() {
|
||||
return X509SVIDRequest.newBuilder().build();
|
||||
}
|
||||
|
||||
private X509Context processX509Context() {
|
||||
private X509Context processX509Context() throws X509ContextException {
|
||||
try {
|
||||
Iterator<X509SVIDResponse> x509SVIDResponse = workloadApiBlockingStub.fetchX509SVID(newX509SvidRequest());
|
||||
if (x509SVIDResponse.hasNext()) {
|
||||
return GrpcConversionUtils.toX509Context(x509SVIDResponse.next());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
} catch (CertificateException | X509SvidException e) {
|
||||
throw new X509ContextException("Error processing X509Context", e);
|
||||
}
|
||||
throw new X509ContextException("Error processing X509Context: x509SVIDResponse is empty");
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for creating a new {@link WorkloadApiClient}.
|
||||
* Options for creating a new {@link WorkloadApiClient}. The {@link BackoffPolicy} is used
|
||||
* to configure a {@link RetryHandler} to perform retries to reconnect to the Workload API.
|
||||
*/
|
||||
@Data
|
||||
public static class ClientOptions {
|
||||
String spiffeSocketPath;
|
||||
BackoffPolicy backoffPolicy;
|
||||
|
||||
@Builder
|
||||
public ClientOptions(String spiffeSocketPath) {
|
||||
public ClientOptions(String spiffeSocketPath, BackoffPolicy backoffPolicy) {
|
||||
this.spiffeSocketPath = spiffeSocketPath;
|
||||
this.backoffPolicy = backoffPolicy;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import spiffe.bundle.x509bundle.X509BundleSet;
|
|||
import spiffe.bundle.x509bundle.X509BundleSource;
|
||||
import spiffe.exception.BundleNotFoundException;
|
||||
import spiffe.exception.SocketEndpointAddressException;
|
||||
import spiffe.exception.X509ContextException;
|
||||
import spiffe.exception.X509SourceException;
|
||||
import spiffe.spiffeid.TrustDomain;
|
||||
import spiffe.svid.x509svid.X509Svid;
|
||||
|
|
@ -57,7 +58,7 @@ public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
|
|||
* @throws SocketEndpointAddressException if the address to the Workload API is not valid
|
||||
* @throws X509SourceException if the source could not be initialized
|
||||
*/
|
||||
public static X509Source newSource() throws SocketEndpointAddressException {
|
||||
public static X509Source newSource() throws SocketEndpointAddressException, X509SourceException {
|
||||
X509SourceOptions x509SourceOptions = X509SourceOptions.builder().build();
|
||||
return newSource(x509SourceOptions);
|
||||
}
|
||||
|
|
@ -75,7 +76,7 @@ public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
|
|||
* @throws SocketEndpointAddressException if the address to the Workload API is not valid
|
||||
* @throws X509SourceException if the source could not be initialized
|
||||
*/
|
||||
public static X509Source newSource(@NonNull X509SourceOptions options) throws SocketEndpointAddressException {
|
||||
public static X509Source newSource(@NonNull X509SourceOptions options) throws SocketEndpointAddressException, X509SourceException {
|
||||
if (options.workloadApiClient == null) {
|
||||
options.workloadApiClient = createClient(options);
|
||||
}
|
||||
|
|
@ -149,7 +150,7 @@ public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
|
|||
return WorkloadApiClient.newClient(clientOptions);
|
||||
}
|
||||
|
||||
private void init() {
|
||||
private void init() throws X509ContextException {
|
||||
X509Context x509Context = workloadApiClient.fetchX509Context();
|
||||
setX509Context(x509Context);
|
||||
setX509ContextWatcher();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,79 @@
|
|||
package spiffe.workloadapi.retry;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.val;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.UnaryOperator;
|
||||
|
||||
@Data
|
||||
public class BackoffPolicy {
|
||||
|
||||
/**
|
||||
* Retry indefinitely, default behavior
|
||||
*/
|
||||
public static final int UNLIMITED_RETRIES = -1;
|
||||
|
||||
private static final int BACKOFF_MULTIPLIER = 2;
|
||||
|
||||
/**
|
||||
* The first backoff delay period
|
||||
*/
|
||||
Duration initialDelay = Duration.ofSeconds(1);
|
||||
|
||||
/**
|
||||
* Max time of delay for the backoff period
|
||||
*/
|
||||
Duration maxDelay = Duration.ofSeconds(60);
|
||||
|
||||
/**
|
||||
* Max number of retries, unlimited by default
|
||||
*/
|
||||
int maxRetries = UNLIMITED_RETRIES;
|
||||
|
||||
/**
|
||||
* Function to calculate the backoff delay
|
||||
*/
|
||||
UnaryOperator<Duration> backoffFunction = d -> d.multipliedBy(BACKOFF_MULTIPLIER);
|
||||
|
||||
/**
|
||||
* Build backoff policy with defaults
|
||||
*/
|
||||
public BackoffPolicy() {
|
||||
}
|
||||
|
||||
@Builder
|
||||
public BackoffPolicy(Duration initialDelay, Duration maxDelay, int maxRetries, UnaryOperator<Duration> backoffFunction) {
|
||||
this.initialDelay = initialDelay;
|
||||
this.maxDelay = maxDelay;
|
||||
this.maxRetries = maxRetries;
|
||||
this.backoffFunction = backoffFunction;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the nextDelay based on a currentDelay, applying the backoff function
|
||||
* If the calculated delay is greater than maxDelay, it returns maxDelay
|
||||
*
|
||||
* @param currentDelay
|
||||
* @return next delay
|
||||
*/
|
||||
public Duration nextDelay(Duration currentDelay) {
|
||||
val next = backoffFunction.apply(currentDelay);
|
||||
if (next.compareTo(maxDelay) > 0) {
|
||||
return maxDelay;
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the RetryPolicy is configure with UNLIMITED_RETRIES
|
||||
* or if the retriesCount param is lower than the maxRetries
|
||||
*
|
||||
* @param retriesCount the current number of retries
|
||||
* @return
|
||||
*/
|
||||
public boolean doNotExceedMaxRetries(int retriesCount) {
|
||||
return (maxRetries == UNLIMITED_RETRIES || retriesCount < maxRetries);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
package spiffe.workloadapi.retry;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Provides methods to schedule the execution of retries based on a backoff policy
|
||||
*/
|
||||
public class RetryHandler {
|
||||
|
||||
public final ScheduledExecutorService executor;
|
||||
private final BackoffPolicy backoffPolicy;
|
||||
private Duration nextDelay;
|
||||
private int retryCount;
|
||||
|
||||
public RetryHandler(BackoffPolicy backoffPolicy, ScheduledExecutorService executor) {
|
||||
this.nextDelay = backoffPolicy.getInitialDelay();
|
||||
this.backoffPolicy = backoffPolicy;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule to execute a Runnable, based on the backoff policy
|
||||
* Updates the next delay and retries count
|
||||
*/
|
||||
public void scheduleRetry(Runnable runnable) {
|
||||
if (backoffPolicy.doNotExceedMaxRetries(retryCount)) {
|
||||
executor.schedule(runnable, nextDelay.getSeconds(), TimeUnit.SECONDS);
|
||||
nextDelay = backoffPolicy.nextDelay(nextDelay);
|
||||
retryCount++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset state of RetryHandle to initial values
|
||||
*/
|
||||
public void reset() {
|
||||
nextDelay = backoffPolicy.getInitialDelay();
|
||||
retryCount = 0;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,86 +1,43 @@
|
|||
package spiffe.spiffeid;
|
||||
|
||||
import lombok.val;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
|
||||
|
||||
public class TrustDomainTest {
|
||||
|
||||
@Test
|
||||
void of_givenAString_returnTrustDomain() {
|
||||
val trustDomain = TrustDomain.of("domain.test");
|
||||
assertEquals("domain.test", trustDomain.toString());
|
||||
|
||||
static Stream<Arguments> provideTestTrustDomain() {
|
||||
return Stream.of(
|
||||
Arguments.of("", "Trust domain cannot be empty"),
|
||||
Arguments.of(null, "trustDomain is marked non-null but is null"),
|
||||
Arguments.of(" DomAin.TesT ", "domain.test"),
|
||||
Arguments.of(" spiffe://domaiN.Test ", "domain.test"),
|
||||
Arguments.of("spiffe://domain.test/path/element", "domain.test"),
|
||||
Arguments.of("spiffe://domain.test/spiffe://domain.test/path/element", "domain.test"),
|
||||
Arguments.of("spiffe://domain.test/spiffe://domain.test:80/path/element", "domain.test"),
|
||||
Arguments.of("http://domain.test", "Invalid scheme"),
|
||||
Arguments.of("spiffe:// domain.test ", "Illegal character in authority at index 9: spiffe:// domain.test"),
|
||||
Arguments.of("://domain.test", "Expected scheme name at index 0: ://domain.test"),
|
||||
Arguments.of("spiffe:///path/element", "Trust domain cannot be empty"),
|
||||
Arguments.of("/path/element", "Trust domain cannot be empty"),
|
||||
Arguments.of("spiffe://domain.test:80", "Port is not allowed")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void of_givenASpiffeIdString_returnTrustDomainWithHostPart() {
|
||||
val trustDomain = TrustDomain.of("spiffe://domain.test");
|
||||
assertEquals("domain.test", trustDomain.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void of_givenASpiffeIdStringWithPath_returnTrustDomainWithHostPart() {
|
||||
val trustDomain = TrustDomain.of("spiffe://domain.test/workload");
|
||||
assertEquals("domain.test", trustDomain.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void of_givenAStringWithCaps_returnNormalizedTrustDomain() {
|
||||
val trustDomain = TrustDomain.of("DoMAin.TesT");
|
||||
|
||||
assertEquals("domain.test", trustDomain.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void of_givenAStringWithTrailingAndLeadingBlanks_returnNormalizedTrustDomain() {
|
||||
val trustDomain = TrustDomain.of(" domain.test ");
|
||||
|
||||
assertEquals("domain.test", trustDomain.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void of_nullString_ThrowsIllegalArgumentException() {
|
||||
@ParameterizedTest
|
||||
@MethodSource("provideTestTrustDomain")
|
||||
void parseAddressInvalid(String input, Object expected) {
|
||||
TrustDomain result = null;
|
||||
try {
|
||||
TrustDomain.of(null);
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("trustDomain is marked non-null but is null", e.getMessage());
|
||||
result = TrustDomain.of(input);
|
||||
assertEquals(expected, result.getName());
|
||||
} catch (Exception e) {
|
||||
assertEquals(expected, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void of_emptyString_ThrowsIllegalArgumentException() {
|
||||
try {
|
||||
TrustDomain.of("");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Trust Domain cannot be empty", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void of_blankString_ThrowsIllegalArgumentException() {
|
||||
try {
|
||||
TrustDomain.of(" ");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Trust Domain cannot be empty", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void equals_twoTrustDomainObjectsWithTheSameString_returnsTrue() {
|
||||
val trustDomain1 = TrustDomain.of("example.org");
|
||||
val trustDomain2 = TrustDomain.of("example.org");
|
||||
|
||||
assertEquals(trustDomain1, trustDomain2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void equals_twoTrustDomainObjectsWithDifferentStrings_returnsFalse() {
|
||||
val trustDomain1 = TrustDomain.of("example.org");
|
||||
val trustDomain2 = TrustDomain.of("other.org");
|
||||
|
||||
assertNotEquals(trustDomain1, trustDomain2);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
package spiffe.provider;
|
||||
|
||||
import lombok.val;
|
||||
import spiffe.exception.SocketEndpointAddressException;
|
||||
import spiffe.exception.X509SourceException;
|
||||
import spiffe.svid.x509svid.X509SvidSource;
|
||||
|
||||
import javax.net.ssl.KeyManager;
|
||||
|
|
@ -25,10 +27,19 @@ public final class SpiffeKeyManagerFactory extends KeyManagerFactorySpi {
|
|||
/**
|
||||
* Default method for creating the KeyManager, uses a X509Source instance
|
||||
* that is handled by the Singleton {@link X509SourceManager}
|
||||
*
|
||||
* @throws SpiffeProviderException in case there is an error setting up the X509 source
|
||||
*/
|
||||
@Override
|
||||
protected KeyManager[] engineGetKeyManagers() {
|
||||
val spiffeKeyManager = new SpiffeKeyManager(X509SourceManager.INSTANCE.getX509Source());
|
||||
SpiffeKeyManager spiffeKeyManager;
|
||||
try {
|
||||
spiffeKeyManager = new SpiffeKeyManager(X509SourceManager.getX509Source());
|
||||
} catch (X509SourceException e) {
|
||||
throw new SpiffeProviderException("The X509 source could not be created", e);
|
||||
} catch (SocketEndpointAddressException e) {
|
||||
throw new SpiffeProviderException("The Workload API Socket endpoint address configured is not valid", e);
|
||||
}
|
||||
return new KeyManager[]{spiffeKeyManager};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,12 @@ package spiffe.provider;
|
|||
*/
|
||||
class SpiffeProviderConstants {
|
||||
|
||||
/**
|
||||
* Security property to get the list of accepted SPIFFE IDs.
|
||||
* This property is read in the java.security file
|
||||
*/
|
||||
static final String SSL_SPIFFE_ACCEPT_PROPERTY = "ssl.spiffe.accept";
|
||||
|
||||
// the name of this Provider implementation
|
||||
static final String PROVIDER_NAME = "Spiffe";
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
package spiffe.provider;
|
||||
|
||||
/**
|
||||
* Unchecked exception thrown when there is an error setting up the
|
||||
* source of svids and bundles.
|
||||
*/
|
||||
public class SpiffeProviderException extends RuntimeException {
|
||||
|
||||
public SpiffeProviderException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public SpiffeProviderException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public SpiffeProviderException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
|
@ -23,7 +23,7 @@ public final class SpiffeSslContextFactory {
|
|||
private static final String DEFAULT_SSL_PROTOCOL = "TLSv1.2";
|
||||
|
||||
/**
|
||||
* Creates an SSLContext initialized with a SPIFFE KeyManager and TrustManager that are backed by
|
||||
* Creates an {@link SSLContext} initialized with a SPIFFE KeyManager and TrustManager that are backed by
|
||||
* the Workload API via a X509Source.
|
||||
*
|
||||
* @param options {@link SslContextOptions}. The option {@link X509Source} must be not null.
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ package spiffe.provider;
|
|||
|
||||
import lombok.val;
|
||||
import spiffe.bundle.x509bundle.X509BundleSource;
|
||||
import spiffe.exception.SocketEndpointAddressException;
|
||||
import spiffe.exception.X509SourceException;
|
||||
import spiffe.spiffeid.SpiffeId;
|
||||
import spiffe.spiffeid.SpiffeIdUtils;
|
||||
|
||||
|
|
@ -12,6 +14,8 @@ import java.security.KeyStore;
|
|||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static spiffe.provider.SpiffeProviderConstants.SSL_SPIFFE_ACCEPT_PROPERTY;
|
||||
|
||||
/**
|
||||
* A <code>SpiffeTrustManagerFactory</code> is an implementation of a {@link javax.net.ssl.TrustManagerFactory} to create a
|
||||
* TrustManager backed by a X509BundleSource that is maintained via the Workload API.
|
||||
|
|
@ -27,9 +31,6 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
public class SpiffeTrustManagerFactory extends TrustManagerFactorySpi {
|
||||
|
||||
// System property to get the list of accepted SPIFFE IDs
|
||||
private static final String SSL_SPIFFE_ACCEPT_PROPERTY = "ssl.spiffe.accept";
|
||||
|
||||
/**
|
||||
* Default method for creating a TrustManager initializing it with
|
||||
* the {@link spiffe.workloadapi.X509Source} instance
|
||||
|
|
@ -38,14 +39,22 @@ public class SpiffeTrustManagerFactory extends TrustManagerFactorySpi {
|
|||
* from the System Property defined in SSL_SPIFFE_ACCEPT_PROPERTY.
|
||||
*
|
||||
* @return a TrustManager array with an initialized TrustManager.
|
||||
* @throws SpiffeProviderException in case there is an error setting up the X509 source
|
||||
*/
|
||||
@Override
|
||||
public TrustManager[] engineGetTrustManagers() {
|
||||
val spiffeTrustManager =
|
||||
new SpiffeTrustManager(
|
||||
X509SourceManager.INSTANCE.getX509Source(),
|
||||
this::getAcceptedSpiffeIds
|
||||
);
|
||||
SpiffeTrustManager spiffeTrustManager =
|
||||
null;
|
||||
try {
|
||||
spiffeTrustManager = new SpiffeTrustManager(
|
||||
X509SourceManager.getX509Source(),
|
||||
this::getAcceptedSpiffeIds
|
||||
);
|
||||
} catch (X509SourceException e) {
|
||||
throw new SpiffeProviderException("The X509 source could not be created", e);
|
||||
} catch (SocketEndpointAddressException e) {
|
||||
throw new SpiffeProviderException("The Workload API Socket endpoint address configured is not valid", e);
|
||||
}
|
||||
return new TrustManager[]{spiffeTrustManager};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,21 +16,32 @@ import spiffe.workloadapi.X509Source;
|
|||
* to be used by the {@link SpiffeKeyManagerFactory} and {@link SpiffeTrustManagerFactory} to inject it
|
||||
* in the {@link SpiffeKeyManager} and {@link SpiffeTrustManager} instances.
|
||||
*/
|
||||
public enum X509SourceManager {
|
||||
public class X509SourceManager {
|
||||
|
||||
INSTANCE;
|
||||
private static volatile X509Source x509Source;
|
||||
|
||||
private final X509Source x509Source;
|
||||
|
||||
X509SourceManager() {
|
||||
try {
|
||||
x509Source = X509Source.newSource();
|
||||
} catch (SocketEndpointAddressException e) {
|
||||
throw new X509SourceException("Could not create X509 Source. Socket endpoint address is not valid", e);
|
||||
}
|
||||
public X509SourceManager() {
|
||||
}
|
||||
|
||||
public X509Source getX509Source() {
|
||||
return x509Source;
|
||||
/**
|
||||
* Returns the single instance handled by this singleton. If the instance has not been
|
||||
* created yet, it creates a new X509Source and initializes the singleton in a thread safe way.
|
||||
*
|
||||
* @return a {@link X509Source}
|
||||
* @throws X509SourceException if the X509 source could not be initialized
|
||||
* @throws SocketEndpointAddressException is the socket endpoint address is not valid
|
||||
*/
|
||||
public static X509Source getX509Source() throws X509SourceException, SocketEndpointAddressException {
|
||||
X509Source localRef = x509Source;
|
||||
if (localRef == null) {
|
||||
synchronized (X509SourceManager.class) {
|
||||
localRef = x509Source;
|
||||
if (localRef == null) {
|
||||
localRef = X509Source.newSource();
|
||||
x509Source = localRef;
|
||||
}
|
||||
}
|
||||
}
|
||||
return localRef;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package spiffe.provider.examples;
|
|||
|
||||
import lombok.val;
|
||||
import spiffe.exception.SocketEndpointAddressException;
|
||||
import spiffe.exception.X509SourceException;
|
||||
import spiffe.provider.SpiffeSslContextFactory;
|
||||
import spiffe.provider.SpiffeSslContextFactory.SslContextOptions;
|
||||
import spiffe.spiffeid.SpiffeId;
|
||||
|
|
@ -38,11 +39,11 @@ public class HttpsClient {
|
|||
int serverPort;
|
||||
|
||||
public static void main(String[] args) {
|
||||
String spiffeSocket = "unix:/tmp/agent.sock";
|
||||
String spiffeSocket = "unix:/tmp/agent2.sock";
|
||||
HttpsClient httpsClient = new HttpsClient(4000, spiffeSocket, HttpsClient::listOfSpiffeIds);
|
||||
try {
|
||||
httpsClient.run();
|
||||
} catch (KeyManagementException | NoSuchAlgorithmException | IOException | SocketEndpointAddressException e) {
|
||||
} catch (KeyManagementException | NoSuchAlgorithmException | IOException | SocketEndpointAddressException | X509SourceException e) {
|
||||
throw new RuntimeException("Error starting Https Client", e);
|
||||
}
|
||||
}
|
||||
|
|
@ -53,7 +54,7 @@ public class HttpsClient {
|
|||
this.acceptedSpiffeIdsListSupplier = acceptedSpiffeIdsListSupplier;
|
||||
}
|
||||
|
||||
void run() throws IOException, SocketEndpointAddressException, KeyManagementException, NoSuchAlgorithmException {
|
||||
void run() throws IOException, SocketEndpointAddressException, KeyManagementException, NoSuchAlgorithmException, X509SourceException {
|
||||
|
||||
val sourceOptions = X509SourceOptions
|
||||
.builder()
|
||||
|
|
|
|||
|
|
@ -2,8 +2,11 @@ package spiffe.provider.examples;
|
|||
|
||||
import lombok.val;
|
||||
import spiffe.exception.SocketEndpointAddressException;
|
||||
import spiffe.exception.X509SourceException;
|
||||
import spiffe.provider.SpiffeProviderException;
|
||||
import spiffe.provider.SpiffeSslContextFactory;
|
||||
import spiffe.provider.SpiffeSslContextFactory.SslContextOptions;
|
||||
import spiffe.provider.X509SourceManager;
|
||||
import spiffe.workloadapi.X509Source;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
|
@ -46,9 +49,9 @@ public class HttpsServer {
|
|||
void run() throws IOException, KeyManagementException, NoSuchAlgorithmException {
|
||||
X509Source x509Source = null;
|
||||
try {
|
||||
x509Source = X509Source.newSource();
|
||||
} catch (SocketEndpointAddressException e) {
|
||||
throw new RuntimeException(e);
|
||||
x509Source = X509SourceManager.getX509Source();
|
||||
} catch (SocketEndpointAddressException | X509SourceException e) {
|
||||
throw new SpiffeProviderException(e);
|
||||
}
|
||||
|
||||
val sslContextOptions = SslContextOptions
|
||||
|
|
|
|||
Loading…
Reference in New Issue