From 50651d6bb50cc78b5fa7c64fa9c11c39c780f749 Mon Sep 17 00:00:00 2001 From: Chen Ding Date: Thu, 1 Dec 2022 19:55:43 -0800 Subject: [PATCH] precompute min safe ts (#622) Signed-off-by: hihihuhu --- internal/locate/region_cache.go | 10 +++++++ tikv/kv.go | 51 ++++++++++++++++----------------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 5a353a81..154def11 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2367,6 +2367,16 @@ func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (r return res } +// GetLabelValue returns the value of the label +func (s *Store) GetLabelValue(key string) (string, bool) { + for _, label := range s.labels { + if label.Key == key { + return label.Value, true + } + } + return "", false +} + // getLivenessState gets the cached liveness state of the store. // When it's not reachable, a goroutine will update the state in background. // To get the accurate liveness state, use checkLiveness instead. diff --git a/tikv/kv.go b/tikv/kv.go index 3e45e15b..694a127b 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -47,7 +47,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" @@ -122,6 +121,9 @@ type KVStore struct { // it indicates the safe timestamp point that can be used to read consistent but may not the latest data. safeTSMap sync.Map + // MinSafeTs stores the minimum ts value for each txnScope + minSafeTS sync.Map + replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled ctx context.Context @@ -438,23 +440,10 @@ func (s *KVStore) GetTiKVClient() (client Client) { // GetMinSafeTS return the minimal safeTS of the storage with given txnScope. func (s *KVStore) GetMinSafeTS(txnScope string) uint64 { - stores := make([]*locate.Store, 0) - allStores := s.regionCache.GetStoresByType(tikvrpc.TiKV) - if txnScope != oracle.GlobalTxnScope { - for _, store := range allStores { - if store.IsLabelsMatch([]*metapb.StoreLabel{ - { - Key: DCLabelKey, - Value: txnScope, - }, - }) { - stores = append(stores, store) - } - } - } else { - stores = allStores + if val, ok := s.minSafeTS.Load(txnScope); ok { + return val.(uint64) } - return s.getMinSafeTSByStores(stores) + return 0 } // Ctx returns ctx. @@ -495,18 +484,14 @@ func (s *KVStore) setSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } -func (s *KVStore) getMinSafeTSByStores(stores []*locate.Store) uint64 { - if val, err := util.EvalFailpoint("injectSafeTS"); err == nil { - injectTS := val.(int) - return uint64(injectTS) - } +func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { minSafeTS := uint64(math.MaxUint64) // when there is no store, return 0 in order to let minStartTS become startTS directly - if len(stores) < 1 { - return 0 + if len(storeIDs) < 1 { + s.minSafeTS.Store(txnScope, 0) } - for _, store := range stores { - ok, safeTS := s.getSafeTS(store.StoreID()) + for _, store := range storeIDs { + ok, safeTS := s.getSafeTS(store) if ok { if safeTS != 0 && safeTS < minSafeTS { minSafeTS = safeTS @@ -515,7 +500,7 @@ func (s *KVStore) getMinSafeTSByStores(stores []*locate.Store) uint64 { minSafeTS = 0 } } - return minSafeTS + s.minSafeTS.Store(txnScope, minSafeTS) } func (s *KVStore) safeTSUpdater() { @@ -571,6 +556,18 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds()) }(ctx, wg, storeID, storeAddr) } + + txnScopeMap := make(map[string][]uint64) + for _, store := range stores { + txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID()) + + if label, ok := store.GetLabelValue(DCLabelKey); ok { + txnScopeMap[label] = append(txnScopeMap[label], store.StoreID()) + } + } + for txnScope, storeIDs := range txnScopeMap { + s.updateMinSafeTS(txnScope, storeIDs) + } wg.Wait() }