diff --git a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/JwtSource.java b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/JwtSource.java index 3506d0b..632ea9a 100644 --- a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/JwtSource.java +++ b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/JwtSource.java @@ -43,7 +43,7 @@ public class JwtSource implements JwtSvidSource, BundleSource, Closea private JwtBundleSet bundles; private WorkloadApiClient workloadApiClient; - private volatile boolean closed; + private boolean closed; // private constructor private JwtSource() { diff --git a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/StreamObservers.java b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/StreamObservers.java new file mode 100644 index 0000000..a567f4a --- /dev/null +++ b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/StreamObservers.java @@ -0,0 +1,136 @@ +package io.spiffe.workloadapi; + +import io.grpc.Context; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import io.spiffe.bundle.jwtbundle.JwtBundleSet; +import io.spiffe.exception.JwtBundleException; +import io.spiffe.exception.X509ContextException; +import io.spiffe.exception.X509SvidException; +import io.spiffe.workloadapi.grpc.SpiffeWorkloadAPIGrpc; +import io.spiffe.workloadapi.grpc.Workload; +import io.spiffe.workloadapi.retry.RetryHandler; +import lombok.extern.java.Log; +import lombok.val; + +import java.security.KeyException; +import java.security.cert.CertificateException; +import java.util.logging.Level; + +@Log +final class StreamObservers { + + private static final String INVALID_ARGUMENT = "INVALID_ARGUMENT"; + + private StreamObservers() { + } + + static StreamObserver getX509ContextStreamObserver( + final Watcher watcher, + final RetryHandler retryHandler, + final Context.CancellableContext cancellableContext, + final SpiffeWorkloadAPIGrpc.SpiffeWorkloadAPIStub workloadApiAsyncStub) { + + return new StreamObserver() { + @Override + public void onNext(final Workload.X509SVIDResponse value) { + try { + val x509Context = GrpcConversionUtils.toX509Context(value); + validateX509Context(x509Context); + watcher.onUpdate(x509Context); + retryHandler.reset(); + } catch (CertificateException | X509SvidException | X509ContextException e) { + watcher.onError(new X509ContextException("Error processing X.509 Context update", e)); + } + } + + @Override + public void onError(final Throwable t) { + log.log(Level.SEVERE, "X.509 context observer error", t); + handleWatchX509ContextError(t); + } + + private void handleWatchX509ContextError(final Throwable t) { + if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) { + watcher.onError(new X509ContextException("Canceling X.509 Context watch", t)); + } else { + log.log(Level.INFO, "Retrying connecting to Workload API to register X.509 context watcher"); + retryHandler.scheduleRetry(() -> + cancellableContext.run( + () -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), + this))); + } + } + + @Override + public void onCompleted() { + cancellableContext.close(); + log.info("Workload API stream is completed"); + } + }; + } + + static StreamObserver getJwtBundleStreamObserver( + final Watcher watcher, + final RetryHandler retryHandler, + final Context.CancellableContext cancellableContext, + final SpiffeWorkloadAPIGrpc.SpiffeWorkloadAPIStub workloadApiAsyncStub) { + return new StreamObserver() { + + @Override + public void onNext(final Workload.JWTBundlesResponse value) { + try { + val jwtBundleSet = GrpcConversionUtils.toBundleSet(value); + watcher.onUpdate(jwtBundleSet); + retryHandler.reset(); + } catch (KeyException | JwtBundleException e) { + watcher.onError(new JwtBundleException("Error processing JWT bundles update", e)); + } + } + + @Override + public void onError(final Throwable t) { + log.log(Level.SEVERE, "JWT observer error", t); + handleWatchJwtBundleError(t); + } + + private void handleWatchJwtBundleError(final Throwable t) { + if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) { + watcher.onError(new JwtBundleException("Canceling JWT Bundles watch", t)); + } else { + log.log(Level.INFO, "Retrying connecting to Workload API to register JWT Bundles watcher"); + retryHandler.scheduleRetry(() -> + cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(), + this))); + } + } + + @Override + public void onCompleted() { + cancellableContext.close(); + log.info("Workload API stream is completed"); + } + }; + } + + // validates that the X.509 context has both the SVID and the bundles + private static void validateX509Context(final X509Context x509Context) throws X509ContextException { + if (x509Context.getX509BundleSet() == null + || x509Context.getX509BundleSet().getBundles() == null + || x509Context.getX509BundleSet().getBundles().isEmpty()) { + throw new X509ContextException("X.509 context error: no X.509 bundles found"); + } + + if (x509Context.getX509Svid() == null || x509Context.getX509Svid().isEmpty()) { + throw new X509ContextException("X.509 context error: no X.509 SVID found"); + } + } + + private static Workload.X509SVIDRequest newX509SvidRequest() { + return Workload.X509SVIDRequest.newBuilder().build(); + } + + private static Workload.JWTBundlesRequest newJwtBundlesRequest() { + return Workload.JWTBundlesRequest.newBuilder().build(); + } +} diff --git a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/WorkloadApiClient.java b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/WorkloadApiClient.java index bf30654..ff64d0c 100644 --- a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/WorkloadApiClient.java +++ b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/WorkloadApiClient.java @@ -1,8 +1,6 @@ package io.spiffe.workloadapi; import io.grpc.Context; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import io.spiffe.bundle.jwtbundle.JwtBundleSet; import io.spiffe.exception.JwtBundleException; import io.spiffe.exception.JwtSvidException; @@ -35,7 +33,6 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -43,6 +40,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; +import static io.spiffe.workloadapi.StreamObservers.getJwtBundleStreamObserver; +import static io.spiffe.workloadapi.StreamObservers.getX509ContextStreamObserver; + /** * Represents a client to interact with the Workload API. *

@@ -52,9 +52,7 @@ import java.util.logging.Level; * the stream connection to the Workload API. */ @Log -public class WorkloadApiClient implements Closeable { - - private static final String INVALID_ARGUMENT = "INVALID_ARGUMENT"; +public final class WorkloadApiClient implements Closeable { private final SpiffeWorkloadAPIStub workloadApiAsyncStub; private final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub; @@ -70,12 +68,12 @@ public class WorkloadApiClient implements Closeable { private boolean closed; - private WorkloadApiClient(SpiffeWorkloadAPIStub workloadApiAsyncStub, - SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub, - ManagedChannelWrapper managedChannel, - BackoffPolicy backoffPolicy, - ScheduledExecutorService retryExecutor, - ExecutorService executorService) { + private WorkloadApiClient(final SpiffeWorkloadAPIStub workloadApiAsyncStub, + final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub, + final ManagedChannelWrapper managedChannel, + final BackoffPolicy backoffPolicy, + final ScheduledExecutorService retryExecutor, + final ExecutorService executorService) { this.workloadApiAsyncStub = workloadApiAsyncStub; this.workloadApiBlockingStub = workloadApiBlockingStub; this.managedChannel = managedChannel; @@ -95,9 +93,9 @@ public class WorkloadApiClient implements Closeable { * @param workloadApiBlockingStub a {@link SpiffeWorkloadAPIBlockingStub} * @param managedChannel a {@link ManagedChannelWrapper} */ - public WorkloadApiClient(SpiffeWorkloadAPIStub workloadApiAsyncStub, - SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub, - ManagedChannelWrapper managedChannel) { + public WorkloadApiClient(final SpiffeWorkloadAPIStub workloadApiAsyncStub, + final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub, + final ManagedChannelWrapper managedChannel) { this.workloadApiAsyncStub = workloadApiAsyncStub; this.workloadApiBlockingStub = workloadApiBlockingStub; this.backoffPolicy = new BackoffPolicy(); @@ -187,11 +185,12 @@ public class WorkloadApiClient implements Closeable { * * @param watcher an instance that implements a {@link Watcher}. */ - public void watchX509Context(@NonNull Watcher watcher) { + public void watchX509Context(@NonNull final Watcher watcher) { val retryHandler = new RetryHandler(backoffPolicy, retryExecutor); val cancellableContext = Context.current().withCancellation(); - val streamObserver = getX509ContextStreamObserver(watcher, retryHandler, cancellableContext); + val streamObserver = + getX509ContextStreamObserver(watcher, retryHandler, cancellableContext, workloadApiAsyncStub); cancellableContext.run(() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), streamObserver)); this.cancellableContexts.add(cancellableContext); @@ -206,10 +205,9 @@ public class WorkloadApiClient implements Closeable { * @return an instance of a {@link JwtSvid} * @throws JwtSvidException if there is an error fetching or processing the JWT from the Workload API */ - public JwtSvid fetchJwtSvid( - @NonNull final SpiffeId subject, - @NonNull final String audience, - final String... extraAudience) + public JwtSvid fetchJwtSvid(@NonNull final SpiffeId subject, + @NonNull final String audience, + final String... extraAudience) throws JwtSvidException { final Set audParam = new HashSet<>(); @@ -271,7 +269,7 @@ public class WorkloadApiClient implements Closeable { val retryHandler = new RetryHandler(backoffPolicy, retryExecutor); val cancellableContext = Context.current().withCancellation(); - val streamObserver = getJwtBundleStreamObserver(watcher, retryHandler, cancellableContext); + val streamObserver = getJwtBundleStreamObserver(watcher, retryHandler, cancellableContext, workloadApiAsyncStub); cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(), streamObserver)); this.cancellableContexts.add(cancellableContext); @@ -302,116 +300,9 @@ public class WorkloadApiClient implements Closeable { } - private StreamObserver getX509ContextStreamObserver( - Watcher watcher, - RetryHandler retryHandler, - Context.CancellableContext cancellableContext) { - return new StreamObserver() { - @Override - public void onNext(Workload.X509SVIDResponse value) { - try { - val x509Context = GrpcConversionUtils.toX509Context(value); - validateX509Context(x509Context); - watcher.onUpdate(x509Context); - retryHandler.reset(); - } catch (CertificateException | X509SvidException | X509ContextException e) { - watcher.onError(new X509ContextException("Error processing X.509 Context update", e)); - } - } - - @Override - public void onError(Throwable t) { - log.log(Level.SEVERE, "X.509 context observer error", t); - handleWatchX509ContextError(t); - } - - private void handleWatchX509ContextError(Throwable t) { - if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) { - watcher.onError(new X509ContextException("Canceling X.509 Context watch", t)); - } else { - log.log(Level.INFO, "Retrying connecting to Workload API to register X.509 context watcher"); - retryHandler.scheduleRetry(() -> - cancellableContext.run( - () -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), - this))); - } - } - - @Override - public void onCompleted() { - cancellableContext.close(); - log.info("Workload API stream is completed"); - } - }; - } - - private StreamObserver getJwtBundleStreamObserver( - Watcher watcher, - RetryHandler retryHandler, - Context.CancellableContext cancellableContext) { - return new StreamObserver() { - - @Override - public void onNext(Workload.JWTBundlesResponse value) { - try { - val jwtBundleSet = GrpcConversionUtils.toBundleSet(value); - watcher.onUpdate(jwtBundleSet); - retryHandler.reset(); - } catch (KeyException | JwtBundleException e) { - watcher.onError(new JwtBundleException("Error processing JWT bundles update", e)); - } - } - - @Override - public void onError(Throwable t) { - log.log(Level.SEVERE, "JWT observer error", t); - handleWatchJwtBundleError(t); - } - - private void handleWatchJwtBundleError(Throwable t) { - if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) { - watcher.onError(new JwtBundleException("Canceling JWT Bundles watch", t)); - } else { - log.log(Level.INFO, "Retrying connecting to Workload API to register JWT Bundles watcher"); - retryHandler.scheduleRetry(() -> - cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(), - this))); - } - } - - @Override - public void onCompleted() { - cancellableContext.close(); - log.info("Workload API stream is completed"); - } - }; - } - - // validates that the X.509 context has both the SVID and the bundles - private void validateX509Context(X509Context x509Context) throws X509ContextException { - if (x509Context.getX509BundleSet() == null - || x509Context.getX509BundleSet().getBundles() == null - || x509Context.getX509BundleSet().getBundles().isEmpty()) { - throw new X509ContextException("X.509 context error: no X.509 bundles found"); - } - - if (x509Context.getX509Svid() == null || x509Context.getX509Svid().isEmpty()) { - throw new X509ContextException("X.509 context error: no X.509 SVID found"); - } - } - - private Workload.X509SVIDRequest newX509SvidRequest() { - return Workload.X509SVIDRequest.newBuilder().build(); - } - - private Workload.JWTBundlesRequest newJwtBundlesRequest() { - return Workload.JWTBundlesRequest.newBuilder().build(); - } - private X509Context processX509Context() throws X509ContextException { try { - final Iterator x509SvidResponse = - workloadApiBlockingStub.fetchX509SVID(newX509SvidRequest()); + val x509SvidResponse = workloadApiBlockingStub.fetchX509SVID(newX509SvidRequest()); if (x509SvidResponse.hasNext()) { return GrpcConversionUtils.toX509Context(x509SvidResponse.next()); } @@ -421,22 +312,18 @@ public class WorkloadApiClient implements Closeable { throw new X509ContextException("Error processing X509Context: x509SVIDResponse is empty"); } - private JwtSvid callFetchJwtSvid(SpiffeId subject, Set audience) throws JwtSvidException { - final Workload.JWTSVIDRequest jwtsvidRequest = Workload.JWTSVIDRequest - .newBuilder() + private JwtSvid callFetchJwtSvid(final SpiffeId subject, final Set audience) throws JwtSvidException { + val jwtSvidRequest = Workload.JWTSVIDRequest.newBuilder() .setSpiffeId(subject.toString()) .addAllAudience(audience) .build(); - final Workload.JWTSVIDResponse response = workloadApiBlockingStub.fetchJWTSVID(jwtsvidRequest); - + val response = workloadApiBlockingStub.fetchJWTSVID(jwtSvidRequest); return JwtSvid.parseInsecure(response.getSvids(0).getSvid(), audience); } private JwtBundleSet callFetchBundles() throws JwtBundleException { - final Workload.JWTBundlesRequest request = Workload.JWTBundlesRequest - .newBuilder() - .build(); - final Iterator bundlesResponse = workloadApiBlockingStub.fetchJWTBundles(request); + val request = Workload.JWTBundlesRequest.newBuilder().build(); + val bundlesResponse = workloadApiBlockingStub.fetchJWTBundles(request); if (bundlesResponse.hasNext()) { try { @@ -448,6 +335,14 @@ public class WorkloadApiClient implements Closeable { throw new JwtBundleException("JWT Bundle response from the Workload API is empty"); } + private Workload.X509SVIDRequest newX509SvidRequest() { + return Workload.X509SVIDRequest.newBuilder().build(); + } + + private Workload.JWTBundlesRequest newJwtBundlesRequest() { + return Workload.JWTBundlesRequest.newBuilder().build(); + } + /** * Options for creating a new {@link WorkloadApiClient}. *

@@ -473,7 +368,9 @@ public class WorkloadApiClient implements Closeable { private ExecutorService executorService; @Builder - public ClientOptions(String spiffeSocketPath, BackoffPolicy backoffPolicy, ExecutorService executorService) { + public ClientOptions(final String spiffeSocketPath, + final BackoffPolicy backoffPolicy, + final ExecutorService executorService) { this.spiffeSocketPath = spiffeSocketPath; this.backoffPolicy = backoffPolicy; this.executorService = executorService; diff --git a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/X509Source.java b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/X509Source.java index f7ce6b0..d988029 100644 --- a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/X509Source.java +++ b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/X509Source.java @@ -50,7 +50,7 @@ public final class X509Source implements X509SvidSource, BundleSource, X509Svid> picker; private WorkloadApiClient workloadApiClient; - private volatile boolean closed; + private boolean closed; // private constructor private X509Source() { @@ -171,7 +171,7 @@ public final class X509Source implements X509SvidSource, BundleSource() { @Override - public void onUpdate(X509Context update) { + public void onUpdate(final X509Context update) { log.log(Level.INFO, "Received X509Context update"); setX509Context(update); done.countDown(); } @Override - public void onError(Throwable error) { + public void onError(final Throwable error) { log.log(Level.SEVERE, String.format("Error in X509Context watcher: %s", ExceptionUtils.getStackTrace(error))); done.countDown(); @@ -246,10 +246,10 @@ public final class X509Source implements X509SvidSource, BundleSource, X509Svid> svidPicker, - WorkloadApiClient workloadApiClient) { + public X509SourceOptions(final String spiffeSocketPath, + final Duration initTimeout, + final Function, X509Svid> svidPicker, + final WorkloadApiClient workloadApiClient) { this.spiffeSocketPath = spiffeSocketPath; this.initTimeout = initTimeout; this.svidPicker = svidPicker; diff --git a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java index 8ad074a..45a1620 100644 --- a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java +++ b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java @@ -15,7 +15,7 @@ public class RetryHandler { private int retryCount; - public RetryHandler(BackoffPolicy backoffPolicy, ScheduledExecutorService executor) { + public RetryHandler(final BackoffPolicy backoffPolicy, final ScheduledExecutorService executor) { this.nextDelay = backoffPolicy.getInitialDelay(); this.backoffPolicy = backoffPolicy; this.executor = executor;