cherry pick #358 to release-3.1 (#373)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: marsishandsome <marsishandsome@gmail.com>

Co-authored-by: Liangliang Gu <marsishandsome@gmail.com>
This commit is contained in:
ti-srebot 2021-12-10 02:26:12 +08:00 committed by GitHub
parent a6e991ae98
commit 450965a515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1441 additions and 180 deletions

View File

@ -75,6 +75,19 @@ public class ConfigUtils {
"tikv.rawkv.batch_write_slowlog_in_ms";
public static final String TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "tikv.rawkv.scan_slowlog_in_ms";
public static final String TiKV_CIRCUIT_BREAK_ENABLE = "tikv.circuit_break.enable";
public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS =
"tikv.circuit_break.trigger.availability.window_in_seconds";
public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE =
"tikv.circuit_break.trigger.availability.error_threshold_percentage";
public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD =
"tikv.circuit_break.trigger.availability.request_volumn_threshold";
public static final String TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS =
"tikv.circuit_break.trigger.sleep_window_in_seconds";
public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT =
"tikv.circuit_break.trigger.attempt_request_count";
public static final String TIFLASH_ENABLE = "tiflash.enable";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
@ -132,4 +145,11 @@ public class ConfigUtils {
public static final String LEADER = "LEADER";
public static final String FOLLOWER = "FOLLOWER";
public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER";
public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = 100;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10;
public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20;
public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10;
}

View File

@ -120,6 +120,20 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS);
setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
setIfMissing(TiKV_CIRCUIT_BREAK_ENABLE, DEF_TiKV_CIRCUIT_BREAK_ENABLE);
setIfMissing(
TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS,
DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS);
setIfMissing(
TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE,
DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE);
setIfMissing(
TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD,
DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD);
setIfMissing(
TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
setIfMissing(
TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
}
public static void listAll() {
@ -328,6 +342,16 @@ public class TiConfiguration implements Serializable {
getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS);
private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
private boolean circuitBreakEnable = getBoolean(TiKV_CIRCUIT_BREAK_ENABLE);
private int circuitBreakAvailabilityWindowInSeconds =
getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS);
private int circuitBreakAvailabilityErrorThresholdPercentage =
getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE);
private int circuitBreakAvailabilityRequestVolumnThreshold =
getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD);
private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
public enum KVMode {
TXN,
RAW
@ -729,4 +753,57 @@ public class TiConfiguration implements Serializable {
public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) {
this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS;
}
public boolean isCircuitBreakEnable() {
return circuitBreakEnable;
}
public void setCircuitBreakEnable(boolean circuitBreakEnable) {
this.circuitBreakEnable = circuitBreakEnable;
}
public int getCircuitBreakAvailabilityWindowInSeconds() {
return circuitBreakAvailabilityWindowInSeconds;
}
public void setCircuitBreakAvailabilityWindowInSeconds(
int circuitBreakAvailabilityWindowInSeconds) {
this.circuitBreakAvailabilityWindowInSeconds = circuitBreakAvailabilityWindowInSeconds;
}
public int getCircuitBreakAvailabilityErrorThresholdPercentage() {
return circuitBreakAvailabilityErrorThresholdPercentage;
}
public void setCircuitBreakAvailabilityErrorThresholdPercentage(
int circuitBreakAvailabilityErrorThresholdPercentage) {
this.circuitBreakAvailabilityErrorThresholdPercentage =
circuitBreakAvailabilityErrorThresholdPercentage;
}
public int getCircuitBreakAvailabilityRequestVolumnThreshold() {
return circuitBreakAvailabilityRequestVolumnThreshold;
}
public void setCircuitBreakAvailabilityRequestVolumnThreshold(
int circuitBreakAvailabilityRequestVolumnThreshold) {
this.circuitBreakAvailabilityRequestVolumnThreshold =
circuitBreakAvailabilityRequestVolumnThreshold;
}
public int getCircuitBreakSleepWindowInSeconds() {
return circuitBreakSleepWindowInSeconds;
}
public void setCircuitBreakSleepWindowInSeconds(int circuitBreakSleepWindowInSeconds) {
this.circuitBreakSleepWindowInSeconds = circuitBreakSleepWindowInSeconds;
}
public int getCircuitBreakAttemptRequestCount() {
return circuitBreakAttemptRequestCount;
}
public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) {
this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount;
}
}

View File

@ -41,6 +41,7 @@ import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.raw.SmartRawKVClient;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
@ -112,6 +113,11 @@ public class TiSession implements AutoCloseable {
return new RawKVClient(this, builder);
}
public SmartRawKVClient createSmartRawClient() {
RawKVClient rawKVClient = createRawClient();
return new SmartRawKVClient(rawKVClient, getConf());
}
public KVClient createKVClient() {
checkIsClosed();

View File

@ -0,0 +1,22 @@
/*
* Copyright 2018 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.exception;
public class CircuitBreakerOpenException extends RuntimeException {
public CircuitBreakerOpenException() {
super("Circuit Breaker Opened");
}
}

View File

@ -40,7 +40,7 @@ import org.tikv.common.region.TiRegion;
import org.tikv.common.util.*;
import org.tikv.kvproto.Kvrpcpb.KvPair;
public class RawKVClient implements AutoCloseable {
public class RawKVClient implements RawKVClientBase {
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorService batchGetThreadPool;
@ -50,15 +50,6 @@ public class RawKVClient implements AutoCloseable {
private final ExecutorService deleteRangeThreadPool;
private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);
// https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go
private static final int MAX_RAW_SCAN_LIMIT = 10240;
private static final int MAX_RAW_BATCH_LIMIT = 1024;
private static final int RAW_BATCH_PUT_SIZE = 1024 * 1024; // 1 MB
private static final int RAW_BATCH_GET_SIZE = 16 * 1024; // 16 K
private static final int RAW_BATCH_DELETE_SIZE = 16 * 1024; // 16 K
private static final int RAW_BATCH_SCAN_SIZE = 16;
private static final int RAW_BATCH_PAIR_COUNT = 512;
public static final Histogram RAW_REQUEST_LATENCY =
Histogram.build()
.name("client_java_raw_requests_latency")
@ -98,34 +89,17 @@ public class RawKVClient implements AutoCloseable {
@Override
public void close() {}
/**
* Put a raw key-value pair to TiKV
*
* @param key raw key
* @param value raw value
*/
@Override
public void put(ByteString key, ByteString value) {
put(key, value, 0);
}
/**
* Put a raw key-value pair to TiKV
*
* @param key raw key
* @param value raw value
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
*/
@Override
public void put(ByteString key, ByteString value, long ttl) {
put(key, value, ttl, false);
}
/**
* Put a raw key-value pair to TiKV. This API is atomic.
*
* @param key raw key
* @param value raw value
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
*/
@Override
public void putAtomic(ByteString key, ByteString value, long ttl) {
put(key, value, ttl, true);
}
@ -166,27 +140,12 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Put a key-value pair if it does not exist. This API is atomic.
*
* @param key key
* @param value value
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
*/
@Override
public ByteString putIfAbsent(ByteString key, ByteString value) {
return putIfAbsent(key, value, 0L);
}
/**
* Put a key-value pair with TTL if it does not exist. This API is atomic.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
*/
@Override
public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put_if_absent";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -223,40 +182,22 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
*
* @param kvPairs kvPairs
*/
@Override
public void batchPut(Map<ByteString, ByteString> kvPairs) {
batchPut(kvPairs, 0);
}
/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
@Override
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, false);
}
/**
* Put a set of raw key-value pair to TiKV, this API is atomic
*
* @param kvPairs kvPairs
*/
@Override
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs) {
batchPutAtomic(kvPairs, 0);
}
/**
* Put a set of raw key-value pair to TiKV, this API is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
@Override
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, true);
}
@ -289,12 +230,7 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Get a raw key-value pair from TiKV if key exists
*
* @param key raw key
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
@Override
public ByteString get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -332,12 +268,7 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Get a list of raw key-value pair from TiKV if key exists
*
* @param keys list of raw key
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
@Override
public List<KvPair> batchGet(List<ByteString> keys) {
String label = "client_raw_batch_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -367,20 +298,12 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Delete a list of raw key-value pair from TiKV if key exists
*
* @param keys list of raw key
*/
@Override
public void batchDelete(List<ByteString> keys) {
batchDelete(keys, false);
}
/**
* Delete a list of raw key-value pair from TiKV if key exists, this API is atomic
*
* @param keys list of raw key
*/
@Override
public void batchDeleteAtomic(List<ByteString> keys) {
batchDelete(keys, true);
}
@ -414,13 +337,7 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Get the TTL of a raw key from TiKV if key exists
*
* @param key raw key
* @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key
* exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist
*/
@Override
public Long getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -457,6 +374,7 @@ public class RawKVClient implements AutoCloseable {
}
}
@Override
public List<List<KvPair>> batchScan(List<ScanOption> ranges) {
String label = "client_raw_batch_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -507,27 +425,12 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
@Override
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
return scan(startKey, endKey, limit, false);
}
/**
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @param keyOnly whether to scan in key-only mode
* @return list of key-value pairs in range
*/
@Override
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -562,48 +465,22 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Scan raw key-value pairs from TiKV in range [startKey, )
*
* @param startKey raw start key, inclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
@Override
public List<KvPair> scan(ByteString startKey, int limit) {
return scan(startKey, limit, false);
}
/**
* Scan raw key-value pairs from TiKV in range [startKey, )
*
* @param startKey raw start key, inclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @param keyOnly whether to scan in key-only mode
* @return list of key-value pairs in range
*/
@Override
public List<KvPair> scan(ByteString startKey, int limit, boolean keyOnly) {
return scan(startKey, ByteString.EMPTY, limit, keyOnly);
}
/**
* Scan all raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @return list of key-value pairs in range
*/
@Override
public List<KvPair> scan(ByteString startKey, ByteString endKey) {
return scan(startKey, endKey, false);
}
/**
* Scan all raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param keyOnly whether to scan in key-only mode
* @return list of key-value pairs in range
*/
@Override
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
String label = "client_raw_scan_without_limit";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -659,40 +536,27 @@ public class RawKVClient implements AutoCloseable {
return scan(startKey, endKey, limit, keyOnly);
}
/**
* Scan keys with prefix
*
* @param prefixKey prefix key
* @param limit limit of keys retrieved
* @param keyOnly whether to scan in keyOnly mode
* @return kvPairs with the specified prefix
*/
@Override
public List<KvPair> scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly);
}
@Override
public List<KvPair> scanPrefix(ByteString prefixKey) {
return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString());
}
@Override
public List<KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly);
}
/**
* Delete a raw key-value pair from TiKV if key exists
*
* @param key raw key to be deleted
*/
@Override
public void delete(ByteString key) {
delete(key, false);
}
/**
* Delete a raw key-value pair from TiKV if key exists. This API is atomic.
*
* @param key raw key to be deleted
*/
@Override
public void deleteAtomic(ByteString key) {
delete(key, true);
}
@ -734,15 +598,7 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Delete all raw key-value pairs in range [startKey, endKey) from TiKV
*
* <p>Cautious, this API cannot be used concurrently, if multiple clients write keys into this
* range along with deleteRange API, the result will be undefined.
*
* @param startKey raw start key to be deleted
* @param endKey raw start key to be deleted
*/
@Override
public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
String label = "client_raw_delete_range";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@ -761,14 +617,7 @@ public class RawKVClient implements AutoCloseable {
}
}
/**
* Delete all raw key-value pairs with the prefix `key` from TiKV
*
* <p>Cautious, this API cannot be used concurrently, if multiple clients write keys into this
* range along with deleteRange API, the result will be undefined.
*
* @param key prefix of keys to be deleted
*/
@Override
public synchronized void deletePrefix(ByteString key) {
ByteString endKey = Key.toRawKey(key).nextPrefix().toByteString();
deleteRange(key, endKey);

View File

@ -0,0 +1,257 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.raw;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb;
public interface RawKVClientBase extends AutoCloseable {
// https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go
int MAX_RAW_SCAN_LIMIT = 10240;
int MAX_RAW_BATCH_LIMIT = 1024;
int RAW_BATCH_PUT_SIZE = 1024 * 1024;
int RAW_BATCH_GET_SIZE = 16 * 1024;
int RAW_BATCH_DELETE_SIZE = 16 * 1024;
/**
* Put a raw key-value pair to TiKV
*
* @param key raw key
* @param value raw value
*/
void put(ByteString key, ByteString value);
/**
* Put a raw key-value pair to TiKV
*
* @param key raw key
* @param value raw value
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
*/
void put(ByteString key, ByteString value, long ttl);
/**
* Put a raw key-value pair to TiKV. This API is atomic.
*
* @param key raw key
* @param value raw value
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
*/
void putAtomic(ByteString key, ByteString value, long ttl);
/**
* Put a key-value pair if it does not exist. This API is atomic.
*
* @param key key
* @param value value
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
*/
ByteString putIfAbsent(ByteString key, ByteString value);
/**
* Put a key-value pair with TTL if it does not exist. This API is atomic.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
*/
ByteString putIfAbsent(ByteString key, ByteString value, long ttl);
/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
*
* @param kvPairs kvPairs
*/
void batchPut(Map<ByteString, ByteString> kvPairs);
/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
void batchPut(Map<ByteString, ByteString> kvPairs, long ttl);
/**
* Put a set of raw key-value pair to TiKV, this API is atomic
*
* @param kvPairs kvPairs
*/
void batchPutAtomic(Map<ByteString, ByteString> kvPairs);
/**
* Put a set of raw key-value pair to TiKV, this API is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
void batchPutAtomic(Map<ByteString, ByteString> kvPairs, long ttl);
/**
* Get a raw key-value pair from TiKV if key exists
*
* @param key raw key
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
ByteString get(ByteString key);
/**
* Get a list of raw key-value pair from TiKV if key exists
*
* @param keys list of raw key
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
List<Kvrpcpb.KvPair> batchGet(List<ByteString> keys);
/**
* Delete a list of raw key-value pair from TiKV if key exists
*
* @param keys list of raw key
*/
void batchDelete(List<ByteString> keys);
/**
* Delete a list of raw key-value pair from TiKV if key exists, this API is atomic
*
* @param keys list of raw key
*/
void batchDeleteAtomic(List<ByteString> keys);
/**
* Get the TTL of a raw key from TiKV if key exists
*
* @param key raw key
* @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key
* exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist
*/
Long getKeyTTL(ByteString key);
List<List<Kvrpcpb.KvPair>> batchScan(List<ScanOption> ranges);
/**
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit);
/**
* Scan raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @param keyOnly whether to scan in key-only mode
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly);
/**
* Scan raw key-value pairs from TiKV in range [startKey, )
*
* @param startKey raw start key, inclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit);
/**
* Scan raw key-value pairs from TiKV in range [startKey, )
*
* @param startKey raw start key, inclusive
* @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
* @param keyOnly whether to scan in key-only mode
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit, boolean keyOnly);
/**
* Scan all raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey);
/**
* Scan all raw key-value pairs from TiKV in range [startKey, endKey)
*
* @param startKey raw start key, inclusive
* @param endKey raw end key, exclusive
* @param keyOnly whether to scan in key-only mode
* @return list of key-value pairs in range
*/
List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly);
/**
* Scan keys with prefix
*
* @param prefixKey prefix key
* @param limit limit of keys retrieved
* @param keyOnly whether to scan in keyOnly mode
* @return kvPairs with the specified prefix
*/
List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, int limit, boolean keyOnly);
List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey);
List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly);
/**
* Delete a raw key-value pair from TiKV if key exists
*
* @param key raw key to be deleted
*/
void delete(ByteString key);
/**
* Delete a raw key-value pair from TiKV if key exists. This API is atomic.
*
* @param key raw key to be deleted
*/
void deleteAtomic(ByteString key);
/**
* Delete all raw key-value pairs in range [startKey, endKey) from TiKV
*
* <p>Cautious, this API cannot be used concurrently, if multiple clients write keys into this
* range along with deleteRange API, the result will be undefined.
*
* @param startKey raw start key to be deleted
* @param endKey raw start key to be deleted
*/
void deleteRange(ByteString startKey, ByteString endKey);
/**
* Delete all raw key-value pairs with the prefix `key` from TiKV
*
* <p>Cautious, this API cannot be used concurrently, if multiple clients write keys into this
* range along with deleteRange API, the result will be undefined.
*
* @param key prefix of keys to be deleted
*/
void deletePrefix(ByteString key);
}

View File

@ -0,0 +1,280 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.raw;
import com.google.protobuf.ByteString;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.CircuitBreakerOpenException;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.service.failsafe.CircuitBreaker;
import org.tikv.service.failsafe.CircuitBreakerImpl;
public class SmartRawKVClient implements RawKVClientBase {
private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class);
private static final Histogram REQUEST_LATENCY =
Histogram.build()
.name("client_java_smart_raw_requests_latency")
.help("client smart raw request latency.")
.labelNames("type")
.register();
private static final Counter REQUEST_SUCCESS =
Counter.build()
.name("client_java_smart_raw_requests_success")
.help("client smart raw request success.")
.labelNames("type")
.register();
private static final Counter REQUEST_FAILURE =
Counter.build()
.name("client_java_smart_raw_requests_failure")
.help("client smart raw request failure.")
.labelNames("type")
.register();
private static final Counter CIRCUIT_BREAKER_OPENED =
Counter.build()
.name("client_java_smart_raw_circuit_breaker_opened")
.help("client smart raw circuit breaker opened.")
.labelNames("type")
.register();
private final RawKVClientBase client;
private final CircuitBreaker circuitBreaker;
public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) {
this.client = client;
this.circuitBreaker = new CircuitBreakerImpl(conf);
}
@Override
public void close() throws Exception {
circuitBreaker.close();
client.close();
}
@Override
public void put(ByteString key, ByteString value) {
callWithCircuitBreaker("put", () -> client.put(key, value));
}
@Override
public void put(ByteString key, ByteString value, long ttl) {
callWithCircuitBreaker("put", () -> client.put(key, value, ttl));
}
@Override
public void putAtomic(ByteString key, ByteString value, long ttl) {
callWithCircuitBreaker("putAtomic", () -> client.putAtomic(key, value, ttl));
}
@Override
public ByteString putIfAbsent(ByteString key, ByteString value) {
return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value));
}
@Override
public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value, ttl));
}
@Override
public void batchPut(Map<ByteString, ByteString> kvPairs) {
callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs));
}
@Override
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs, ttl));
}
@Override
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs) {
callWithCircuitBreaker("batchPutAtomic", () -> client.batchPutAtomic(kvPairs));
}
@Override
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs, long ttl) {
callWithCircuitBreaker("batchPutAtomic", () -> client.batchPutAtomic(kvPairs, ttl));
}
@Override
public ByteString get(ByteString key) {
return callWithCircuitBreaker("get", () -> client.get(key));
}
@Override
public List<Kvrpcpb.KvPair> batchGet(List<ByteString> keys) {
return callWithCircuitBreaker("batchGet", () -> client.batchGet(keys));
}
@Override
public void batchDelete(List<ByteString> keys) {
callWithCircuitBreaker("batchDelete", () -> client.batchDelete(keys));
}
@Override
public void batchDeleteAtomic(List<ByteString> keys) {
callWithCircuitBreaker("batchDeleteAtomic", () -> client.batchDeleteAtomic(keys));
}
@Override
public Long getKeyTTL(ByteString key) {
return callWithCircuitBreaker("getKeyTTL", () -> client.getKeyTTL(key));
}
@Override
public List<List<Kvrpcpb.KvPair>> batchScan(List<ScanOption> ranges) {
return callWithCircuitBreaker("batchScan", () -> client.batchScan(ranges));
}
@Override
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit));
}
@Override
public List<Kvrpcpb.KvPair> scan(
ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit, keyOnly));
}
@Override
public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit) {
return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit));
}
@Override
public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit, boolean keyOnly) {
return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit, keyOnly));
}
@Override
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey) {
return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey));
}
@Override
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, keyOnly));
}
@Override
public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, limit, keyOnly));
}
@Override
public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey) {
return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey));
}
@Override
public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, keyOnly));
}
@Override
public void delete(ByteString key) {
callWithCircuitBreaker("delete", () -> client.delete(key));
}
@Override
public void deleteAtomic(ByteString key) {
callWithCircuitBreaker("deleteAtomic", () -> client.deleteAtomic(key));
}
@Override
public void deleteRange(ByteString startKey, ByteString endKey) {
callWithCircuitBreaker("deleteRange", () -> client.deleteRange(startKey, endKey));
}
@Override
public void deletePrefix(ByteString key) {
callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key));
}
<T> T callWithCircuitBreaker(String funcName, Function1<T> func) {
Histogram.Timer requestTimer = REQUEST_LATENCY.labels(funcName).startTimer();
try {
T result = callWithCircuitBreaker0(funcName, func);
REQUEST_SUCCESS.labels(funcName).inc();
return result;
} catch (Exception e) {
REQUEST_FAILURE.labels(funcName).inc();
throw e;
} finally {
requestTimer.observeDuration();
}
}
private <T> T callWithCircuitBreaker0(String funcName, Function1<T> func) {
if (circuitBreaker.allowRequest()) {
try {
T result = func.apply();
circuitBreaker.getMetrics().recordSuccess();
return result;
} catch (Exception e) {
circuitBreaker.getMetrics().recordFailure();
throw e;
}
} else if (circuitBreaker.attemptExecution()) {
logger.debug("attemptExecution");
try {
T result = func.apply();
circuitBreaker.getMetrics().recordSuccess();
circuitBreaker.recordAttemptSuccess();
logger.debug("markSuccess");
return result;
} catch (Exception e) {
circuitBreaker.getMetrics().recordFailure();
circuitBreaker.recordAttemptFailure();
logger.debug("markNonSuccess");
throw e;
}
} else {
logger.debug("Circuit Breaker Opened");
CIRCUIT_BREAKER_OPENED.labels(funcName).inc();
throw new CircuitBreakerOpenException();
}
}
private void callWithCircuitBreaker(String funcName, Function0 func) {
callWithCircuitBreaker(
funcName,
(Function1<Void>)
() -> {
func.apply();
return null;
});
}
public interface Function1<T> {
T apply();
}
public interface Function0 {
void apply();
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.service.failsafe;
import java.io.Closeable;
public interface CircuitBreaker extends Closeable {
enum Status {
CLOSED(0),
HALF_OPEN(1),
OPEN(2);
private final int value;
private Status(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
/**
* Every requests asks this if it is allowed to proceed or not. It is idempotent and does not
* modify any internal state.
*
* @return boolean whether a request should be permitted
*/
boolean allowRequest();
/**
* Invoked at start of command execution to attempt an execution. This is non-idempotent - it may
* modify internal state.
*/
boolean attemptExecution();
/** Invoked on successful executions as part of feedback mechanism when in a half-open state. */
void recordAttemptSuccess();
/** Invoked on unsuccessful executions as part of feedback mechanism when in a half-open state. */
void recordAttemptFailure();
/** Get the Circuit Breaker Metrics Object. */
CircuitBreakerMetrics getMetrics();
}

View File

@ -0,0 +1,202 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.service.failsafe;
import io.prometheus.client.Counter;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
public class CircuitBreakerImpl implements CircuitBreaker {
private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class);
private static final Counter CIRCUIT_BREAKER_ATTEMPT_COUNTER =
Counter.build()
.name("client_java_circuit_breaker_attempt_counter")
.help("client circuit breaker attempt counter.")
.labelNames("type")
.register();
private final boolean enable;
private final int windowInSeconds;
private final int errorThresholdPercentage;
private final int requestVolumeThreshold;
private final int sleepWindowInSeconds;
private final int attemptRequestCount;
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
private final AtomicLong attemptCount = new AtomicLong(0);
private final AtomicLong attemptSuccessCount = new AtomicLong(0);
private final CircuitBreakerMetrics metrics;
public CircuitBreakerImpl(TiConfiguration conf) {
this(
conf.isCircuitBreakEnable(),
conf.getCircuitBreakAvailabilityWindowInSeconds(),
conf.getCircuitBreakAvailabilityErrorThresholdPercentage(),
conf.getCircuitBreakAvailabilityRequestVolumnThreshold(),
conf.getCircuitBreakSleepWindowInSeconds(),
conf.getCircuitBreakAttemptRequestCount());
}
public CircuitBreakerImpl(
boolean enable,
int windowInSeconds,
int errorThresholdPercentage,
int requestVolumeThreshold,
int sleepWindowInSeconds,
int attemptRequestCount) {
this.enable = enable;
this.windowInSeconds = windowInSeconds;
this.errorThresholdPercentage = errorThresholdPercentage;
this.requestVolumeThreshold = requestVolumeThreshold;
this.sleepWindowInSeconds = sleepWindowInSeconds;
this.attemptRequestCount = attemptRequestCount;
this.metrics =
enable ? new CircuitBreakerMetricsImpl(windowInSeconds) : new NoOpCircuitBreakerMetrics();
this.metrics.addListener(getMetricsListener());
}
private MetricsListener getMetricsListener() {
return hc -> {
logger.debug("onNext " + hc.toString());
// check if we are past the requestVolumeThreshold
if (hc.getTotalRequests() < requestVolumeThreshold) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for some successful command executions
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < errorThresholdPercentage) {
// we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for some successful command executions
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
close2Open();
}
}
};
}
@Override
public CircuitBreakerMetrics getMetrics() {
return metrics;
}
@Override
public boolean allowRequest() {
if (!enable) {
return true;
}
return !isOpen();
}
boolean isOpen() {
return circuitOpened.get() >= 0;
}
Status getStatus() {
return status.get();
}
@Override
public void recordAttemptSuccess() {
CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success").inc();
if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) {
halfOpen2Close();
}
}
@Override
public void recordAttemptFailure() {
CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure").inc();
halfOpen2Open();
}
@Override
public boolean attemptExecution() {
if (allowRequest()) {
return true;
} else {
if (isAfterSleepWindow()) {
// only the `attemptRequestCount` requests after sleep window should execute
// if all the executing commands succeed, the status will transition to CLOSED
// if some of the executing commands fail, the status will transition to OPEN
open2HalfOpen();
return attemptCount.incrementAndGet() <= attemptRequestCount;
} else {
return false;
}
}
}
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = (long) sleepWindowInSeconds * 1000;
return currentTime >= circuitOpenTime + sleepWindowTime;
}
private void close2Open() {
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
// This thread wins the race to open the circuit
// it sets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
logger.info("CLOSED => OPEN");
}
}
private void halfOpen2Close() {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
// This thread wins the race to close the circuit
circuitOpened.set(-1L);
logger.info("HALF_OPEN => CLOSED");
}
}
private void open2HalfOpen() {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
// This thread wins the race to half close the circuit
// it resets the attempt count
attemptCount.set(0);
attemptSuccessCount.set(0);
logger.info("OPEN => HALF_OPEN");
}
}
private void halfOpen2Open() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
// This thread wins the race to re-open the circuit
// it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
logger.info("HALF_OPEN => OPEN");
}
}
@Override
public void close() throws IOException {
metrics.close();
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.service.failsafe;
import java.io.Closeable;
public interface CircuitBreakerMetrics extends Closeable {
/** Record a successful call. */
void recordSuccess();
/** Record a failure call. */
void recordFailure();
/** Add metrics listener. */
void addListener(MetricsListener metricsListener);
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.service.failsafe;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics {
private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class);
private final int windowInMS;
private final List<MetricsListener> listeners;
private final AtomicReference<SingleWindowMetrics> currentMetrics;
private final ScheduledExecutorService scheduler;
private static final int SCHEDULER_INITIAL_DELAY = 1000;
private static final int SCHEDULER_PERIOD = 1000;
public CircuitBreakerMetricsImpl(int windowInSeconds) {
this.windowInMS = windowInSeconds * 1000;
this.listeners = new ArrayList<>();
this.currentMetrics = new AtomicReference<>(new SingleWindowMetrics());
scheduler =
new ScheduledThreadPoolExecutor(
1,
new BasicThreadFactory.Builder()
.namingPattern("circuit-breaker-metrics-%d")
.daemon(true)
.build());
scheduler.scheduleAtFixedRate(
this::onReachCircuitWindow,
SCHEDULER_INITIAL_DELAY,
SCHEDULER_PERIOD,
TimeUnit.MILLISECONDS);
}
@Override
public void recordSuccess() {
currentMetrics.get().recordSuccess();
}
@Override
public void recordFailure() {
currentMetrics.get().recordFailure();
}
private void onReachCircuitWindow() {
SingleWindowMetrics singleWindowMetrics = currentMetrics.get();
if (System.currentTimeMillis() < singleWindowMetrics.getStartMS() + windowInMS) {
return;
}
if (!currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) {
return;
}
logger.debug("window timeout, reset SingleWindowMetrics");
HealthCounts healthCounts = singleWindowMetrics.getHealthCounts();
for (MetricsListener metricsListener : listeners) {
metricsListener.onNext(healthCounts);
}
}
@Override
public void addListener(MetricsListener metricsListener) {
listeners.add(metricsListener);
}
@Override
public void close() throws IOException {
scheduler.shutdown();
}
/** Instead of using SingleWindowMetrics, it is better to use RollingWindowMetrics. */
static class SingleWindowMetrics {
private final long startMS = System.currentTimeMillis();
private final AtomicLong totalCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public void recordSuccess() {
totalCount.incrementAndGet();
}
public void recordFailure() {
totalCount.incrementAndGet();
errorCount.incrementAndGet();
}
public HealthCounts getHealthCounts() {
return new HealthCounts(totalCount.get(), errorCount.get());
}
public long getStartMS() {
return startMS;
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.service.failsafe;
public class HealthCounts {
private final long totalCount;
private final long errorCount;
private final int errorPercentage;
HealthCounts(long total, long error) {
this.totalCount = total;
this.errorCount = error;
if (totalCount > 0) {
this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
} else {
this.errorPercentage = 0;
}
}
public long getTotalRequests() {
return totalCount;
}
public long getErrorCount() {
return errorCount;
}
public int getErrorPercentage() {
return errorPercentage;
}
@Override
public String toString() {
return "HealthCounts{"
+ "totalCount="
+ totalCount
+ ", errorCount="
+ errorCount
+ ", errorPercentage="
+ errorPercentage
+ '}';
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.service.failsafe;
public interface MetricsListener {
void onNext(HealthCounts healthCounts);
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.service.failsafe;
import java.io.IOException;
public class NoOpCircuitBreakerMetrics implements CircuitBreakerMetrics {
@Override
public void recordSuccess() {
// do nothing
}
@Override
public void recordFailure() {
// do nothing
}
@Override
public void addListener(MetricsListener metricsListener) {
// do nothing
}
@Override
public void close() throws IOException {
// do nothing
}
}

View File

@ -0,0 +1,85 @@
package org.tikv.raw;
import static org.junit.Assert.assertTrue;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.CircuitBreakerOpenException;
public class SmartRawKVClientTest {
private boolean enable = true;
private int windowInSeconds = 2;
private int errorThresholdPercentage = 100;
private int requestVolumeThreshold = 10;
private int sleepWindowInSeconds = 1;
private int attemptRequestCount = 10;
private int sleepDelta = 100;
private TiSession session;
private SmartRawKVClient client;
@Before
public void setup() {
TiConfiguration conf = TiConfiguration.createRawDefault();
conf.setCircuitBreakEnable(enable);
conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds);
conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage);
conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold);
conf.setCircuitBreakSleepWindowInSeconds(sleepWindowInSeconds);
conf.setCircuitBreakAttemptRequestCount(attemptRequestCount);
session = TiSession.create(conf);
client = session.createSmartRawClient();
}
@After
public void tearDown() throws Exception {
if (session != null) {
session.close();
}
}
@Test
public void testCircuitBreaker() throws InterruptedException {
// CLOSED => OPEN
{
for (int i = 1; i <= requestVolumeThreshold; i++) {
error();
}
Thread.sleep(windowInSeconds * 1000 + sleepDelta);
Exception error = null;
try {
client.get(ByteString.copyFromUtf8("key"));
assertTrue(false);
} catch (Exception e) {
error = e;
}
assertTrue(error instanceof CircuitBreakerOpenException);
}
// OPEN => CLOSED
{
Thread.sleep(sleepWindowInSeconds * 1000);
for (int i = 1; i <= attemptRequestCount; i++) {
success();
}
client.get(ByteString.copyFromUtf8("key"));
}
}
private void success() {
client.get(ByteString.copyFromUtf8("key"));
}
private void error() {
try {
client.callWithCircuitBreaker("error", () -> 1 / 0);
} catch (Exception ignored) {
}
}
}

View File

@ -0,0 +1,69 @@
package org.tikv.service.failsafe;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
public class CircuitBreakerMetricsTest {
private static final int TEST_COUNT = 10;
private static final int WINDOW_IN_SECONDS = 1;
private static final int SLEEP_DELTA = 100;
@Test
public void testAllSuccess() throws InterruptedException, IOException {
CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS);
AtomicReference<HealthCounts> healthCounts = new AtomicReference<>();
MetricsListener metricsListener = healthCounts::set;
metrics.addListener(metricsListener);
for (int i = 1; i <= TEST_COUNT; i++) {
metrics.recordSuccess();
}
Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA);
assertNotNull(healthCounts.get());
assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT);
assertEquals(healthCounts.get().getErrorPercentage(), 0);
metrics.close();
}
@Test
public void testAllFailure() throws InterruptedException, IOException {
CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS);
AtomicReference<HealthCounts> healthCounts = new AtomicReference<>();
MetricsListener metricsListener = healthCounts::set;
metrics.addListener(metricsListener);
for (int i = 1; i <= TEST_COUNT; i++) {
metrics.recordFailure();
}
Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA);
assertNotNull(healthCounts.get());
assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT);
assertEquals(healthCounts.get().getErrorPercentage(), 100);
metrics.close();
}
@Test
public void testHalfFailure() throws InterruptedException, IOException {
CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS);
AtomicReference<HealthCounts> healthCounts = new AtomicReference<>();
MetricsListener metricsListener = healthCounts::set;
metrics.addListener(metricsListener);
for (int i = 1; i <= TEST_COUNT; i++) {
metrics.recordFailure();
metrics.recordSuccess();
}
Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA);
assertNotNull(healthCounts.get());
assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT * 2);
assertEquals(healthCounts.get().getErrorPercentage(), 50);
metrics.close();
}
}

View File

@ -0,0 +1,69 @@
package org.tikv.service.failsafe;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
public class CircuitBreakerTest {
@Test
public void testCircuitBreaker() throws InterruptedException {
boolean enable = true;
int windowInSeconds = 2;
int errorThresholdPercentage = 100;
int requestVolumeThreshold = 10;
int sleepWindowInSeconds = 1;
int attemptRequestCount = 10;
int sleepDelta = 100;
CircuitBreakerImpl circuitBreaker =
new CircuitBreakerImpl(
enable,
windowInSeconds,
errorThresholdPercentage,
requestVolumeThreshold,
sleepWindowInSeconds,
attemptRequestCount);
CircuitBreakerMetrics metrics = circuitBreaker.getMetrics();
// initial state: CLOSE
assertTrue(!circuitBreaker.isOpen());
assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED);
// CLOSE => OPEN
for (int i = 1; i <= requestVolumeThreshold; i++) {
metrics.recordFailure();
}
Thread.sleep(windowInSeconds * 1000 + sleepDelta);
assertTrue(circuitBreaker.isOpen());
assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN);
// OPEN => HALF_OPEN
Thread.sleep(sleepWindowInSeconds * 1000);
assertTrue(circuitBreaker.attemptExecution());
assertTrue(circuitBreaker.isOpen());
assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN);
// HALF_OPEN => OPEN
circuitBreaker.recordAttemptFailure();
assertTrue(circuitBreaker.isOpen());
assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN);
// OPEN => HALF_OPEN
Thread.sleep(sleepWindowInSeconds * 1000 + sleepDelta);
assertTrue(circuitBreaker.attemptExecution());
circuitBreaker.recordAttemptSuccess();
assertTrue(circuitBreaker.isOpen());
assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN);
// HALF_OPEN => CLOSED
for (int i = 1; i < attemptRequestCount; i++) {
assertTrue(circuitBreaker.attemptExecution());
circuitBreaker.recordAttemptSuccess();
}
assertTrue(!circuitBreaker.isOpen());
assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED);
}
}