From 450965a515dac2c0bc275ebf783055cf3f137d42 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 10 Dec 2021 02:26:12 +0800 Subject: [PATCH] cherry pick #358 to release-3.1 (#373) Signed-off-by: ti-srebot Signed-off-by: marsishandsome Co-authored-by: Liangliang Gu --- .../java/org/tikv/common/ConfigUtils.java | 20 ++ .../java/org/tikv/common/TiConfiguration.java | 77 +++++ src/main/java/org/tikv/common/TiSession.java | 6 + .../CircuitBreakerOpenException.java | 22 ++ src/main/java/org/tikv/raw/RawKVClient.java | 209 ++----------- .../java/org/tikv/raw/RawKVClientBase.java | 257 ++++++++++++++++ .../java/org/tikv/raw/SmartRawKVClient.java | 280 ++++++++++++++++++ .../tikv/service/failsafe/CircuitBreaker.java | 60 ++++ .../service/failsafe/CircuitBreakerImpl.java | 202 +++++++++++++ .../failsafe/CircuitBreakerMetrics.java | 29 ++ .../failsafe/CircuitBreakerMetricsImpl.java | 120 ++++++++ .../tikv/service/failsafe/HealthCounts.java | 56 ++++ .../service/failsafe/MetricsListener.java | 20 ++ .../failsafe/NoOpCircuitBreakerMetrics.java | 40 +++ .../org/tikv/raw/SmartRawKVClientTest.java | 85 ++++++ .../failsafe/CircuitBreakerMetricsTest.java | 69 +++++ .../service/failsafe/CircuitBreakerTest.java | 69 +++++ 17 files changed, 1441 insertions(+), 180 deletions(-) create mode 100644 src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java create mode 100644 src/main/java/org/tikv/raw/RawKVClientBase.java create mode 100644 src/main/java/org/tikv/raw/SmartRawKVClient.java create mode 100644 src/main/java/org/tikv/service/failsafe/CircuitBreaker.java create mode 100644 src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java create mode 100644 src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java create mode 100644 src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java create mode 100644 src/main/java/org/tikv/service/failsafe/HealthCounts.java create mode 100644 src/main/java/org/tikv/service/failsafe/MetricsListener.java create mode 100644 src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java create mode 100644 src/test/java/org/tikv/raw/SmartRawKVClientTest.java create mode 100644 src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java create mode 100644 src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index f400242ef4..72e5d756d5 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -75,6 +75,19 @@ public class ConfigUtils { "tikv.rawkv.batch_write_slowlog_in_ms"; public static final String TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "tikv.rawkv.scan_slowlog_in_ms"; + public static final String TiKV_CIRCUIT_BREAK_ENABLE = "tikv.circuit_break.enable"; + public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = + "tikv.circuit_break.trigger.availability.window_in_seconds"; + public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = + "tikv.circuit_break.trigger.availability.error_threshold_percentage"; + public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD = + "tikv.circuit_break.trigger.availability.request_volumn_threshold"; + public static final String TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = + "tikv.circuit_break.trigger.sleep_window_in_seconds"; + public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = + "tikv.circuit_break.trigger.attempt_request_count"; + + public static final String TIFLASH_ENABLE = "tiflash.enable"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_FORWARD_TIMEOUT = "300ms"; @@ -132,4 +145,11 @@ public class ConfigUtils { public static final String LEADER = "LEADER"; public static final String FOLLOWER = "FOLLOWER"; public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER"; + + public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false; + public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60; + public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = 100; + public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10; + public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20; + public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10; } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 54e9622401..760266d750 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -120,6 +120,20 @@ public class TiConfiguration implements Serializable { setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS); setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); + setIfMissing(TiKV_CIRCUIT_BREAK_ENABLE, DEF_TiKV_CIRCUIT_BREAK_ENABLE); + setIfMissing( + TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS, + DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS); + setIfMissing( + TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE, + DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE); + setIfMissing( + TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD, + DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD); + setIfMissing( + TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); + setIfMissing( + TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); } public static void listAll() { @@ -328,6 +342,16 @@ public class TiConfiguration implements Serializable { getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS); private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); + private boolean circuitBreakEnable = getBoolean(TiKV_CIRCUIT_BREAK_ENABLE); + private int circuitBreakAvailabilityWindowInSeconds = + getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS); + private int circuitBreakAvailabilityErrorThresholdPercentage = + getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE); + private int circuitBreakAvailabilityRequestVolumnThreshold = + getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD); + private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); + private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + public enum KVMode { TXN, RAW @@ -729,4 +753,57 @@ public class TiConfiguration implements Serializable { public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) { this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS; } + + public boolean isCircuitBreakEnable() { + return circuitBreakEnable; + } + + public void setCircuitBreakEnable(boolean circuitBreakEnable) { + this.circuitBreakEnable = circuitBreakEnable; + } + + public int getCircuitBreakAvailabilityWindowInSeconds() { + return circuitBreakAvailabilityWindowInSeconds; + } + + public void setCircuitBreakAvailabilityWindowInSeconds( + int circuitBreakAvailabilityWindowInSeconds) { + this.circuitBreakAvailabilityWindowInSeconds = circuitBreakAvailabilityWindowInSeconds; + } + + public int getCircuitBreakAvailabilityErrorThresholdPercentage() { + return circuitBreakAvailabilityErrorThresholdPercentage; + } + + public void setCircuitBreakAvailabilityErrorThresholdPercentage( + int circuitBreakAvailabilityErrorThresholdPercentage) { + this.circuitBreakAvailabilityErrorThresholdPercentage = + circuitBreakAvailabilityErrorThresholdPercentage; + } + + public int getCircuitBreakAvailabilityRequestVolumnThreshold() { + return circuitBreakAvailabilityRequestVolumnThreshold; + } + + public void setCircuitBreakAvailabilityRequestVolumnThreshold( + int circuitBreakAvailabilityRequestVolumnThreshold) { + this.circuitBreakAvailabilityRequestVolumnThreshold = + circuitBreakAvailabilityRequestVolumnThreshold; + } + + public int getCircuitBreakSleepWindowInSeconds() { + return circuitBreakSleepWindowInSeconds; + } + + public void setCircuitBreakSleepWindowInSeconds(int circuitBreakSleepWindowInSeconds) { + this.circuitBreakSleepWindowInSeconds = circuitBreakSleepWindowInSeconds; + } + + public int getCircuitBreakAttemptRequestCount() { + return circuitBreakAttemptRequestCount; + } + + public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) { + this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index d88b7e85ef..df17109c3d 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -41,6 +41,7 @@ import org.tikv.common.region.TiStore; import org.tikv.common.util.*; import org.tikv.kvproto.Metapb; import org.tikv.raw.RawKVClient; +import org.tikv.raw.SmartRawKVClient; import org.tikv.txn.KVClient; import org.tikv.txn.TxnKVClient; @@ -112,6 +113,11 @@ public class TiSession implements AutoCloseable { return new RawKVClient(this, builder); } + public SmartRawKVClient createSmartRawClient() { + RawKVClient rawKVClient = createRawClient(); + return new SmartRawKVClient(rawKVClient, getConf()); + } + public KVClient createKVClient() { checkIsClosed(); diff --git a/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java b/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java new file mode 100644 index 0000000000..0898257564 --- /dev/null +++ b/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.exception; + +public class CircuitBreakerOpenException extends RuntimeException { + public CircuitBreakerOpenException() { + super("Circuit Breaker Opened"); + } +} diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 76ef92581a..155a0010f5 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -40,7 +40,7 @@ import org.tikv.common.region.TiRegion; import org.tikv.common.util.*; import org.tikv.kvproto.Kvrpcpb.KvPair; -public class RawKVClient implements AutoCloseable { +public class RawKVClient implements RawKVClientBase { private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; private final ExecutorService batchGetThreadPool; @@ -50,15 +50,6 @@ public class RawKVClient implements AutoCloseable { private final ExecutorService deleteRangeThreadPool; private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class); - // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go - private static final int MAX_RAW_SCAN_LIMIT = 10240; - private static final int MAX_RAW_BATCH_LIMIT = 1024; - private static final int RAW_BATCH_PUT_SIZE = 1024 * 1024; // 1 MB - private static final int RAW_BATCH_GET_SIZE = 16 * 1024; // 16 K - private static final int RAW_BATCH_DELETE_SIZE = 16 * 1024; // 16 K - private static final int RAW_BATCH_SCAN_SIZE = 16; - private static final int RAW_BATCH_PAIR_COUNT = 512; - public static final Histogram RAW_REQUEST_LATENCY = Histogram.build() .name("client_java_raw_requests_latency") @@ -98,34 +89,17 @@ public class RawKVClient implements AutoCloseable { @Override public void close() {} - /** - * Put a raw key-value pair to TiKV - * - * @param key raw key - * @param value raw value - */ + @Override public void put(ByteString key, ByteString value) { put(key, value, 0); } - /** - * Put a raw key-value pair to TiKV - * - * @param key raw key - * @param value raw value - * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated - */ + @Override public void put(ByteString key, ByteString value, long ttl) { put(key, value, ttl, false); } - /** - * Put a raw key-value pair to TiKV. This API is atomic. - * - * @param key raw key - * @param value raw value - * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated - */ + @Override public void putAtomic(ByteString key, ByteString value, long ttl) { put(key, value, ttl, true); } @@ -166,27 +140,12 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Put a key-value pair if it does not exist. This API is atomic. - * - * @param key key - * @param value value - * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns - * the previous key if the value already exists, and does not write to TiKV. - */ + @Override public ByteString putIfAbsent(ByteString key, ByteString value) { return putIfAbsent(key, value, 0L); } - /** - * Put a key-value pair with TTL if it does not exist. This API is atomic. - * - * @param key key - * @param value value - * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. - * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns - * the previous key if the value already exists, and does not write to TiKV. - */ + @Override public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -223,40 +182,22 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic. - * - * @param kvPairs kvPairs - */ + @Override public void batchPut(Map kvPairs) { batchPut(kvPairs, 0); } - /** - * Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic. - * - * @param kvPairs kvPairs - * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated - */ + @Override public void batchPut(Map kvPairs, long ttl) { batchPut(kvPairs, ttl, false); } - /** - * Put a set of raw key-value pair to TiKV, this API is atomic - * - * @param kvPairs kvPairs - */ + @Override public void batchPutAtomic(Map kvPairs) { batchPutAtomic(kvPairs, 0); } - /** - * Put a set of raw key-value pair to TiKV, this API is atomic. - * - * @param kvPairs kvPairs - * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated - */ + @Override public void batchPutAtomic(Map kvPairs, long ttl) { batchPut(kvPairs, ttl, true); } @@ -289,12 +230,7 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Get a raw key-value pair from TiKV if key exists - * - * @param key raw key - * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist - */ + @Override public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -332,12 +268,7 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Get a list of raw key-value pair from TiKV if key exists - * - * @param keys list of raw key - * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist - */ + @Override public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -367,20 +298,12 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Delete a list of raw key-value pair from TiKV if key exists - * - * @param keys list of raw key - */ + @Override public void batchDelete(List keys) { batchDelete(keys, false); } - /** - * Delete a list of raw key-value pair from TiKV if key exists, this API is atomic - * - * @param keys list of raw key - */ + @Override public void batchDeleteAtomic(List keys) { batchDelete(keys, true); } @@ -414,13 +337,7 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Get the TTL of a raw key from TiKV if key exists - * - * @param key raw key - * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key - * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist - */ + @Override public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -457,6 +374,7 @@ public class RawKVClient implements AutoCloseable { } } + @Override public List> batchScan(List ranges) { String label = "client_raw_batch_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -507,27 +425,12 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Scan raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey, int limit) { return scan(startKey, endKey, limit, false); } - /** - * Scan raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @param keyOnly whether to scan in key-only mode - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -562,48 +465,22 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Scan raw key-value pairs from TiKV in range [startKey, ♾) - * - * @param startKey raw start key, inclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, int limit) { return scan(startKey, limit, false); } - /** - * Scan raw key-value pairs from TiKV in range [startKey, ♾) - * - * @param startKey raw start key, inclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @param keyOnly whether to scan in key-only mode - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, int limit, boolean keyOnly) { return scan(startKey, ByteString.EMPTY, limit, keyOnly); } - /** - * Scan all raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey) { return scan(startKey, endKey, false); } - /** - * Scan all raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @param keyOnly whether to scan in key-only mode - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -659,40 +536,27 @@ public class RawKVClient implements AutoCloseable { return scan(startKey, endKey, limit, keyOnly); } - /** - * Scan keys with prefix - * - * @param prefixKey prefix key - * @param limit limit of keys retrieved - * @param keyOnly whether to scan in keyOnly mode - * @return kvPairs with the specified prefix - */ + @Override public List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) { return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly); } + @Override public List scanPrefix(ByteString prefixKey) { return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString()); } + @Override public List scanPrefix(ByteString prefixKey, boolean keyOnly) { return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly); } - /** - * Delete a raw key-value pair from TiKV if key exists - * - * @param key raw key to be deleted - */ + @Override public void delete(ByteString key) { delete(key, false); } - /** - * Delete a raw key-value pair from TiKV if key exists. This API is atomic. - * - * @param key raw key to be deleted - */ + @Override public void deleteAtomic(ByteString key) { delete(key, true); } @@ -734,15 +598,7 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Delete all raw key-value pairs in range [startKey, endKey) from TiKV - * - *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this - * range along with deleteRange API, the result will be undefined. - * - * @param startKey raw start key to be deleted - * @param endKey raw start key to be deleted - */ + @Override public synchronized void deleteRange(ByteString startKey, ByteString endKey) { String label = "client_raw_delete_range"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -761,14 +617,7 @@ public class RawKVClient implements AutoCloseable { } } - /** - * Delete all raw key-value pairs with the prefix `key` from TiKV - * - *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this - * range along with deleteRange API, the result will be undefined. - * - * @param key prefix of keys to be deleted - */ + @Override public synchronized void deletePrefix(ByteString key) { ByteString endKey = Key.toRawKey(key).nextPrefix().toByteString(); deleteRange(key, endKey); diff --git a/src/main/java/org/tikv/raw/RawKVClientBase.java b/src/main/java/org/tikv/raw/RawKVClientBase.java new file mode 100644 index 0000000000..96b3d98e84 --- /dev/null +++ b/src/main/java/org/tikv/raw/RawKVClientBase.java @@ -0,0 +1,257 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.raw; + +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import org.tikv.common.util.ScanOption; +import org.tikv.kvproto.Kvrpcpb; + +public interface RawKVClientBase extends AutoCloseable { + // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go + int MAX_RAW_SCAN_LIMIT = 10240; + int MAX_RAW_BATCH_LIMIT = 1024; + int RAW_BATCH_PUT_SIZE = 1024 * 1024; + int RAW_BATCH_GET_SIZE = 16 * 1024; + int RAW_BATCH_DELETE_SIZE = 16 * 1024; + + /** + * Put a raw key-value pair to TiKV + * + * @param key raw key + * @param value raw value + */ + void put(ByteString key, ByteString value); + + /** + * Put a raw key-value pair to TiKV + * + * @param key raw key + * @param value raw value + * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated + */ + void put(ByteString key, ByteString value, long ttl); + + /** + * Put a raw key-value pair to TiKV. This API is atomic. + * + * @param key raw key + * @param value raw value + * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated + */ + void putAtomic(ByteString key, ByteString value, long ttl); + + /** + * Put a key-value pair if it does not exist. This API is atomic. + * + * @param key key + * @param value value + * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns + * the previous key if the value already exists, and does not write to TiKV. + */ + ByteString putIfAbsent(ByteString key, ByteString value); + + /** + * Put a key-value pair with TTL if it does not exist. This API is atomic. + * + * @param key key + * @param value value + * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. + * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns + * the previous key if the value already exists, and does not write to TiKV. + */ + ByteString putIfAbsent(ByteString key, ByteString value, long ttl); + + /** + * Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic. + * + * @param kvPairs kvPairs + */ + void batchPut(Map kvPairs); + + /** + * Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic. + * + * @param kvPairs kvPairs + * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated + */ + void batchPut(Map kvPairs, long ttl); + + /** + * Put a set of raw key-value pair to TiKV, this API is atomic + * + * @param kvPairs kvPairs + */ + void batchPutAtomic(Map kvPairs); + + /** + * Put a set of raw key-value pair to TiKV, this API is atomic. + * + * @param kvPairs kvPairs + * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated + */ + void batchPutAtomic(Map kvPairs, long ttl); + + /** + * Get a raw key-value pair from TiKV if key exists + * + * @param key raw key + * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist + */ + ByteString get(ByteString key); + + /** + * Get a list of raw key-value pair from TiKV if key exists + * + * @param keys list of raw key + * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist + */ + List batchGet(List keys); + + /** + * Delete a list of raw key-value pair from TiKV if key exists + * + * @param keys list of raw key + */ + void batchDelete(List keys); + + /** + * Delete a list of raw key-value pair from TiKV if key exists, this API is atomic + * + * @param keys list of raw key + */ + void batchDeleteAtomic(List keys); + + /** + * Get the TTL of a raw key from TiKV if key exists + * + * @param key raw key + * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key + * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist + */ + Long getKeyTTL(ByteString key); + + List> batchScan(List ranges); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey, int limit); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @param keyOnly whether to scan in key-only mode + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, ♾) + * + * @param startKey raw start key, inclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, int limit); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, ♾) + * + * @param startKey raw start key, inclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @param keyOnly whether to scan in key-only mode + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, int limit, boolean keyOnly); + + /** + * Scan all raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey); + + /** + * Scan all raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @param keyOnly whether to scan in key-only mode + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey, boolean keyOnly); + + /** + * Scan keys with prefix + * + * @param prefixKey prefix key + * @param limit limit of keys retrieved + * @param keyOnly whether to scan in keyOnly mode + * @return kvPairs with the specified prefix + */ + List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly); + + List scanPrefix(ByteString prefixKey); + + List scanPrefix(ByteString prefixKey, boolean keyOnly); + + /** + * Delete a raw key-value pair from TiKV if key exists + * + * @param key raw key to be deleted + */ + void delete(ByteString key); + + /** + * Delete a raw key-value pair from TiKV if key exists. This API is atomic. + * + * @param key raw key to be deleted + */ + void deleteAtomic(ByteString key); + + /** + * Delete all raw key-value pairs in range [startKey, endKey) from TiKV + * + *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this + * range along with deleteRange API, the result will be undefined. + * + * @param startKey raw start key to be deleted + * @param endKey raw start key to be deleted + */ + void deleteRange(ByteString startKey, ByteString endKey); + + /** + * Delete all raw key-value pairs with the prefix `key` from TiKV + * + *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this + * range along with deleteRange API, the result will be undefined. + * + * @param key prefix of keys to be deleted + */ + void deletePrefix(ByteString key); +} diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java new file mode 100644 index 0000000000..a9c4f4aa4c --- /dev/null +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -0,0 +1,280 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.raw; + +import com.google.protobuf.ByteString; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.exception.CircuitBreakerOpenException; +import org.tikv.common.util.ScanOption; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.service.failsafe.CircuitBreaker; +import org.tikv.service.failsafe.CircuitBreakerImpl; + +public class SmartRawKVClient implements RawKVClientBase { + private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); + + private static final Histogram REQUEST_LATENCY = + Histogram.build() + .name("client_java_smart_raw_requests_latency") + .help("client smart raw request latency.") + .labelNames("type") + .register(); + + private static final Counter REQUEST_SUCCESS = + Counter.build() + .name("client_java_smart_raw_requests_success") + .help("client smart raw request success.") + .labelNames("type") + .register(); + + private static final Counter REQUEST_FAILURE = + Counter.build() + .name("client_java_smart_raw_requests_failure") + .help("client smart raw request failure.") + .labelNames("type") + .register(); + + private static final Counter CIRCUIT_BREAKER_OPENED = + Counter.build() + .name("client_java_smart_raw_circuit_breaker_opened") + .help("client smart raw circuit breaker opened.") + .labelNames("type") + .register(); + + private final RawKVClientBase client; + private final CircuitBreaker circuitBreaker; + + public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) { + this.client = client; + this.circuitBreaker = new CircuitBreakerImpl(conf); + } + + @Override + public void close() throws Exception { + circuitBreaker.close(); + client.close(); + } + + @Override + public void put(ByteString key, ByteString value) { + callWithCircuitBreaker("put", () -> client.put(key, value)); + } + + @Override + public void put(ByteString key, ByteString value, long ttl) { + callWithCircuitBreaker("put", () -> client.put(key, value, ttl)); + } + + @Override + public void putAtomic(ByteString key, ByteString value, long ttl) { + callWithCircuitBreaker("putAtomic", () -> client.putAtomic(key, value, ttl)); + } + + @Override + public ByteString putIfAbsent(ByteString key, ByteString value) { + return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value)); + } + + @Override + public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { + return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value, ttl)); + } + + @Override + public void batchPut(Map kvPairs) { + callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs)); + } + + @Override + public void batchPut(Map kvPairs, long ttl) { + callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs, ttl)); + } + + @Override + public void batchPutAtomic(Map kvPairs) { + callWithCircuitBreaker("batchPutAtomic", () -> client.batchPutAtomic(kvPairs)); + } + + @Override + public void batchPutAtomic(Map kvPairs, long ttl) { + callWithCircuitBreaker("batchPutAtomic", () -> client.batchPutAtomic(kvPairs, ttl)); + } + + @Override + public ByteString get(ByteString key) { + return callWithCircuitBreaker("get", () -> client.get(key)); + } + + @Override + public List batchGet(List keys) { + return callWithCircuitBreaker("batchGet", () -> client.batchGet(keys)); + } + + @Override + public void batchDelete(List keys) { + callWithCircuitBreaker("batchDelete", () -> client.batchDelete(keys)); + } + + @Override + public void batchDeleteAtomic(List keys) { + callWithCircuitBreaker("batchDeleteAtomic", () -> client.batchDeleteAtomic(keys)); + } + + @Override + public Long getKeyTTL(ByteString key) { + return callWithCircuitBreaker("getKeyTTL", () -> client.getKeyTTL(key)); + } + + @Override + public List> batchScan(List ranges) { + return callWithCircuitBreaker("batchScan", () -> client.batchScan(ranges)); + } + + @Override + public List scan(ByteString startKey, ByteString endKey, int limit) { + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit)); + } + + @Override + public List scan( + ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit, keyOnly)); + } + + @Override + public List scan(ByteString startKey, int limit) { + return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit)); + } + + @Override + public List scan(ByteString startKey, int limit, boolean keyOnly) { + return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit, keyOnly)); + } + + @Override + public List scan(ByteString startKey, ByteString endKey) { + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey)); + } + + @Override + public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, keyOnly)); + } + + @Override + public List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) { + return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, limit, keyOnly)); + } + + @Override + public List scanPrefix(ByteString prefixKey) { + return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey)); + } + + @Override + public List scanPrefix(ByteString prefixKey, boolean keyOnly) { + return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, keyOnly)); + } + + @Override + public void delete(ByteString key) { + callWithCircuitBreaker("delete", () -> client.delete(key)); + } + + @Override + public void deleteAtomic(ByteString key) { + callWithCircuitBreaker("deleteAtomic", () -> client.deleteAtomic(key)); + } + + @Override + public void deleteRange(ByteString startKey, ByteString endKey) { + callWithCircuitBreaker("deleteRange", () -> client.deleteRange(startKey, endKey)); + } + + @Override + public void deletePrefix(ByteString key) { + callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key)); + } + + T callWithCircuitBreaker(String funcName, Function1 func) { + Histogram.Timer requestTimer = REQUEST_LATENCY.labels(funcName).startTimer(); + try { + T result = callWithCircuitBreaker0(funcName, func); + REQUEST_SUCCESS.labels(funcName).inc(); + return result; + } catch (Exception e) { + REQUEST_FAILURE.labels(funcName).inc(); + throw e; + } finally { + requestTimer.observeDuration(); + } + } + + private T callWithCircuitBreaker0(String funcName, Function1 func) { + if (circuitBreaker.allowRequest()) { + try { + T result = func.apply(); + circuitBreaker.getMetrics().recordSuccess(); + return result; + } catch (Exception e) { + circuitBreaker.getMetrics().recordFailure(); + throw e; + } + } else if (circuitBreaker.attemptExecution()) { + logger.debug("attemptExecution"); + try { + T result = func.apply(); + circuitBreaker.getMetrics().recordSuccess(); + circuitBreaker.recordAttemptSuccess(); + logger.debug("markSuccess"); + return result; + } catch (Exception e) { + circuitBreaker.getMetrics().recordFailure(); + circuitBreaker.recordAttemptFailure(); + logger.debug("markNonSuccess"); + throw e; + } + } else { + logger.debug("Circuit Breaker Opened"); + CIRCUIT_BREAKER_OPENED.labels(funcName).inc(); + throw new CircuitBreakerOpenException(); + } + } + + private void callWithCircuitBreaker(String funcName, Function0 func) { + callWithCircuitBreaker( + funcName, + (Function1) + () -> { + func.apply(); + return null; + }); + } + + public interface Function1 { + T apply(); + } + + public interface Function0 { + void apply(); + } +} diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java new file mode 100644 index 0000000000..e1c4f1e2bc --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +import java.io.Closeable; + +public interface CircuitBreaker extends Closeable { + + enum Status { + CLOSED(0), + HALF_OPEN(1), + OPEN(2); + + private final int value; + + private Status(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + } + + /** + * Every requests asks this if it is allowed to proceed or not. It is idempotent and does not + * modify any internal state. + * + * @return boolean whether a request should be permitted + */ + boolean allowRequest(); + + /** + * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may + * modify internal state. + */ + boolean attemptExecution(); + + /** Invoked on successful executions as part of feedback mechanism when in a half-open state. */ + void recordAttemptSuccess(); + + /** Invoked on unsuccessful executions as part of feedback mechanism when in a half-open state. */ + void recordAttemptFailure(); + + /** Get the Circuit Breaker Metrics Object. */ + CircuitBreakerMetrics getMetrics(); +} diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java new file mode 100644 index 0000000000..7f2231bd7a --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java @@ -0,0 +1,202 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +import io.prometheus.client.Counter; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; + +public class CircuitBreakerImpl implements CircuitBreaker { + private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class); + + private static final Counter CIRCUIT_BREAKER_ATTEMPT_COUNTER = + Counter.build() + .name("client_java_circuit_breaker_attempt_counter") + .help("client circuit breaker attempt counter.") + .labelNames("type") + .register(); + + private final boolean enable; + private final int windowInSeconds; + private final int errorThresholdPercentage; + private final int requestVolumeThreshold; + private final int sleepWindowInSeconds; + private final int attemptRequestCount; + + private final AtomicLong circuitOpened = new AtomicLong(-1); + private final AtomicReference status = new AtomicReference<>(Status.CLOSED); + private final AtomicLong attemptCount = new AtomicLong(0); + private final AtomicLong attemptSuccessCount = new AtomicLong(0); + + private final CircuitBreakerMetrics metrics; + + public CircuitBreakerImpl(TiConfiguration conf) { + this( + conf.isCircuitBreakEnable(), + conf.getCircuitBreakAvailabilityWindowInSeconds(), + conf.getCircuitBreakAvailabilityErrorThresholdPercentage(), + conf.getCircuitBreakAvailabilityRequestVolumnThreshold(), + conf.getCircuitBreakSleepWindowInSeconds(), + conf.getCircuitBreakAttemptRequestCount()); + } + + public CircuitBreakerImpl( + boolean enable, + int windowInSeconds, + int errorThresholdPercentage, + int requestVolumeThreshold, + int sleepWindowInSeconds, + int attemptRequestCount) { + this.enable = enable; + this.windowInSeconds = windowInSeconds; + this.errorThresholdPercentage = errorThresholdPercentage; + this.requestVolumeThreshold = requestVolumeThreshold; + this.sleepWindowInSeconds = sleepWindowInSeconds; + this.attemptRequestCount = attemptRequestCount; + this.metrics = + enable ? new CircuitBreakerMetricsImpl(windowInSeconds) : new NoOpCircuitBreakerMetrics(); + this.metrics.addListener(getMetricsListener()); + } + + private MetricsListener getMetricsListener() { + return hc -> { + logger.debug("onNext " + hc.toString()); + // check if we are past the requestVolumeThreshold + if (hc.getTotalRequests() < requestVolumeThreshold) { + // we are not past the minimum volume threshold for the stat window, + // so no change to circuit status. + // if it was CLOSED, it stays CLOSED + // if it was half-open, we need to wait for some successful command executions + // if it was open, we need to wait for sleep window to elapse + } else { + if (hc.getErrorPercentage() < errorThresholdPercentage) { + // we are not past the minimum error threshold for the stat window, + // so no change to circuit status. + // if it was CLOSED, it stays CLOSED + // if it was half-open, we need to wait for some successful command executions + // if it was open, we need to wait for sleep window to elapse + } else { + // our failure rate is too high, we need to set the state to OPEN + close2Open(); + } + } + }; + } + + @Override + public CircuitBreakerMetrics getMetrics() { + return metrics; + } + + @Override + public boolean allowRequest() { + if (!enable) { + return true; + } + return !isOpen(); + } + + boolean isOpen() { + return circuitOpened.get() >= 0; + } + + Status getStatus() { + return status.get(); + } + + @Override + public void recordAttemptSuccess() { + CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success").inc(); + if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) { + halfOpen2Close(); + } + } + + @Override + public void recordAttemptFailure() { + CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure").inc(); + halfOpen2Open(); + } + + @Override + public boolean attemptExecution() { + if (allowRequest()) { + return true; + } else { + if (isAfterSleepWindow()) { + // only the `attemptRequestCount` requests after sleep window should execute + // if all the executing commands succeed, the status will transition to CLOSED + // if some of the executing commands fail, the status will transition to OPEN + open2HalfOpen(); + return attemptCount.incrementAndGet() <= attemptRequestCount; + } else { + return false; + } + } + } + + private boolean isAfterSleepWindow() { + final long circuitOpenTime = circuitOpened.get(); + final long currentTime = System.currentTimeMillis(); + final long sleepWindowTime = (long) sleepWindowInSeconds * 1000; + return currentTime >= circuitOpenTime + sleepWindowTime; + } + + private void close2Open() { + if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { + // This thread wins the race to open the circuit + // it sets the start time for the sleep window + circuitOpened.set(System.currentTimeMillis()); + logger.info("CLOSED => OPEN"); + } + } + + private void halfOpen2Close() { + if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { + // This thread wins the race to close the circuit + circuitOpened.set(-1L); + logger.info("HALF_OPEN => CLOSED"); + } + } + + private void open2HalfOpen() { + if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { + // This thread wins the race to half close the circuit + // it resets the attempt count + attemptCount.set(0); + attemptSuccessCount.set(0); + logger.info("OPEN => HALF_OPEN"); + } + } + + private void halfOpen2Open() { + if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { + // This thread wins the race to re-open the circuit + // it resets the start time for the sleep window + circuitOpened.set(System.currentTimeMillis()); + logger.info("HALF_OPEN => OPEN"); + } + } + + @Override + public void close() throws IOException { + metrics.close(); + } +} diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java new file mode 100644 index 0000000000..6287a9f199 --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +import java.io.Closeable; + +public interface CircuitBreakerMetrics extends Closeable { + /** Record a successful call. */ + void recordSuccess(); + + /** Record a failure call. */ + void recordFailure(); + + /** Add metrics listener. */ + void addListener(MetricsListener metricsListener); +} diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java new file mode 100644 index 0000000000..da497efcb7 --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java @@ -0,0 +1,120 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics { + private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class); + + private final int windowInMS; + private final List listeners; + private final AtomicReference currentMetrics; + + private final ScheduledExecutorService scheduler; + private static final int SCHEDULER_INITIAL_DELAY = 1000; + private static final int SCHEDULER_PERIOD = 1000; + + public CircuitBreakerMetricsImpl(int windowInSeconds) { + this.windowInMS = windowInSeconds * 1000; + this.listeners = new ArrayList<>(); + this.currentMetrics = new AtomicReference<>(new SingleWindowMetrics()); + + scheduler = + new ScheduledThreadPoolExecutor( + 1, + new BasicThreadFactory.Builder() + .namingPattern("circuit-breaker-metrics-%d") + .daemon(true) + .build()); + + scheduler.scheduleAtFixedRate( + this::onReachCircuitWindow, + SCHEDULER_INITIAL_DELAY, + SCHEDULER_PERIOD, + TimeUnit.MILLISECONDS); + } + + @Override + public void recordSuccess() { + currentMetrics.get().recordSuccess(); + } + + @Override + public void recordFailure() { + currentMetrics.get().recordFailure(); + } + + private void onReachCircuitWindow() { + SingleWindowMetrics singleWindowMetrics = currentMetrics.get(); + if (System.currentTimeMillis() < singleWindowMetrics.getStartMS() + windowInMS) { + return; + } + if (!currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) { + return; + } + logger.debug("window timeout, reset SingleWindowMetrics"); + HealthCounts healthCounts = singleWindowMetrics.getHealthCounts(); + for (MetricsListener metricsListener : listeners) { + metricsListener.onNext(healthCounts); + } + } + + @Override + public void addListener(MetricsListener metricsListener) { + listeners.add(metricsListener); + } + + @Override + public void close() throws IOException { + scheduler.shutdown(); + } + + /** Instead of using SingleWindowMetrics, it is better to use RollingWindowMetrics. */ + static class SingleWindowMetrics { + private final long startMS = System.currentTimeMillis(); + private final AtomicLong totalCount = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); + + public void recordSuccess() { + totalCount.incrementAndGet(); + } + + public void recordFailure() { + totalCount.incrementAndGet(); + + errorCount.incrementAndGet(); + } + + public HealthCounts getHealthCounts() { + return new HealthCounts(totalCount.get(), errorCount.get()); + } + + public long getStartMS() { + return startMS; + } + } +} diff --git a/src/main/java/org/tikv/service/failsafe/HealthCounts.java b/src/main/java/org/tikv/service/failsafe/HealthCounts.java new file mode 100644 index 0000000000..68f986aa48 --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/HealthCounts.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +public class HealthCounts { + private final long totalCount; + private final long errorCount; + private final int errorPercentage; + + HealthCounts(long total, long error) { + this.totalCount = total; + this.errorCount = error; + if (totalCount > 0) { + this.errorPercentage = (int) ((double) errorCount / totalCount * 100); + } else { + this.errorPercentage = 0; + } + } + + public long getTotalRequests() { + return totalCount; + } + + public long getErrorCount() { + return errorCount; + } + + public int getErrorPercentage() { + return errorPercentage; + } + + @Override + public String toString() { + return "HealthCounts{" + + "totalCount=" + + totalCount + + ", errorCount=" + + errorCount + + ", errorPercentage=" + + errorPercentage + + '}'; + } +} diff --git a/src/main/java/org/tikv/service/failsafe/MetricsListener.java b/src/main/java/org/tikv/service/failsafe/MetricsListener.java new file mode 100644 index 0000000000..2fd59b4661 --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/MetricsListener.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +public interface MetricsListener { + void onNext(HealthCounts healthCounts); +} diff --git a/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java b/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java new file mode 100644 index 0000000000..e5474fb6e1 --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java @@ -0,0 +1,40 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +import java.io.IOException; + +public class NoOpCircuitBreakerMetrics implements CircuitBreakerMetrics { + @Override + public void recordSuccess() { + // do nothing + } + + @Override + public void recordFailure() { + // do nothing + } + + @Override + public void addListener(MetricsListener metricsListener) { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } +} diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java new file mode 100644 index 0000000000..b4c0bc5588 --- /dev/null +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -0,0 +1,85 @@ +package org.tikv.raw; + +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ByteString; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.CircuitBreakerOpenException; + +public class SmartRawKVClientTest { + private boolean enable = true; + private int windowInSeconds = 2; + private int errorThresholdPercentage = 100; + private int requestVolumeThreshold = 10; + private int sleepWindowInSeconds = 1; + private int attemptRequestCount = 10; + + private int sleepDelta = 100; + + private TiSession session; + private SmartRawKVClient client; + + @Before + public void setup() { + TiConfiguration conf = TiConfiguration.createRawDefault(); + conf.setCircuitBreakEnable(enable); + conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds); + conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage); + conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold); + conf.setCircuitBreakSleepWindowInSeconds(sleepWindowInSeconds); + conf.setCircuitBreakAttemptRequestCount(attemptRequestCount); + session = TiSession.create(conf); + client = session.createSmartRawClient(); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void testCircuitBreaker() throws InterruptedException { + // CLOSED => OPEN + { + for (int i = 1; i <= requestVolumeThreshold; i++) { + error(); + } + Thread.sleep(windowInSeconds * 1000 + sleepDelta); + + Exception error = null; + try { + client.get(ByteString.copyFromUtf8("key")); + assertTrue(false); + } catch (Exception e) { + error = e; + } + assertTrue(error instanceof CircuitBreakerOpenException); + } + + // OPEN => CLOSED + { + Thread.sleep(sleepWindowInSeconds * 1000); + for (int i = 1; i <= attemptRequestCount; i++) { + success(); + } + client.get(ByteString.copyFromUtf8("key")); + } + } + + private void success() { + client.get(ByteString.copyFromUtf8("key")); + } + + private void error() { + try { + client.callWithCircuitBreaker("error", () -> 1 / 0); + } catch (Exception ignored) { + } + } +} diff --git a/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java b/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java new file mode 100644 index 0000000000..a8cbc3576f --- /dev/null +++ b/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java @@ -0,0 +1,69 @@ +package org.tikv.service.failsafe; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; + +public class CircuitBreakerMetricsTest { + private static final int TEST_COUNT = 10; + private static final int WINDOW_IN_SECONDS = 1; + private static final int SLEEP_DELTA = 100; + + @Test + public void testAllSuccess() throws InterruptedException, IOException { + CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); + + AtomicReference healthCounts = new AtomicReference<>(); + MetricsListener metricsListener = healthCounts::set; + metrics.addListener(metricsListener); + + for (int i = 1; i <= TEST_COUNT; i++) { + metrics.recordSuccess(); + } + Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); + assertNotNull(healthCounts.get()); + assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT); + assertEquals(healthCounts.get().getErrorPercentage(), 0); + metrics.close(); + } + + @Test + public void testAllFailure() throws InterruptedException, IOException { + CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); + + AtomicReference healthCounts = new AtomicReference<>(); + MetricsListener metricsListener = healthCounts::set; + metrics.addListener(metricsListener); + + for (int i = 1; i <= TEST_COUNT; i++) { + metrics.recordFailure(); + } + Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); + assertNotNull(healthCounts.get()); + assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT); + assertEquals(healthCounts.get().getErrorPercentage(), 100); + metrics.close(); + } + + @Test + public void testHalfFailure() throws InterruptedException, IOException { + CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); + + AtomicReference healthCounts = new AtomicReference<>(); + MetricsListener metricsListener = healthCounts::set; + metrics.addListener(metricsListener); + + for (int i = 1; i <= TEST_COUNT; i++) { + metrics.recordFailure(); + metrics.recordSuccess(); + } + Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); + assertNotNull(healthCounts.get()); + assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT * 2); + assertEquals(healthCounts.get().getErrorPercentage(), 50); + metrics.close(); + } +} diff --git a/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java new file mode 100644 index 0000000000..766d5bff70 --- /dev/null +++ b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java @@ -0,0 +1,69 @@ +package org.tikv.service.failsafe; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class CircuitBreakerTest { + + @Test + public void testCircuitBreaker() throws InterruptedException { + boolean enable = true; + int windowInSeconds = 2; + int errorThresholdPercentage = 100; + int requestVolumeThreshold = 10; + int sleepWindowInSeconds = 1; + int attemptRequestCount = 10; + + int sleepDelta = 100; + + CircuitBreakerImpl circuitBreaker = + new CircuitBreakerImpl( + enable, + windowInSeconds, + errorThresholdPercentage, + requestVolumeThreshold, + sleepWindowInSeconds, + attemptRequestCount); + CircuitBreakerMetrics metrics = circuitBreaker.getMetrics(); + + // initial state: CLOSE + assertTrue(!circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED); + + // CLOSE => OPEN + for (int i = 1; i <= requestVolumeThreshold; i++) { + metrics.recordFailure(); + } + Thread.sleep(windowInSeconds * 1000 + sleepDelta); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN); + + // OPEN => HALF_OPEN + Thread.sleep(sleepWindowInSeconds * 1000); + assertTrue(circuitBreaker.attemptExecution()); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN); + + // HALF_OPEN => OPEN + circuitBreaker.recordAttemptFailure(); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN); + + // OPEN => HALF_OPEN + Thread.sleep(sleepWindowInSeconds * 1000 + sleepDelta); + assertTrue(circuitBreaker.attemptExecution()); + circuitBreaker.recordAttemptSuccess(); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN); + + // HALF_OPEN => CLOSED + for (int i = 1; i < attemptRequestCount; i++) { + assertTrue(circuitBreaker.attemptExecution()); + circuitBreaker.recordAttemptSuccess(); + } + assertTrue(!circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED); + } +}