mirror of https://github.com/tikv/client-java.git
				
				
				
			Signed-off-by: marsishandsome <marsishandsome@gmail.com>
This commit is contained in:
		
							parent
							
								
									b8c6b4b740
								
							
						
					
					
						commit
						7ce6c4d1bc
					
				| 
						 | 
				
			
			@ -67,7 +67,7 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  private volatile RegionManager regionManager;
 | 
			
		||||
  private volatile boolean enableGrpcForward;
 | 
			
		||||
  private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
 | 
			
		||||
  private boolean isClosed = false;
 | 
			
		||||
  private volatile boolean isClosed = false;
 | 
			
		||||
  private MetricsServer metricsServer;
 | 
			
		||||
 | 
			
		||||
  public TiSession(TiConfiguration conf) {
 | 
			
		||||
| 
						 | 
				
			
			@ -102,22 +102,30 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public RawKVClient createRawClient() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    RegionStoreClientBuilder builder =
 | 
			
		||||
        new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
 | 
			
		||||
    return new RawKVClient(this, builder);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public KVClient createKVClient() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    RegionStoreClientBuilder builder =
 | 
			
		||||
        new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
 | 
			
		||||
    return new KVClient(conf, builder);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public TxnKVClient createTxnClient() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    return new TxnKVClient(conf, this.getRegionStoreClientBuilder(), this.getPDClient());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    RegionStoreClient.RegionStoreClientBuilder res = clientBuilder;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -137,18 +145,26 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public TiTimestamp getTimestamp() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public Snapshot createSnapshot() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    return new Snapshot(getTimestamp(), this);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public Snapshot createSnapshot(TiTimestamp ts) {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    return new Snapshot(ts, this);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public PDClient getPDClient() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    PDClient res = client;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -162,6 +178,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public Catalog getCatalog() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    Catalog res = catalog;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -175,6 +193,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public RegionManager getRegionManager() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    RegionManager res = regionManager;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -188,6 +208,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public ExecutorService getThreadPoolForIndexScan() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    ExecutorService res = indexScanThreadPool;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -207,6 +229,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public ExecutorService getThreadPoolForTableScan() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    ExecutorService res = tableScanThreadPool;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -223,6 +247,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public ExecutorService getThreadPoolForBatchPut() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    ExecutorService res = batchPutThreadPool;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -242,6 +268,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public ExecutorService getThreadPoolForBatchGet() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    ExecutorService res = batchGetThreadPool;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -261,6 +289,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public ExecutorService getThreadPoolForBatchDelete() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    ExecutorService res = batchDeleteThreadPool;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -280,6 +310,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public ExecutorService getThreadPoolForBatchScan() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    ExecutorService res = batchScanThreadPool;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -299,6 +331,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  public ExecutorService getThreadPoolForDeleteRange() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    ExecutorService res = deleteRangeThreadPool;
 | 
			
		||||
    if (res == null) {
 | 
			
		||||
      synchronized (this) {
 | 
			
		||||
| 
						 | 
				
			
			@ -319,6 +353,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
 | 
			
		||||
  @VisibleForTesting
 | 
			
		||||
  public ChannelFactory getChannelFactory() {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    return channelFactory;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -332,6 +368,8 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
      int splitRegionBackoffMS,
 | 
			
		||||
      int scatterRegionBackoffMS,
 | 
			
		||||
      int scatterWaitMS) {
 | 
			
		||||
    checkIsClosed();
 | 
			
		||||
 | 
			
		||||
    logger.info(String.format("split key's size is %d", splitKeys.size()));
 | 
			
		||||
    long startMS = System.currentTimeMillis();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -374,7 +412,7 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
    long endMS = System.currentTimeMillis();
 | 
			
		||||
    logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  
 | 
			
		||||
  private List<Metapb.Region> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) {
 | 
			
		||||
    List<Metapb.Region> regions = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -417,50 +455,111 @@ public class TiSession implements AutoCloseable {
 | 
			
		|||
    return regions;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void checkIsClosed() {
 | 
			
		||||
    if (isClosed) {
 | 
			
		||||
      throw new RuntimeException("this TiSession is closed!");
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public synchronized void closeAwaitTermination(long timeoutMS) throws Exception {
 | 
			
		||||
    shutdown(false);
 | 
			
		||||
 | 
			
		||||
    long startMS = System.currentTimeMillis();
 | 
			
		||||
    while (true) {
 | 
			
		||||
      if (isTerminatedExecutorServices()) {
 | 
			
		||||
        cleanAfterTerminated();
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (System.currentTimeMillis() - startMS > timeoutMS) {
 | 
			
		||||
        shutdown(true);
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
      Thread.sleep(500);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  public synchronized void close() throws Exception {
 | 
			
		||||
    if (isClosed) {
 | 
			
		||||
      logger.warn("this TiSession is already closed!");
 | 
			
		||||
      return;
 | 
			
		||||
    shutdown(true);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private synchronized void shutdown(boolean now) throws Exception {
 | 
			
		||||
    if (!isClosed) {
 | 
			
		||||
      isClosed = true;
 | 
			
		||||
      synchronized (sessionCachedMap) {
 | 
			
		||||
        sessionCachedMap.remove(conf.getPdAddrsString());
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (metricsServer != null) {
 | 
			
		||||
        metricsServer.close();
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (metricsServer != null) {
 | 
			
		||||
      metricsServer.close();
 | 
			
		||||
    if (now) {
 | 
			
		||||
      shutdownNowExecutorServices();
 | 
			
		||||
      cleanAfterTerminated();
 | 
			
		||||
    } else {
 | 
			
		||||
      shutdownExecutorServices();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
    isClosed = true;
 | 
			
		||||
    synchronized (sessionCachedMap) {
 | 
			
		||||
      sessionCachedMap.remove(conf.getPdAddrsString());
 | 
			
		||||
    }
 | 
			
		||||
  private synchronized void cleanAfterTerminated() throws InterruptedException {
 | 
			
		||||
    if (regionManager != null) {
 | 
			
		||||
      regionManager.close();
 | 
			
		||||
    }
 | 
			
		||||
    if (tableScanThreadPool != null) {
 | 
			
		||||
      tableScanThreadPool.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
    if (indexScanThreadPool != null) {
 | 
			
		||||
      indexScanThreadPool.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
    if (batchGetThreadPool != null) {
 | 
			
		||||
      batchGetThreadPool.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
    if (batchPutThreadPool != null) {
 | 
			
		||||
      batchPutThreadPool.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
    if (batchDeleteThreadPool != null) {
 | 
			
		||||
      batchDeleteThreadPool.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
    if (batchScanThreadPool != null) {
 | 
			
		||||
      batchScanThreadPool.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
    if (deleteRangeThreadPool != null) {
 | 
			
		||||
      deleteRangeThreadPool.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
    if (client != null) {
 | 
			
		||||
      getPDClient().close();
 | 
			
		||||
      client.close();
 | 
			
		||||
    }
 | 
			
		||||
    if (catalog != null) {
 | 
			
		||||
      getCatalog().close();
 | 
			
		||||
      catalog.close();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private List<ExecutorService> getExecutorServices() {
 | 
			
		||||
    List<ExecutorService> executorServiceList = new ArrayList<>();
 | 
			
		||||
    if (tableScanThreadPool != null) {
 | 
			
		||||
      executorServiceList.add(tableScanThreadPool);
 | 
			
		||||
    }
 | 
			
		||||
    if (indexScanThreadPool != null) {
 | 
			
		||||
      executorServiceList.add(indexScanThreadPool);
 | 
			
		||||
    }
 | 
			
		||||
    if (batchGetThreadPool != null) {
 | 
			
		||||
      executorServiceList.add(batchGetThreadPool);
 | 
			
		||||
    }
 | 
			
		||||
    if (batchPutThreadPool != null) {
 | 
			
		||||
      executorServiceList.add(batchPutThreadPool);
 | 
			
		||||
    }
 | 
			
		||||
    if (batchDeleteThreadPool != null) {
 | 
			
		||||
      executorServiceList.add(batchDeleteThreadPool);
 | 
			
		||||
    }
 | 
			
		||||
    if (batchScanThreadPool != null) {
 | 
			
		||||
      executorServiceList.add(batchScanThreadPool);
 | 
			
		||||
    }
 | 
			
		||||
    if (deleteRangeThreadPool != null) {
 | 
			
		||||
      executorServiceList.add(deleteRangeThreadPool);
 | 
			
		||||
    }
 | 
			
		||||
    return executorServiceList;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void shutdownExecutorServices() {
 | 
			
		||||
    for (ExecutorService executorService : getExecutorServices()) {
 | 
			
		||||
      executorService.shutdown();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void shutdownNowExecutorServices() {
 | 
			
		||||
    for (ExecutorService executorService : getExecutorServices()) {
 | 
			
		||||
      executorService.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private boolean isTerminatedExecutorServices() {
 | 
			
		||||
    for (ExecutorService executorService : getExecutorServices()) {
 | 
			
		||||
      if (!executorService.isTerminated()) {
 | 
			
		||||
        return false;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    return true;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,124 @@
 | 
			
		|||
package org.tikv.common;
 | 
			
		||||
 | 
			
		||||
import static org.junit.Assert.assertEquals;
 | 
			
		||||
import static org.junit.Assert.assertNotNull;
 | 
			
		||||
import static org.junit.Assert.assertTrue;
 | 
			
		||||
import static org.junit.Assert.fail;
 | 
			
		||||
 | 
			
		||||
import com.google.protobuf.ByteString;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.RejectedExecutionException;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicReference;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.tikv.common.region.TiRegion;
 | 
			
		||||
import org.tikv.raw.RawKVClient;
 | 
			
		||||
 | 
			
		||||
public class TiSessionTest {
 | 
			
		||||
  private TiSession session;
 | 
			
		||||
 | 
			
		||||
  @After
 | 
			
		||||
  public void tearDown() throws Exception {
 | 
			
		||||
    if (session != null) {
 | 
			
		||||
      session.close();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void closeWithRunningTaskTest() throws Exception {
 | 
			
		||||
    doCloseWithRunningTaskTest(true, 0);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void closeAwaitTerminationWithRunningTaskTest() throws Exception {
 | 
			
		||||
    doCloseWithRunningTaskTest(false, 10000);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void doCloseWithRunningTaskTest(boolean now, long timeoutMS) throws Exception {
 | 
			
		||||
    TiConfiguration conf = TiConfiguration.createRawDefault();
 | 
			
		||||
    session = TiSession.create(conf);
 | 
			
		||||
 | 
			
		||||
    ExecutorService executorService = session.getThreadPoolForBatchGet();
 | 
			
		||||
    AtomicReference<InterruptedException> interruptedException = new AtomicReference<>();
 | 
			
		||||
    executorService.submit(
 | 
			
		||||
        () -> {
 | 
			
		||||
          int i = 1;
 | 
			
		||||
          while (true) {
 | 
			
		||||
            i = i + 1;
 | 
			
		||||
            try {
 | 
			
		||||
              Thread.sleep(1000);
 | 
			
		||||
            } catch (InterruptedException e) {
 | 
			
		||||
              interruptedException.set(e);
 | 
			
		||||
              break;
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
    Thread.sleep(2000);
 | 
			
		||||
 | 
			
		||||
    long startMS = System.currentTimeMillis();
 | 
			
		||||
    if (now) {
 | 
			
		||||
      session.close();
 | 
			
		||||
      Thread.sleep(1000);
 | 
			
		||||
      assertNotNull(interruptedException.get());
 | 
			
		||||
      assertTrue(System.currentTimeMillis() - startMS < 2000);
 | 
			
		||||
    } else {
 | 
			
		||||
      session.closeAwaitTermination(timeoutMS);
 | 
			
		||||
      assertNotNull(interruptedException.get());
 | 
			
		||||
      assertTrue(System.currentTimeMillis() - startMS >= timeoutMS);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void closeTest() throws Exception {
 | 
			
		||||
    doCloseTest(true, 0);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void closeAwaitTerminationTest() throws Exception {
 | 
			
		||||
    doCloseTest(false, 10000);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void doCloseTest(boolean now, long timeoutMS) throws Exception {
 | 
			
		||||
    TiConfiguration conf = TiConfiguration.createRawDefault();
 | 
			
		||||
    session = TiSession.create(conf);
 | 
			
		||||
    RawKVClient client = session.createRawClient();
 | 
			
		||||
 | 
			
		||||
    // test getRegionByKey
 | 
			
		||||
    ByteString key = ByteString.copyFromUtf8("key");
 | 
			
		||||
    ByteString value = ByteString.copyFromUtf8("value");
 | 
			
		||||
    TiRegion region = session.getRegionManager().getRegionByKey(key);
 | 
			
		||||
    assertNotNull(region);
 | 
			
		||||
 | 
			
		||||
    // test RawKVClient
 | 
			
		||||
    client.put(key, value);
 | 
			
		||||
    List<ByteString> keys = new ArrayList<>();
 | 
			
		||||
    keys.add(key);
 | 
			
		||||
    client.batchGet(keys);
 | 
			
		||||
 | 
			
		||||
    // close TiSession
 | 
			
		||||
    if (now) {
 | 
			
		||||
      session.close();
 | 
			
		||||
    } else {
 | 
			
		||||
      session.closeAwaitTermination(timeoutMS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // test getRegionByKey
 | 
			
		||||
    try {
 | 
			
		||||
      session.getRegionManager().getRegionByKey(key);
 | 
			
		||||
      fail();
 | 
			
		||||
    } catch (RuntimeException e) {
 | 
			
		||||
      assertEquals("this TiSession is closed!", e.getMessage());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // test RawKVClient
 | 
			
		||||
    try {
 | 
			
		||||
      client.batchGet(keys);
 | 
			
		||||
      fail();
 | 
			
		||||
    } catch (RejectedExecutionException e) {
 | 
			
		||||
      assertTrue(e.getMessage().contains("rejected from java.util.concurrent.ThreadPoolExecutor"));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue