Adding timeout to X509Source new method.

Signed-off-by: Max Lambrecht <maxlambrecht@gmail.com>
This commit is contained in:
Max Lambrecht 2020-05-06 11:16:20 -03:00
parent 8027b39298
commit 29daad1c5b
3 changed files with 87 additions and 18 deletions

View File

@ -2,12 +2,41 @@
Core functionality to fetch X509 and JWT SVIDs from the Workload API.
## Create a X509Source
## X509Source
A `spiffe.workloadapi.X509Source` represents a source of X.509 SVIDs and X.509 bundles maintained via the Workload API.
To create a new X509 Source:
```
TBD
try {
x509Source = X509Source.newSource();
} catch (SocketEndpointAddressException | X509SourceException e) {
// handle exception
}
```
The `newSource()` blocks until the X505 materials can be retrieved from the Workload API and the X509Source is
initialized with the SVID and Bundles. A `X509 context watcher` is configured on the X509Source to get automatically
the updates from the Workload API. This watcher performs retries if at any time the connection to the Workload API
reports an error.
The socket endpoint address is configured through the environment variable `SPIFFE_ENDPOINT_SOCKET`. Another way to
configure it is by providing a `X509SourceOptions` instance to the `newSource` method.
### Configure a timeout for X509Source initialization
The method `X509Source newSource()` blocks waiting until a X509 context is fetched. The X509 context fetch is retried
using an exponential backoff policy with this progression of delays between retries: 1 second, 2 seconds, 4, 8, 16, 32, 60, 60, 60...
It retries indefinitely unless a timeout is configured.
This timeout can be configured either providing it through the `newSource(Duration timeout)` method or
using a System property:
`spiffe.newX509Source.timeout=30`
The Time Unit is seconds.
## Netty Event Loop thread number configuration
Use the variable `io.netty.eventLoopThreads` to configure the number of threads for the Netty Event Loop Group.

View File

@ -11,14 +11,17 @@ import spiffe.bundle.x509bundle.X509BundleSet;
import spiffe.bundle.x509bundle.X509BundleSource;
import spiffe.exception.BundleNotFoundException;
import spiffe.exception.SocketEndpointAddressException;
import spiffe.exception.X509ContextException;
import spiffe.exception.X509SourceException;
import spiffe.spiffeid.TrustDomain;
import spiffe.svid.x509svid.X509Svid;
import spiffe.svid.x509svid.X509SvidSource;
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
@ -38,6 +41,12 @@ import java.util.logging.Level;
@Log
public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
private static final Duration DEFAULT_TIMEOUT;
static {
DEFAULT_TIMEOUT = Duration.ofSeconds(Long.getLong("spiffe.newX509Source.timeout", 0));
}
private X509Svid svid;
private X509BundleSet bundles;
@ -47,20 +56,38 @@ public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
/**
* Creates a new X.509 source. It blocks until the initial update
* has been received from the Workload API.
* has been received from the Workload API or until the timeout configured
* through the system property `spiffe.newX509Source.timeout` expires.
* <p>
* It uses the default address socket endpoint from the environment variable to get the Workload API address.
* <p>
* It uses the default X.509 SVID.
*
* @return an instance of {@link X509Source}, with the svid and bundles initialized
*
* @throws SocketEndpointAddressException if the address to the Workload API is not valid
* @throws X509SourceException if the source could not be initialized
*/
public static X509Source newSource() throws SocketEndpointAddressException, X509SourceException {
X509SourceOptions x509SourceOptions = X509SourceOptions.builder().build();
return newSource(x509SourceOptions);
return newSource(x509SourceOptions, DEFAULT_TIMEOUT);
}
/**
* Creates a new X.509 source. It blocks until the initial update
* has been received from the Workload API or until the timeout provided expires
* <p>
* It uses the default address socket endpoint from the environment variable to get the Workload API address.
* <p>
* It uses the default X.509 SVID.
*
* @param timeout Time to wait for the X509 context update. If the timeout is Zero, it will wait indefinitely.
* @return an instance of {@link X509Source}, with the svid and bundles initialized
* @throws SocketEndpointAddressException if the address to the Workload API is not valid
* @throws X509SourceException if the source could not be initialized
*/
public static X509Source newSource(Duration timeout) throws SocketEndpointAddressException, X509SourceException {
X509SourceOptions x509SourceOptions = X509SourceOptions.builder().build();
return newSource(x509SourceOptions, timeout);
}
/**
@ -70,13 +97,13 @@ public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
* The {@link WorkloadApiClient} can be provided in the options, if it is not,
* a new client is created.
*
* @param timeout Time to wait for the X509 context update. If the timeout is Zero, it will wait indefinitely.
* @param options {@link X509SourceOptions}
* @return an instance of {@link X509Source}, with the svid and bundles initialized
*
* @throws SocketEndpointAddressException if the address to the Workload API is not valid
* @throws X509SourceException if the source could not be initialized
*/
public static X509Source newSource(@NonNull X509SourceOptions options) throws SocketEndpointAddressException, X509SourceException {
public static X509Source newSource(@NonNull X509SourceOptions options, Duration timeout) throws SocketEndpointAddressException, X509SourceException {
if (options.workloadApiClient == null) {
options.workloadApiClient = createClient(options);
}
@ -86,7 +113,7 @@ public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
x509Source.workloadApiClient = options.workloadApiClient;
try {
x509Source.init();
x509Source.init(timeout);
} catch (Exception e) {
x509Source.close();
throw new X509SourceException("Error creating X509 source", e);
@ -150,23 +177,35 @@ public class X509Source implements X509SvidSource, X509BundleSource, Closeable {
return WorkloadApiClient.newClient(clientOptions);
}
private void init() throws X509ContextException {
X509Context x509Context = workloadApiClient.fetchX509Context();
setX509Context(x509Context);
setX509ContextWatcher();
private void init(Duration timeout) throws InterruptedException, TimeoutException {
CountDownLatch done = new CountDownLatch(1);
setX509ContextWatcher(done);
boolean success;
if (timeout.isZero()) {
done.await();
success = true;
} else {
success = done.await(timeout.getSeconds(), TimeUnit.SECONDS);
}
if (!success) {
throw new TimeoutException("Timeout waiting for X509 Context update");
}
}
private void setX509ContextWatcher() {
private void setX509ContextWatcher(CountDownLatch done) {
workloadApiClient.watchX509Context(new Watcher<X509Context>() {
@Override
public void onUpdate(X509Context update) {
log.log(Level.INFO, "Received X509Context update");
setX509Context(update);
done.countDown();
}
@Override
public void onError(Throwable error) {
log.log(Level.SEVERE, String.format("Error in X509Context watcher: %s", ExceptionUtils.getStackTrace(error)));
done.countDown();
}
});
}

View File

@ -19,7 +19,8 @@ import java.security.cert.CertificateException;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;
public class X509SvidValidatorTest {