mirror of https://github.com/tikv/client-go.git
precompute min safe ts (#622)
Signed-off-by: hihihuhu <dingc05@gmail.com>
This commit is contained in:
parent
5dc09b15e7
commit
50651d6bb5
|
|
@ -2367,6 +2367,16 @@ func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (r
|
|||
return res
|
||||
}
|
||||
|
||||
// GetLabelValue returns the value of the label
|
||||
func (s *Store) GetLabelValue(key string) (string, bool) {
|
||||
for _, label := range s.labels {
|
||||
if label.Key == key {
|
||||
return label.Value, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// getLivenessState gets the cached liveness state of the store.
|
||||
// When it's not reachable, a goroutine will update the state in background.
|
||||
// To get the accurate liveness state, use checkLiveness instead.
|
||||
|
|
|
|||
51
tikv/kv.go
51
tikv/kv.go
|
|
@ -47,7 +47,6 @@ import (
|
|||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
tikverr "github.com/tikv/client-go/v2/error"
|
||||
|
|
@ -122,6 +121,9 @@ type KVStore struct {
|
|||
// it indicates the safe timestamp point that can be used to read consistent but may not the latest data.
|
||||
safeTSMap sync.Map
|
||||
|
||||
// MinSafeTs stores the minimum ts value for each txnScope
|
||||
minSafeTS sync.Map
|
||||
|
||||
replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled
|
||||
|
||||
ctx context.Context
|
||||
|
|
@ -438,23 +440,10 @@ func (s *KVStore) GetTiKVClient() (client Client) {
|
|||
|
||||
// GetMinSafeTS return the minimal safeTS of the storage with given txnScope.
|
||||
func (s *KVStore) GetMinSafeTS(txnScope string) uint64 {
|
||||
stores := make([]*locate.Store, 0)
|
||||
allStores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
|
||||
if txnScope != oracle.GlobalTxnScope {
|
||||
for _, store := range allStores {
|
||||
if store.IsLabelsMatch([]*metapb.StoreLabel{
|
||||
{
|
||||
Key: DCLabelKey,
|
||||
Value: txnScope,
|
||||
},
|
||||
}) {
|
||||
stores = append(stores, store)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
stores = allStores
|
||||
if val, ok := s.minSafeTS.Load(txnScope); ok {
|
||||
return val.(uint64)
|
||||
}
|
||||
return s.getMinSafeTSByStores(stores)
|
||||
return 0
|
||||
}
|
||||
|
||||
// Ctx returns ctx.
|
||||
|
|
@ -495,18 +484,14 @@ func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
|
|||
s.safeTSMap.Store(storeID, safeTS)
|
||||
}
|
||||
|
||||
func (s *KVStore) getMinSafeTSByStores(stores []*locate.Store) uint64 {
|
||||
if val, err := util.EvalFailpoint("injectSafeTS"); err == nil {
|
||||
injectTS := val.(int)
|
||||
return uint64(injectTS)
|
||||
}
|
||||
func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
|
||||
minSafeTS := uint64(math.MaxUint64)
|
||||
// when there is no store, return 0 in order to let minStartTS become startTS directly
|
||||
if len(stores) < 1 {
|
||||
return 0
|
||||
if len(storeIDs) < 1 {
|
||||
s.minSafeTS.Store(txnScope, 0)
|
||||
}
|
||||
for _, store := range stores {
|
||||
ok, safeTS := s.getSafeTS(store.StoreID())
|
||||
for _, store := range storeIDs {
|
||||
ok, safeTS := s.getSafeTS(store)
|
||||
if ok {
|
||||
if safeTS != 0 && safeTS < minSafeTS {
|
||||
minSafeTS = safeTS
|
||||
|
|
@ -515,7 +500,7 @@ func (s *KVStore) getMinSafeTSByStores(stores []*locate.Store) uint64 {
|
|||
minSafeTS = 0
|
||||
}
|
||||
}
|
||||
return minSafeTS
|
||||
s.minSafeTS.Store(txnScope, minSafeTS)
|
||||
}
|
||||
|
||||
func (s *KVStore) safeTSUpdater() {
|
||||
|
|
@ -571,6 +556,18 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
|
|||
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
|
||||
}(ctx, wg, storeID, storeAddr)
|
||||
}
|
||||
|
||||
txnScopeMap := make(map[string][]uint64)
|
||||
for _, store := range stores {
|
||||
txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID())
|
||||
|
||||
if label, ok := store.GetLabelValue(DCLabelKey); ok {
|
||||
txnScopeMap[label] = append(txnScopeMap[label], store.StoreID())
|
||||
}
|
||||
}
|
||||
for txnScope, storeIDs := range txnScopeMap {
|
||||
s.updateMinSafeTS(txnScope, storeIDs)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue