diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 507db0228b..ff2d866455 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -28,7 +28,7 @@ public class ConfigUtils { public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size"; public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size"; public static final String TIKV_GRPC_IDLE_TIMEOUT = "tikv.grpc.idle_timeout"; - + public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size"; public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency"; public static final String TIKV_TABLE_SCAN_CONCURRENCY = "tikv.table.scan_concurrency"; diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 52960ce685..0cc4b3329d 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -42,6 +42,8 @@ import org.tikv.common.util.*; import org.tikv.kvproto.Metapb; import org.tikv.raw.RawKVClient; import org.tikv.raw.SmartRawKVClient; +import org.tikv.service.failsafe.CircuitBreaker; +import org.tikv.service.failsafe.CircuitBreakerImpl; import org.tikv.txn.KVClient; import org.tikv.txn.TxnKVClient; @@ -70,6 +72,7 @@ public class TiSession implements AutoCloseable { private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private volatile boolean isClosed = false; private MetricsServer metricsServer; + private final CircuitBreaker circuitBreaker; public TiSession(TiConfiguration conf) { // may throw org.tikv.common.MetricsServer - http server not up @@ -84,6 +87,7 @@ public class TiSession implements AutoCloseable { logger.info("enable grpc forward for high available"); } warmUp(); + this.circuitBreaker = new CircuitBreakerImpl(conf); logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } @@ -150,7 +154,7 @@ public class TiSession implements AutoCloseable { public SmartRawKVClient createSmartRawClient() { RawKVClient rawKVClient = createRawClient(); - return new SmartRawKVClient(rawKVClient, getConf()); + return new SmartRawKVClient(rawKVClient, circuitBreaker); } public KVClient createKVClient() { diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index a9c4f4aa4c..02e75625fc 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -22,12 +22,10 @@ import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration; import org.tikv.common.exception.CircuitBreakerOpenException; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; import org.tikv.service.failsafe.CircuitBreaker; -import org.tikv.service.failsafe.CircuitBreakerImpl; public class SmartRawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); @@ -63,9 +61,9 @@ public class SmartRawKVClient implements RawKVClientBase { private final RawKVClientBase client; private final CircuitBreaker circuitBreaker; - public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) { + public SmartRawKVClient(RawKVClientBase client, CircuitBreaker breaker) { this.client = client; - this.circuitBreaker = new CircuitBreakerImpl(conf); + this.circuitBreaker = breaker; } @Override diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java index b4c0bc5588..6249f84e95 100644 --- a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -72,6 +72,13 @@ public class SmartRawKVClientTest { } } + @Test + public void testMultiClients() { + for (int i = 0; i < 10240; i++) { + client = session.createSmartRawClient(); + } + } + private void success() { client.get(ByteString.copyFromUtf8("key")); }