All keepTiKVToImportMode after stopped (#283)

This commit is contained in:
Peng Guanwen 2021-09-30 10:30:14 +08:00 committed by GitHub
parent d543ae73c0
commit 869124880e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 17 deletions

View File

@ -36,32 +36,36 @@ public class SwitchTiKVModeClient {
private final PDClient pdClient;
private final ImporterStoreClient.ImporterStoreClientBuilder builder;
private final ScheduledExecutorService ingestScheduledExecutorService;
private ScheduledExecutorService ingestScheduledExecutorService;
public SwitchTiKVModeClient(
PDClient pdClient, ImporterStoreClient.ImporterStoreClientBuilder builder) {
this.pdClient = pdClient;
this.builder = builder;
this.ingestScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("switch-tikv-mode-pool-%d")
.setDaemon(true)
.build());
}
public void switchTiKVToNormalMode() {
doSwitchTiKVMode(ImportSstpb.SwitchMode.Normal);
}
public void keepTiKVToImportMode() {
ingestScheduledExecutorService.scheduleAtFixedRate(
this::switchTiKVToImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS);
public synchronized void keepTiKVToImportMode() {
if (ingestScheduledExecutorService == null) {
ingestScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("switch-tikv-mode-pool-%d")
.setDaemon(true)
.build());
ingestScheduledExecutorService.scheduleAtFixedRate(
this::switchTiKVToImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS);
}
}
public void stopKeepTiKVToImportMode() {
ingestScheduledExecutorService.shutdown();
public synchronized void stopKeepTiKVToImportMode() {
if (ingestScheduledExecutorService != null) {
ingestScheduledExecutorService.shutdown();
ingestScheduledExecutorService = null;
}
}
private void switchTiKVToImportMode() {

View File

@ -26,9 +26,11 @@ public class SwitchTiKVModeTest {
@Test
public void switchTiKVModeTest() throws InterruptedException {
SwitchTiKVModeClient switchTiKVModeClient = session.getSwitchTiKVModeClient();
switchTiKVModeClient.keepTiKVToImportMode();
Thread.sleep(6000);
switchTiKVModeClient.stopKeepTiKVToImportMode();
switchTiKVModeClient.switchTiKVToNormalMode();
for (int i = 0; i < 2; i++) {
switchTiKVModeClient.keepTiKVToImportMode();
Thread.sleep(6000);
switchTiKVModeClient.stopKeepTiKVToImportMode();
switchTiKVModeClient.switchTiKVToNormalMode();
}
}
}