[close #619] Shutdown recycler when closing ChannelFactory to avoid resource leak (#618)

Co-authored-by: iosmanthus <dengliming@pingcap.com>
This commit is contained in:
Daemonxiao 2022-06-22 13:19:59 +08:00 committed by GitHub
parent d6a15c4ccc
commit 3724e87df6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 16 deletions

View File

@ -64,7 +64,7 @@ public class ChannelFactory implements AutoCloseable {
private final AtomicReference<SslContextBuilder> sslContextBuilder = new AtomicReference<>();
private final ScheduledExecutorService recycler = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService recycler;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@ -209,6 +209,7 @@ public class ChannelFactory implements AutoCloseable {
this.idleTimeout = idleTimeout;
this.certWatcher = null;
this.certContext = null;
this.recycler = null;
this.connRecycleTime = 0;
}
@ -229,6 +230,7 @@ public class ChannelFactory implements AutoCloseable {
this.connRecycleTime = connRecycleTime;
this.certContext =
new OpenSslContext(trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath);
this.recycler = Executors.newSingleThreadScheduledExecutor();
File trustCert = new File(trustCertCollectionFilePath);
File keyCert = new File(keyCertChainFilePath);
@ -261,6 +263,7 @@ public class ChannelFactory implements AutoCloseable {
this.idleTimeout = idleTimeout;
this.connRecycleTime = connRecycleTime;
this.certContext = new JksContext(jksKeyPath, jksKeyPassword, jksTrustPath, jksTrustPassword);
this.recycler = Executors.newSingleThreadScheduledExecutor();
File jksKey = new File(jksKeyPath);
File jksTrust = new File(jksTrustPath);
@ -360,11 +363,12 @@ public class ChannelFactory implements AutoCloseable {
}
connPool.clear();
if (certContext != null) {
if (recycler != null) {
recycler.shutdown();
if (certWatcher != null) {
certWatcher.close();
}
}
if (certWatcher != null) {
certWatcher.close();
}
}
}

View File

@ -55,24 +55,28 @@ public class ChannelFactoryTest {
File a = new File(caPath);
File b = new File(clientCertPath);
File c = new File(clientKeyPath);
new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true));
Thread.sleep(5000);
assertTrue(changed.get());
try (CertWatcher certWatcher =
new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true))) {
Thread.sleep(5000);
assertTrue(changed.get());
}
}
@Test
public void testCertWatcherWithExceptionTask() throws InterruptedException {
AtomicInteger timesOfReloadTask = new AtomicInteger(0);
new CertWatcher(
1,
ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)),
() -> {
timesOfReloadTask.getAndIncrement();
touchCert();
throw new RuntimeException("Mock exception in reload task");
});
CertWatcher certWatcher =
new CertWatcher(
1,
ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)),
() -> {
timesOfReloadTask.getAndIncrement();
touchCert();
throw new RuntimeException("Mock exception in reload task");
});
Thread.sleep(5000);
certWatcher.close();
assertTrue(timesOfReloadTask.get() > 1);
}