TiSession support graceful close (#238)

Signed-off-by: marsishandsome <marsishandsome@gmail.com>
This commit is contained in:
Liangliang Gu 2021-07-28 14:15:57 +08:00 committed by GitHub
parent 7fe59efd4a
commit 257f3062c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 262 additions and 33 deletions

View File

@ -70,7 +70,7 @@ public class TiSession implements AutoCloseable {
private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
private boolean isClosed = false;
private volatile boolean isClosed = false;
private MetricsServer metricsServer;
private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;
@ -106,22 +106,30 @@ public class TiSession implements AutoCloseable {
}
public RawKVClient createRawClient() {
checkIsClosed();
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
return new RawKVClient(this, builder);
}
public KVClient createKVClient() {
checkIsClosed();
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
return new KVClient(conf, builder);
}
public TxnKVClient createTxnClient() {
checkIsClosed();
return new TxnKVClient(conf, this.getRegionStoreClientBuilder(), this.getPDClient());
}
public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder() {
checkIsClosed();
RegionStoreClient.RegionStoreClientBuilder res = clientBuilder;
if (res == null) {
synchronized (this) {
@ -137,6 +145,8 @@ public class TiSession implements AutoCloseable {
}
public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() {
checkIsClosed();
ImporterStoreClient.ImporterStoreClientBuilder res = importerClientBuilder;
if (res == null) {
synchronized (this) {
@ -156,18 +166,26 @@ public class TiSession implements AutoCloseable {
}
public TiTimestamp getTimestamp() {
checkIsClosed();
return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
}
public Snapshot createSnapshot() {
checkIsClosed();
return new Snapshot(getTimestamp(), this);
}
public Snapshot createSnapshot(TiTimestamp ts) {
checkIsClosed();
return new Snapshot(ts, this);
}
public PDClient getPDClient() {
checkIsClosed();
PDClient res = client;
if (res == null) {
synchronized (this) {
@ -181,6 +199,8 @@ public class TiSession implements AutoCloseable {
}
public Catalog getCatalog() {
checkIsClosed();
Catalog res = catalog;
if (res == null) {
synchronized (this) {
@ -194,6 +214,8 @@ public class TiSession implements AutoCloseable {
}
public RegionManager getRegionManager() {
checkIsClosed();
RegionManager res = regionManager;
if (res == null) {
synchronized (this) {
@ -207,6 +229,8 @@ public class TiSession implements AutoCloseable {
}
public ExecutorService getThreadPoolForIndexScan() {
checkIsClosed();
ExecutorService res = indexScanThreadPool;
if (res == null) {
synchronized (this) {
@ -226,6 +250,8 @@ public class TiSession implements AutoCloseable {
}
public ExecutorService getThreadPoolForTableScan() {
checkIsClosed();
ExecutorService res = tableScanThreadPool;
if (res == null) {
synchronized (this) {
@ -242,6 +268,8 @@ public class TiSession implements AutoCloseable {
}
public ExecutorService getThreadPoolForBatchPut() {
checkIsClosed();
ExecutorService res = batchPutThreadPool;
if (res == null) {
synchronized (this) {
@ -261,6 +289,8 @@ public class TiSession implements AutoCloseable {
}
public ExecutorService getThreadPoolForBatchGet() {
checkIsClosed();
ExecutorService res = batchGetThreadPool;
if (res == null) {
synchronized (this) {
@ -280,6 +310,8 @@ public class TiSession implements AutoCloseable {
}
public ExecutorService getThreadPoolForBatchDelete() {
checkIsClosed();
ExecutorService res = batchDeleteThreadPool;
if (res == null) {
synchronized (this) {
@ -299,6 +331,8 @@ public class TiSession implements AutoCloseable {
}
public ExecutorService getThreadPoolForBatchScan() {
checkIsClosed();
ExecutorService res = batchScanThreadPool;
if (res == null) {
synchronized (this) {
@ -318,6 +352,8 @@ public class TiSession implements AutoCloseable {
}
public ExecutorService getThreadPoolForDeleteRange() {
checkIsClosed();
ExecutorService res = deleteRangeThreadPool;
if (res == null) {
synchronized (this) {
@ -338,6 +374,8 @@ public class TiSession implements AutoCloseable {
@VisibleForTesting
public ChannelFactory getChannelFactory() {
checkIsClosed();
return channelFactory;
}
@ -347,6 +385,8 @@ public class TiSession implements AutoCloseable {
* @return a SwitchTiKVModeClient
*/
public SwitchTiKVModeClient getSwitchTiKVModeClient() {
checkIsClosed();
return new SwitchTiKVModeClient(getPDClient(), getImporterRegionStoreClientBuilder());
}
@ -363,6 +403,8 @@ public class TiSession implements AutoCloseable {
int splitRegionBackoffMS,
int scatterRegionBackoffMS,
int scatterWaitMS) {
checkIsClosed();
logger.info(String.format("split key's size is %d", splitKeys.size()));
long startMS = System.currentTimeMillis();
@ -412,6 +454,8 @@ public class TiSession implements AutoCloseable {
* @param splitKeys
*/
public void splitRegionAndScatter(List<byte[]> splitKeys) {
checkIsClosed();
int splitRegionBackoffMS = BackOffer.SPLIT_REGION_BACKOFF;
int scatterRegionBackoffMS = BackOffer.SCATTER_REGION_BACKOFF;
int scatterWaitMS = conf.getScatterWaitSeconds() * 1000;
@ -475,50 +519,111 @@ public class TiSession implements AutoCloseable {
return regions;
}
private void checkIsClosed() {
if (isClosed) {
throw new RuntimeException("this TiSession is closed!");
}
}
public synchronized void closeAwaitTermination(long timeoutMS) throws Exception {
shutdown(false);
long startMS = System.currentTimeMillis();
while (true) {
if (isTerminatedExecutorServices()) {
cleanAfterTerminated();
return;
}
if (System.currentTimeMillis() - startMS > timeoutMS) {
shutdown(true);
return;
}
Thread.sleep(500);
}
}
@Override
public synchronized void close() throws Exception {
if (isClosed) {
logger.warn("this TiSession is already closed!");
return;
shutdown(true);
}
private synchronized void shutdown(boolean now) throws Exception {
if (!isClosed) {
isClosed = true;
synchronized (sessionCachedMap) {
sessionCachedMap.remove(conf.getPdAddrsString());
}
if (metricsServer != null) {
metricsServer.close();
}
}
if (metricsServer != null) {
metricsServer.close();
if (now) {
shutdownNowExecutorServices();
cleanAfterTerminated();
} else {
shutdownExecutorServices();
}
}
isClosed = true;
synchronized (sessionCachedMap) {
sessionCachedMap.remove(conf.getPdAddrsString());
}
private synchronized void cleanAfterTerminated() throws InterruptedException {
if (regionManager != null) {
regionManager.close();
}
if (tableScanThreadPool != null) {
tableScanThreadPool.shutdownNow();
}
if (indexScanThreadPool != null) {
indexScanThreadPool.shutdownNow();
}
if (batchGetThreadPool != null) {
batchGetThreadPool.shutdownNow();
}
if (batchPutThreadPool != null) {
batchPutThreadPool.shutdownNow();
}
if (batchDeleteThreadPool != null) {
batchDeleteThreadPool.shutdownNow();
}
if (batchScanThreadPool != null) {
batchScanThreadPool.shutdownNow();
}
if (deleteRangeThreadPool != null) {
deleteRangeThreadPool.shutdownNow();
}
if (client != null) {
getPDClient().close();
client.close();
}
if (catalog != null) {
getCatalog().close();
catalog.close();
}
}
private List<ExecutorService> getExecutorServices() {
List<ExecutorService> executorServiceList = new ArrayList<>();
if (tableScanThreadPool != null) {
executorServiceList.add(tableScanThreadPool);
}
if (indexScanThreadPool != null) {
executorServiceList.add(indexScanThreadPool);
}
if (batchGetThreadPool != null) {
executorServiceList.add(batchGetThreadPool);
}
if (batchPutThreadPool != null) {
executorServiceList.add(batchPutThreadPool);
}
if (batchDeleteThreadPool != null) {
executorServiceList.add(batchDeleteThreadPool);
}
if (batchScanThreadPool != null) {
executorServiceList.add(batchScanThreadPool);
}
if (deleteRangeThreadPool != null) {
executorServiceList.add(deleteRangeThreadPool);
}
return executorServiceList;
}
private void shutdownExecutorServices() {
for (ExecutorService executorService : getExecutorServices()) {
executorService.shutdown();
}
}
private void shutdownNowExecutorServices() {
for (ExecutorService executorService : getExecutorServices()) {
executorService.shutdownNow();
}
}
private boolean isTerminatedExecutorServices() {
for (ExecutorService executorService : getExecutorServices()) {
if (!executorService.isTerminated()) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,124 @@
package org.tikv.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;
import org.tikv.common.region.TiRegion;
import org.tikv.raw.RawKVClient;
public class TiSessionTest {
private TiSession session;
@After
public void tearDown() throws Exception {
if (session != null) {
session.close();
}
}
@Test
public void closeWithRunningTaskTest() throws Exception {
doCloseWithRunningTaskTest(true, 0);
}
@Test
public void closeAwaitTerminationWithRunningTaskTest() throws Exception {
doCloseWithRunningTaskTest(false, 10000);
}
private void doCloseWithRunningTaskTest(boolean now, long timeoutMS) throws Exception {
TiConfiguration conf = TiConfiguration.createRawDefault();
session = TiSession.create(conf);
ExecutorService executorService = session.getThreadPoolForBatchGet();
AtomicReference<InterruptedException> interruptedException = new AtomicReference<>();
executorService.submit(
() -> {
int i = 1;
while (true) {
i = i + 1;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
interruptedException.set(e);
break;
}
}
});
Thread.sleep(2000);
long startMS = System.currentTimeMillis();
if (now) {
session.close();
Thread.sleep(1000);
assertNotNull(interruptedException.get());
assertTrue(System.currentTimeMillis() - startMS < 2000);
} else {
session.closeAwaitTermination(timeoutMS);
assertNotNull(interruptedException.get());
assertTrue(System.currentTimeMillis() - startMS >= timeoutMS);
}
}
@Test
public void closeTest() throws Exception {
doCloseTest(true, 0);
}
@Test
public void closeAwaitTerminationTest() throws Exception {
doCloseTest(false, 10000);
}
private void doCloseTest(boolean now, long timeoutMS) throws Exception {
TiConfiguration conf = TiConfiguration.createRawDefault();
session = TiSession.create(conf);
RawKVClient client = session.createRawClient();
// test getRegionByKey
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");
TiRegion region = session.getRegionManager().getRegionByKey(key);
assertNotNull(region);
// test RawKVClient
client.put(key, value);
List<ByteString> keys = new ArrayList<>();
keys.add(key);
client.batchGet(keys);
// close TiSession
if (now) {
session.close();
} else {
session.closeAwaitTermination(timeoutMS);
}
// test getRegionByKey
try {
session.getRegionManager().getRegionByKey(key);
fail();
} catch (RuntimeException e) {
assertEquals("this TiSession is closed!", e.getMessage());
}
// test RawKVClient
try {
client.batchGet(keys);
fail();
} catch (RejectedExecutionException e) {
assertTrue(e.getMessage().contains("rejected from java.util.concurrent.ThreadPoolExecutor"));
}
}
}