Solve memory leak. (#112)

Reverting changes that introduced a memory leak. Addressing error when context was cancelled

Signed-off-by: Max Lambrecht <max.lambrecht@hpe.com>
This commit is contained in:
Max Lambrecht 2023-03-07 13:30:51 -06:00 committed by GitHub
parent 942bcc9eb4
commit f9dc354ae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 11 deletions

View File

@ -334,12 +334,13 @@ public final class DefaultWorkloadApiClient implements WorkloadApiClient {
for (val context : cancellableContexts) {
context.close();
}
retryExecutor.shutdown();
executorService.shutdown();
if (managedChannel != null) {
managedChannel.close();
}
retryExecutor.shutdown();
executorService.shutdown();
closed = true;
}
}

View File

@ -45,7 +45,9 @@ final class StreamObservers {
@Override
public void onError(final Throwable t) {
log.log(Level.SEVERE, "X.509 context observer error", t);
if (Status.fromThrowable(t).getCode() != Status.Code.CANCELLED) {
log.log(Level.SEVERE, "X.509 context observer error", t);
}
handleWatchX509ContextError(t);
}
@ -59,7 +61,7 @@ final class StreamObservers {
private void handleX509ContextRetry(Throwable t) {
if (retryHandler.shouldRetry()) {
log.log(Level.INFO, "Retrying connecting to Workload API to register X.509 context watcher");
log.log(Level.FINE, "Retrying connecting to Workload API to register X.509 context watcher");
retryHandler.scheduleRetry(() ->
cancellableContext.run(
() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(),
@ -97,7 +99,9 @@ final class StreamObservers {
@Override
public void onError(final Throwable t) {
log.log(Level.SEVERE, "X.509 bundles observer error", t);
if (Status.fromThrowable(t).getCode() != Status.Code.CANCELLED) {
log.log(Level.SEVERE, "X.509 bundles observer error", t);
}
handleWatchX509BundlesError(t);
}
@ -111,7 +115,7 @@ final class StreamObservers {
private void handleX509BundlesRetry(Throwable t) {
if (retryHandler.shouldRetry()) {
log.log(Level.INFO, "Retrying connecting to Workload API to register X.509 bundles watcher");
log.log(Level.FINE, "Retrying connecting to Workload API to register X.509 bundles watcher");
retryHandler.scheduleRetry(() ->
cancellableContext.run(
() -> workloadApiAsyncStub.fetchX509Bundles(newX509BundlesRequest(),
@ -149,7 +153,9 @@ final class StreamObservers {
@Override
public void onError(final Throwable t) {
log.log(Level.SEVERE, "JWT observer error", t);
if (Status.fromThrowable(t).getCode() != Status.Code.CANCELLED) {
log.log(Level.SEVERE, "JWT observer error", t);
}
handleWatchJwtBundleError(t);
}
@ -163,7 +169,7 @@ final class StreamObservers {
private void handleJwtBundleRetry(Throwable t) {
if (retryHandler.shouldRetry()) {
log.log(Level.INFO, "Retrying connecting to Workload API to register JWT Bundles watcher");
log.log(Level.FINE, "Retrying connecting to Workload API to register JWT Bundles watcher");
retryHandler.scheduleRetry(() ->
cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(),
this)));

View File

@ -28,6 +28,10 @@ public class RetryHandler {
* @param runnable the task to be scheduled for execution
*/
public void scheduleRetry(final Runnable runnable) {
if (executor.isShutdown()) {
return;
}
if (exponentialBackoffPolicy.reachedMaxRetries(retryCount)) {
return;
}

View File

@ -11,8 +11,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.*;
class RetryHandlerTest {
@ -78,7 +77,8 @@ class RetryHandlerTest {
// fourth retry exceeds max retries
retryHandler.scheduleRetry(runnable);
verifyNoInteractions(scheduledExecutorService);
verify(scheduledExecutorService).isShutdown();
verifyNoMoreInteractions(scheduledExecutorService);
}
@Test