Refactoring WorkloadApiClient to reduce complexity.

Addressing code style issues.

Signed-off-by: Max Lambrecht <maxlambrecht@gmail.com>
This commit is contained in:
Max Lambrecht 2020-06-26 15:34:25 -03:00
parent 7268c54a28
commit 14fbae8fa2
5 changed files with 184 additions and 151 deletions

View File

@ -43,7 +43,7 @@ public class JwtSource implements JwtSvidSource, BundleSource<JwtBundle>, Closea
private JwtBundleSet bundles; private JwtBundleSet bundles;
private WorkloadApiClient workloadApiClient; private WorkloadApiClient workloadApiClient;
private volatile boolean closed; private boolean closed;
// private constructor // private constructor
private JwtSource() { private JwtSource() {

View File

@ -0,0 +1,136 @@
package io.spiffe.workloadapi;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.spiffe.bundle.jwtbundle.JwtBundleSet;
import io.spiffe.exception.JwtBundleException;
import io.spiffe.exception.X509ContextException;
import io.spiffe.exception.X509SvidException;
import io.spiffe.workloadapi.grpc.SpiffeWorkloadAPIGrpc;
import io.spiffe.workloadapi.grpc.Workload;
import io.spiffe.workloadapi.retry.RetryHandler;
import lombok.extern.java.Log;
import lombok.val;
import java.security.KeyException;
import java.security.cert.CertificateException;
import java.util.logging.Level;
@Log
final class StreamObservers {
private static final String INVALID_ARGUMENT = "INVALID_ARGUMENT";
private StreamObservers() {
}
static StreamObserver<Workload.X509SVIDResponse> getX509ContextStreamObserver(
final Watcher<X509Context> watcher,
final RetryHandler retryHandler,
final Context.CancellableContext cancellableContext,
final SpiffeWorkloadAPIGrpc.SpiffeWorkloadAPIStub workloadApiAsyncStub) {
return new StreamObserver<Workload.X509SVIDResponse>() {
@Override
public void onNext(final Workload.X509SVIDResponse value) {
try {
val x509Context = GrpcConversionUtils.toX509Context(value);
validateX509Context(x509Context);
watcher.onUpdate(x509Context);
retryHandler.reset();
} catch (CertificateException | X509SvidException | X509ContextException e) {
watcher.onError(new X509ContextException("Error processing X.509 Context update", e));
}
}
@Override
public void onError(final Throwable t) {
log.log(Level.SEVERE, "X.509 context observer error", t);
handleWatchX509ContextError(t);
}
private void handleWatchX509ContextError(final Throwable t) {
if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) {
watcher.onError(new X509ContextException("Canceling X.509 Context watch", t));
} else {
log.log(Level.INFO, "Retrying connecting to Workload API to register X.509 context watcher");
retryHandler.scheduleRetry(() ->
cancellableContext.run(
() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(),
this)));
}
}
@Override
public void onCompleted() {
cancellableContext.close();
log.info("Workload API stream is completed");
}
};
}
static StreamObserver<Workload.JWTBundlesResponse> getJwtBundleStreamObserver(
final Watcher<JwtBundleSet> watcher,
final RetryHandler retryHandler,
final Context.CancellableContext cancellableContext,
final SpiffeWorkloadAPIGrpc.SpiffeWorkloadAPIStub workloadApiAsyncStub) {
return new StreamObserver<Workload.JWTBundlesResponse>() {
@Override
public void onNext(final Workload.JWTBundlesResponse value) {
try {
val jwtBundleSet = GrpcConversionUtils.toBundleSet(value);
watcher.onUpdate(jwtBundleSet);
retryHandler.reset();
} catch (KeyException | JwtBundleException e) {
watcher.onError(new JwtBundleException("Error processing JWT bundles update", e));
}
}
@Override
public void onError(final Throwable t) {
log.log(Level.SEVERE, "JWT observer error", t);
handleWatchJwtBundleError(t);
}
private void handleWatchJwtBundleError(final Throwable t) {
if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) {
watcher.onError(new JwtBundleException("Canceling JWT Bundles watch", t));
} else {
log.log(Level.INFO, "Retrying connecting to Workload API to register JWT Bundles watcher");
retryHandler.scheduleRetry(() ->
cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(),
this)));
}
}
@Override
public void onCompleted() {
cancellableContext.close();
log.info("Workload API stream is completed");
}
};
}
// validates that the X.509 context has both the SVID and the bundles
private static void validateX509Context(final X509Context x509Context) throws X509ContextException {
if (x509Context.getX509BundleSet() == null
|| x509Context.getX509BundleSet().getBundles() == null
|| x509Context.getX509BundleSet().getBundles().isEmpty()) {
throw new X509ContextException("X.509 context error: no X.509 bundles found");
}
if (x509Context.getX509Svid() == null || x509Context.getX509Svid().isEmpty()) {
throw new X509ContextException("X.509 context error: no X.509 SVID found");
}
}
private static Workload.X509SVIDRequest newX509SvidRequest() {
return Workload.X509SVIDRequest.newBuilder().build();
}
private static Workload.JWTBundlesRequest newJwtBundlesRequest() {
return Workload.JWTBundlesRequest.newBuilder().build();
}
}

View File

@ -1,8 +1,6 @@
package io.spiffe.workloadapi; package io.spiffe.workloadapi;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.spiffe.bundle.jwtbundle.JwtBundleSet; import io.spiffe.bundle.jwtbundle.JwtBundleSet;
import io.spiffe.exception.JwtBundleException; import io.spiffe.exception.JwtBundleException;
import io.spiffe.exception.JwtSvidException; import io.spiffe.exception.JwtSvidException;
@ -35,7 +33,6 @@ import java.security.cert.CertificateException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -43,6 +40,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level; import java.util.logging.Level;
import static io.spiffe.workloadapi.StreamObservers.getJwtBundleStreamObserver;
import static io.spiffe.workloadapi.StreamObservers.getX509ContextStreamObserver;
/** /**
* Represents a client to interact with the Workload API. * Represents a client to interact with the Workload API.
* <p> * <p>
@ -52,9 +52,7 @@ import java.util.logging.Level;
* the stream connection to the Workload API. * the stream connection to the Workload API.
*/ */
@Log @Log
public class WorkloadApiClient implements Closeable { public final class WorkloadApiClient implements Closeable {
private static final String INVALID_ARGUMENT = "INVALID_ARGUMENT";
private final SpiffeWorkloadAPIStub workloadApiAsyncStub; private final SpiffeWorkloadAPIStub workloadApiAsyncStub;
private final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub; private final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub;
@ -70,12 +68,12 @@ public class WorkloadApiClient implements Closeable {
private boolean closed; private boolean closed;
private WorkloadApiClient(SpiffeWorkloadAPIStub workloadApiAsyncStub, private WorkloadApiClient(final SpiffeWorkloadAPIStub workloadApiAsyncStub,
SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub, final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub,
ManagedChannelWrapper managedChannel, final ManagedChannelWrapper managedChannel,
BackoffPolicy backoffPolicy, final BackoffPolicy backoffPolicy,
ScheduledExecutorService retryExecutor, final ScheduledExecutorService retryExecutor,
ExecutorService executorService) { final ExecutorService executorService) {
this.workloadApiAsyncStub = workloadApiAsyncStub; this.workloadApiAsyncStub = workloadApiAsyncStub;
this.workloadApiBlockingStub = workloadApiBlockingStub; this.workloadApiBlockingStub = workloadApiBlockingStub;
this.managedChannel = managedChannel; this.managedChannel = managedChannel;
@ -95,9 +93,9 @@ public class WorkloadApiClient implements Closeable {
* @param workloadApiBlockingStub a {@link SpiffeWorkloadAPIBlockingStub} * @param workloadApiBlockingStub a {@link SpiffeWorkloadAPIBlockingStub}
* @param managedChannel a {@link ManagedChannelWrapper} * @param managedChannel a {@link ManagedChannelWrapper}
*/ */
public WorkloadApiClient(SpiffeWorkloadAPIStub workloadApiAsyncStub, public WorkloadApiClient(final SpiffeWorkloadAPIStub workloadApiAsyncStub,
SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub, final SpiffeWorkloadAPIBlockingStub workloadApiBlockingStub,
ManagedChannelWrapper managedChannel) { final ManagedChannelWrapper managedChannel) {
this.workloadApiAsyncStub = workloadApiAsyncStub; this.workloadApiAsyncStub = workloadApiAsyncStub;
this.workloadApiBlockingStub = workloadApiBlockingStub; this.workloadApiBlockingStub = workloadApiBlockingStub;
this.backoffPolicy = new BackoffPolicy(); this.backoffPolicy = new BackoffPolicy();
@ -187,11 +185,12 @@ public class WorkloadApiClient implements Closeable {
* *
* @param watcher an instance that implements a {@link Watcher}. * @param watcher an instance that implements a {@link Watcher}.
*/ */
public void watchX509Context(@NonNull Watcher<X509Context> watcher) { public void watchX509Context(@NonNull final Watcher<X509Context> watcher) {
val retryHandler = new RetryHandler(backoffPolicy, retryExecutor); val retryHandler = new RetryHandler(backoffPolicy, retryExecutor);
val cancellableContext = Context.current().withCancellation(); val cancellableContext = Context.current().withCancellation();
val streamObserver = getX509ContextStreamObserver(watcher, retryHandler, cancellableContext); val streamObserver =
getX509ContextStreamObserver(watcher, retryHandler, cancellableContext, workloadApiAsyncStub);
cancellableContext.run(() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), streamObserver)); cancellableContext.run(() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), streamObserver));
this.cancellableContexts.add(cancellableContext); this.cancellableContexts.add(cancellableContext);
@ -206,10 +205,9 @@ public class WorkloadApiClient implements Closeable {
* @return an instance of a {@link JwtSvid} * @return an instance of a {@link JwtSvid}
* @throws JwtSvidException if there is an error fetching or processing the JWT from the Workload API * @throws JwtSvidException if there is an error fetching or processing the JWT from the Workload API
*/ */
public JwtSvid fetchJwtSvid( public JwtSvid fetchJwtSvid(@NonNull final SpiffeId subject,
@NonNull final SpiffeId subject, @NonNull final String audience,
@NonNull final String audience, final String... extraAudience)
final String... extraAudience)
throws JwtSvidException { throws JwtSvidException {
final Set<String> audParam = new HashSet<>(); final Set<String> audParam = new HashSet<>();
@ -271,7 +269,7 @@ public class WorkloadApiClient implements Closeable {
val retryHandler = new RetryHandler(backoffPolicy, retryExecutor); val retryHandler = new RetryHandler(backoffPolicy, retryExecutor);
val cancellableContext = Context.current().withCancellation(); val cancellableContext = Context.current().withCancellation();
val streamObserver = getJwtBundleStreamObserver(watcher, retryHandler, cancellableContext); val streamObserver = getJwtBundleStreamObserver(watcher, retryHandler, cancellableContext, workloadApiAsyncStub);
cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(), streamObserver)); cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(), streamObserver));
this.cancellableContexts.add(cancellableContext); this.cancellableContexts.add(cancellableContext);
@ -302,116 +300,9 @@ public class WorkloadApiClient implements Closeable {
} }
private StreamObserver<Workload.X509SVIDResponse> getX509ContextStreamObserver(
Watcher<X509Context> watcher,
RetryHandler retryHandler,
Context.CancellableContext cancellableContext) {
return new StreamObserver<Workload.X509SVIDResponse>() {
@Override
public void onNext(Workload.X509SVIDResponse value) {
try {
val x509Context = GrpcConversionUtils.toX509Context(value);
validateX509Context(x509Context);
watcher.onUpdate(x509Context);
retryHandler.reset();
} catch (CertificateException | X509SvidException | X509ContextException e) {
watcher.onError(new X509ContextException("Error processing X.509 Context update", e));
}
}
@Override
public void onError(Throwable t) {
log.log(Level.SEVERE, "X.509 context observer error", t);
handleWatchX509ContextError(t);
}
private void handleWatchX509ContextError(Throwable t) {
if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) {
watcher.onError(new X509ContextException("Canceling X.509 Context watch", t));
} else {
log.log(Level.INFO, "Retrying connecting to Workload API to register X.509 context watcher");
retryHandler.scheduleRetry(() ->
cancellableContext.run(
() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(),
this)));
}
}
@Override
public void onCompleted() {
cancellableContext.close();
log.info("Workload API stream is completed");
}
};
}
private StreamObserver<Workload.JWTBundlesResponse> getJwtBundleStreamObserver(
Watcher<JwtBundleSet> watcher,
RetryHandler retryHandler,
Context.CancellableContext cancellableContext) {
return new StreamObserver<Workload.JWTBundlesResponse>() {
@Override
public void onNext(Workload.JWTBundlesResponse value) {
try {
val jwtBundleSet = GrpcConversionUtils.toBundleSet(value);
watcher.onUpdate(jwtBundleSet);
retryHandler.reset();
} catch (KeyException | JwtBundleException e) {
watcher.onError(new JwtBundleException("Error processing JWT bundles update", e));
}
}
@Override
public void onError(Throwable t) {
log.log(Level.SEVERE, "JWT observer error", t);
handleWatchJwtBundleError(t);
}
private void handleWatchJwtBundleError(Throwable t) {
if (INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name())) {
watcher.onError(new JwtBundleException("Canceling JWT Bundles watch", t));
} else {
log.log(Level.INFO, "Retrying connecting to Workload API to register JWT Bundles watcher");
retryHandler.scheduleRetry(() ->
cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(),
this)));
}
}
@Override
public void onCompleted() {
cancellableContext.close();
log.info("Workload API stream is completed");
}
};
}
// validates that the X.509 context has both the SVID and the bundles
private void validateX509Context(X509Context x509Context) throws X509ContextException {
if (x509Context.getX509BundleSet() == null
|| x509Context.getX509BundleSet().getBundles() == null
|| x509Context.getX509BundleSet().getBundles().isEmpty()) {
throw new X509ContextException("X.509 context error: no X.509 bundles found");
}
if (x509Context.getX509Svid() == null || x509Context.getX509Svid().isEmpty()) {
throw new X509ContextException("X.509 context error: no X.509 SVID found");
}
}
private Workload.X509SVIDRequest newX509SvidRequest() {
return Workload.X509SVIDRequest.newBuilder().build();
}
private Workload.JWTBundlesRequest newJwtBundlesRequest() {
return Workload.JWTBundlesRequest.newBuilder().build();
}
private X509Context processX509Context() throws X509ContextException { private X509Context processX509Context() throws X509ContextException {
try { try {
final Iterator<Workload.X509SVIDResponse> x509SvidResponse = val x509SvidResponse = workloadApiBlockingStub.fetchX509SVID(newX509SvidRequest());
workloadApiBlockingStub.fetchX509SVID(newX509SvidRequest());
if (x509SvidResponse.hasNext()) { if (x509SvidResponse.hasNext()) {
return GrpcConversionUtils.toX509Context(x509SvidResponse.next()); return GrpcConversionUtils.toX509Context(x509SvidResponse.next());
} }
@ -421,22 +312,18 @@ public class WorkloadApiClient implements Closeable {
throw new X509ContextException("Error processing X509Context: x509SVIDResponse is empty"); throw new X509ContextException("Error processing X509Context: x509SVIDResponse is empty");
} }
private JwtSvid callFetchJwtSvid(SpiffeId subject, Set<String> audience) throws JwtSvidException { private JwtSvid callFetchJwtSvid(final SpiffeId subject, final Set<String> audience) throws JwtSvidException {
final Workload.JWTSVIDRequest jwtsvidRequest = Workload.JWTSVIDRequest val jwtSvidRequest = Workload.JWTSVIDRequest.newBuilder()
.newBuilder()
.setSpiffeId(subject.toString()) .setSpiffeId(subject.toString())
.addAllAudience(audience) .addAllAudience(audience)
.build(); .build();
final Workload.JWTSVIDResponse response = workloadApiBlockingStub.fetchJWTSVID(jwtsvidRequest); val response = workloadApiBlockingStub.fetchJWTSVID(jwtSvidRequest);
return JwtSvid.parseInsecure(response.getSvids(0).getSvid(), audience); return JwtSvid.parseInsecure(response.getSvids(0).getSvid(), audience);
} }
private JwtBundleSet callFetchBundles() throws JwtBundleException { private JwtBundleSet callFetchBundles() throws JwtBundleException {
final Workload.JWTBundlesRequest request = Workload.JWTBundlesRequest val request = Workload.JWTBundlesRequest.newBuilder().build();
.newBuilder() val bundlesResponse = workloadApiBlockingStub.fetchJWTBundles(request);
.build();
final Iterator<Workload.JWTBundlesResponse> bundlesResponse = workloadApiBlockingStub.fetchJWTBundles(request);
if (bundlesResponse.hasNext()) { if (bundlesResponse.hasNext()) {
try { try {
@ -448,6 +335,14 @@ public class WorkloadApiClient implements Closeable {
throw new JwtBundleException("JWT Bundle response from the Workload API is empty"); throw new JwtBundleException("JWT Bundle response from the Workload API is empty");
} }
private Workload.X509SVIDRequest newX509SvidRequest() {
return Workload.X509SVIDRequest.newBuilder().build();
}
private Workload.JWTBundlesRequest newJwtBundlesRequest() {
return Workload.JWTBundlesRequest.newBuilder().build();
}
/** /**
* Options for creating a new {@link WorkloadApiClient}. * Options for creating a new {@link WorkloadApiClient}.
* <p> * <p>
@ -473,7 +368,9 @@ public class WorkloadApiClient implements Closeable {
private ExecutorService executorService; private ExecutorService executorService;
@Builder @Builder
public ClientOptions(String spiffeSocketPath, BackoffPolicy backoffPolicy, ExecutorService executorService) { public ClientOptions(final String spiffeSocketPath,
final BackoffPolicy backoffPolicy,
final ExecutorService executorService) {
this.spiffeSocketPath = spiffeSocketPath; this.spiffeSocketPath = spiffeSocketPath;
this.backoffPolicy = backoffPolicy; this.backoffPolicy = backoffPolicy;
this.executorService = executorService; this.executorService = executorService;

View File

@ -50,7 +50,7 @@ public final class X509Source implements X509SvidSource, BundleSource<X509Bundle
private Function<List<X509Svid>, X509Svid> picker; private Function<List<X509Svid>, X509Svid> picker;
private WorkloadApiClient workloadApiClient; private WorkloadApiClient workloadApiClient;
private volatile boolean closed; private boolean closed;
// private constructor // private constructor
private X509Source() { private X509Source() {
@ -171,7 +171,7 @@ public final class X509Source implements X509SvidSource, BundleSource<X509Bundle
return WorkloadApiClient.newClient(clientOptions); return WorkloadApiClient.newClient(clientOptions);
} }
private void init(Duration timeout) throws TimeoutException { private void init(final Duration timeout) throws TimeoutException {
val done = new CountDownLatch(1); val done = new CountDownLatch(1);
setX509ContextWatcher(done); setX509ContextWatcher(done);
@ -187,17 +187,17 @@ public final class X509Source implements X509SvidSource, BundleSource<X509Bundle
} }
} }
private void setX509ContextWatcher(CountDownLatch done) { private void setX509ContextWatcher(final CountDownLatch done) {
workloadApiClient.watchX509Context(new Watcher<X509Context>() { workloadApiClient.watchX509Context(new Watcher<X509Context>() {
@Override @Override
public void onUpdate(X509Context update) { public void onUpdate(final X509Context update) {
log.log(Level.INFO, "Received X509Context update"); log.log(Level.INFO, "Received X509Context update");
setX509Context(update); setX509Context(update);
done.countDown(); done.countDown();
} }
@Override @Override
public void onError(Throwable error) { public void onError(final Throwable error) {
log.log(Level.SEVERE, String.format("Error in X509Context watcher: %s", log.log(Level.SEVERE, String.format("Error in X509Context watcher: %s",
ExceptionUtils.getStackTrace(error))); ExceptionUtils.getStackTrace(error)));
done.countDown(); done.countDown();
@ -246,10 +246,10 @@ public final class X509Source implements X509SvidSource, BundleSource<X509Bundle
private WorkloadApiClient workloadApiClient; private WorkloadApiClient workloadApiClient;
@Builder @Builder
public X509SourceOptions(String spiffeSocketPath, public X509SourceOptions(final String spiffeSocketPath,
Duration initTimeout, final Duration initTimeout,
Function<List<X509Svid>, X509Svid> svidPicker, final Function<List<X509Svid>, X509Svid> svidPicker,
WorkloadApiClient workloadApiClient) { final WorkloadApiClient workloadApiClient) {
this.spiffeSocketPath = spiffeSocketPath; this.spiffeSocketPath = spiffeSocketPath;
this.initTimeout = initTimeout; this.initTimeout = initTimeout;
this.svidPicker = svidPicker; this.svidPicker = svidPicker;

View File

@ -15,7 +15,7 @@ public class RetryHandler {
private int retryCount; private int retryCount;
public RetryHandler(BackoffPolicy backoffPolicy, ScheduledExecutorService executor) { public RetryHandler(final BackoffPolicy backoffPolicy, final ScheduledExecutorService executor) {
this.nextDelay = backoffPolicy.getInitialDelay(); this.nextDelay = backoffPolicy.getInitialDelay();
this.backoffPolicy = backoffPolicy; this.backoffPolicy = backoffPolicy;
this.executor = executor; this.executor = executor;