From 5d0ae57f2234c3aa38955cd5044ecb75cf584473 Mon Sep 17 00:00:00 2001 From: zyguan Date: Mon, 22 Jan 2024 21:39:56 +0800 Subject: [PATCH 1/3] region_cache: restrict access to store related fields (#1106) * region_cache: restrict access to store related fields Signed-off-by: zyguan * fix a typo Signed-off-by: zyguan * resolve a todo Signed-off-by: zyguan * resolve an another todo Signed-off-by: zyguan * limit the use of store api Signed-off-by: zyguan --------- Signed-off-by: zyguan Co-authored-by: cfzjywxk --- internal/locate/region_cache.go | 317 ++++++++++--------- internal/locate/region_cache_test.go | 28 +- internal/locate/region_request.go | 11 +- internal/locate/region_request3_test.go | 44 +-- internal/locate/region_request_state_test.go | 7 +- internal/locate/region_request_test.go | 5 +- 6 files changed, 198 insertions(+), 214 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index a4f40ef8..3bde35c3 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -287,11 +287,9 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio var leaderAccessIdx AccessIndex availablePeers := r.meta.GetPeers()[:0] for _, p := range r.meta.Peers { - c.storeMu.RLock() - store, exists := c.storeMu.stores[p.StoreId] - c.storeMu.RUnlock() + store, exists := c.getStore(p.StoreId) if !exists { - store = c.getStoreByStoreID(p.StoreId) + store = c.getStoreOrInsertDefault(p.StoreId) } addr, err := store.initResolve(bo, c) if err != nil { @@ -334,11 +332,9 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio } for _, p := range pdRegion.DownPeers { - c.storeMu.RLock() - store, exists := c.storeMu.stores[p.StoreId] - c.storeMu.RUnlock() + store, exists := c.getStore(p.StoreId) if !exists { - store = c.getStoreByStoreID(p.StoreId) + store = c.getStoreOrInsertDefault(p.StoreId) } addr, err := store.initResolve(bo, c) if err != nil { @@ -468,7 +464,7 @@ func (mu *regionIndexMu) refresh(r []*Region) { mu.sorted = newMu.sorted } -type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState +type livenessFunc func(ctx context.Context, s *Store) livenessState // RegionCache caches Regions loaded from PD. // All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing @@ -564,9 +560,7 @@ func newTestRegionCache() *RegionCache { // clear clears all cached data in the RegionCache. It's only used in tests. 
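The shape of this refactor, with callers asking the cache for a store through getStore / getStoreOrInsertDefault instead of taking storeMu themselves, is easiest to see in isolation. A minimal sketch with simplified, hypothetical types (the real Store and RegionCache carry far more state than this):

    package main

    import "sync"

    // store is a trimmed-down stand-in for locate.Store.
    type store struct{ id uint64 }

    // registry hides the mutex-protected map behind accessor methods, so
    // callers never touch the lock or the map directly.
    type registry struct {
        mu     sync.RWMutex
        stores map[uint64]*store
    }

    func newRegistry() *registry {
        return &registry{stores: make(map[uint64]*store)}
    }

    // getStore returns the cached store, if any, under the read lock.
    func (r *registry) getStore(id uint64) (*store, bool) {
        r.mu.RLock()
        defer r.mu.RUnlock()
        s, ok := r.stores[id]
        return s, ok
    }

    // getStoreOrInsertDefault returns the cached store or inserts an
    // unresolved placeholder, mirroring the helper added by this patch.
    func (r *registry) getStoreOrInsertDefault(id uint64) *store {
        r.mu.Lock()
        defer r.mu.Unlock()
        if s, ok := r.stores[id]; ok {
            return s
        }
        s := &store{id: id}
        r.stores[id] = s
        return s
    }

    func main() {
        r := newRegistry()
        if _, ok := r.getStore(1); !ok {
            _ = r.getStoreOrInsertDefault(1)
        }
    }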
func (c *RegionCache) clear() { c.mu = *newRegionIndexMu(nil) - c.storeMu.Lock() - c.storeMu.stores = make(map[uint64]*Store) - c.storeMu.Unlock() + c.clearStores() } // thread unsafe, should use with lock @@ -596,7 +590,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { select { case <-c.ctx.Done(): return - case <-c.notifyCheckCh: + case <-c.getCheckStoreEvents(): c.checkAndResolve(needCheckStores, func(s *Store) bool { return s.getResolveState() == needCheck }) @@ -639,15 +633,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* } }() - c.storeMu.RLock() - for _, store := range c.storeMu.stores { - if needCheck(store) { - needCheckStores = append(needCheckStores, store) - } - } - c.storeMu.RUnlock() - - for _, store := range needCheckStores { + for _, store := range c.filterStores(needCheckStores, needCheck) { _, err := store.reResolve(c) tikverr.Log(err) } @@ -655,16 +641,14 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* // SetRegionCacheStore is used to set a store in region cache, for testing only func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { - c.storeMu.Lock() - defer c.storeMu.Unlock() - c.storeMu.stores[id] = &Store{ + c.putStore(&Store{ storeID: id, storeType: storeType, state: state, labels: labels, addr: addr, peerAddr: peerAddr, - } + }) } // SetPDClient replaces pd client,for testing only @@ -1224,7 +1208,7 @@ func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionS metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() } // schedule a store addr resolve. - s.markNeedCheck(c.notifyCheckCh) + c.markStoreNeedCheck(s) return incEpochStoreIdx } @@ -1625,41 +1609,17 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { } // GetStoresByType gets stores by type `typ` -// TODO: revise it by get store by closure. func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store { - c.storeMu.Lock() - defer c.storeMu.Unlock() - stores := make([]*Store, 0) - for _, store := range c.storeMu.stores { - if store.getResolveState() != resolved { - continue - } - if store.storeType == typ { - //TODO: revise it with store.clone() - storeLabel := make([]*metapb.StoreLabel, 0) - for _, label := range store.labels { - storeLabel = append(storeLabel, &metapb.StoreLabel{ - Key: label.Key, - Value: label.Value, - }) - } - stores = append(stores, &Store{ - addr: store.addr, - peerAddr: store.peerAddr, - storeID: store.storeID, - labels: storeLabel, - storeType: typ, - }) - } - } - return stores + return c.filterStores(nil, func(s *Store) bool { + return s.getResolveState() == resolved && s.storeType == typ + }) } // GetAllStores gets TiKV and TiFlash stores. func (c *RegionCache) GetAllStores() []*Store { - stores := c.GetStoresByType(tikvrpc.TiKV) - tiflashStores := c.GetStoresByType(tikvrpc.TiFlash) - return append(stores, tiflashStores...) 
+ return c.filterStores(nil, func(s *Store) bool { + return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash) + }) } func filterUnavailablePeers(region *pd.Region) { @@ -1927,7 +1887,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S addr, err = store.initResolve(bo, c) return case deleted: - addr = c.changeToActiveStore(region, store) + addr = c.changeToActiveStore(region, store.storeID) return case tombstone: return "", nil @@ -1980,10 +1940,8 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor // changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map. // The order is guaranteed by reResolve() which adds the new store before marking old store deleted. -func (c *RegionCache) changeToActiveStore(region *Region, store *Store) (addr string) { - c.storeMu.RLock() - store = c.storeMu.stores[store.storeID] - c.storeMu.RUnlock() +func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) { + store, _ := c.getStore(storeID) for { oldRegionStore := region.getStore() newRegionStore := oldRegionStore.clone() @@ -2003,32 +1961,6 @@ func (c *RegionCache) changeToActiveStore(region *Region, store *Store) (addr st return } -func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) { - var ok bool - c.storeMu.Lock() - store, ok = c.storeMu.stores[storeID] - if ok { - c.storeMu.Unlock() - return - } - store = &Store{storeID: storeID} - c.storeMu.stores[storeID] = store - c.storeMu.Unlock() - return -} - -func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store { - c.storeMu.RLock() - defer c.storeMu.RUnlock() - s := make([]*Store, 0) - for _, store := range c.storeMu.stores { - if store.IsLabelsMatch(labels) { - s = append(s, store) - } - } - return s -} - // OnBucketVersionNotMatch removes the old buckets meta if the version is stale. func (c *RegionCache) OnBucketVersionNotMatch(ctx *RPCContext, version uint64, keys [][]byte) { r := c.GetCachedRegionWithRLock(ctx.Region) @@ -2118,35 +2050,27 @@ func (c *RegionCache) PDClient() pd.Client { // GetTiFlashStores returns the information of all tiflash nodes. func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store { - c.storeMu.RLock() - defer c.storeMu.RUnlock() - var stores []*Store - for _, s := range c.storeMu.stores { - if s.storeType == tikvrpc.TiFlash { - if !labelFilter(s.labels) { - continue - } - stores = append(stores, s) - } - } - return stores + return c.filterStores(nil, func(s *Store) bool { + return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels) + }) } // GetTiFlashComputeStores returns all stores with lable . 
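filterStores, added near the end of this patch, is the read-side counterpart: it walks the map once under the read lock and applies a caller-supplied predicate. Continuing the simplified registry sketch above:

    // filterStores appends every store matching pred to dst while holding the
    // read lock; a nil predicate matches everything.
    func (r *registry) filterStores(dst []*store, pred func(*store) bool) []*store {
        r.mu.RLock()
        defer r.mu.RUnlock()
        for _, s := range r.stores {
            if pred == nil || pred(s) {
                dst = append(dst, s)
            }
        }
        return dst
    }

With that helper, GetStoresByType and GetAllStores collapse into a single predicate over resolve state and store type. One behavioral nuance worth noting: the rewritten GetStoresByType hands back the cached *Store pointers directly, where the old loop built partial copies with cloned labels.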
func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) { - c.tiflashComputeStoreMu.RLock() - needReload := c.tiflashComputeStoreMu.needReload - stores := c.tiflashComputeStoreMu.stores - c.tiflashComputeStoreMu.RUnlock() + stores, needReload := c.listTiflashComputeStores() if needReload { - return c.reloadTiFlashComputeStores(bo) + stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c) + if err == nil { + c.setTiflashComputeStores(stores) + } + return stores, err } return stores, nil } -func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, _ error) { - stores, err := c.pdClient.GetAllStores(bo.GetCtx()) +func reloadTiFlashComputeStores(ctx context.Context, registry storeRegistry) (res []*Store, _ error) { + stores, err := registry.fetchAllStores(ctx) if err != nil { return nil, err } @@ -2163,10 +2087,6 @@ func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*St }) } } - - c.tiflashComputeStoreMu.Lock() - c.tiflashComputeStoreMu.stores = res - c.tiflashComputeStoreMu.Unlock() return res, nil } @@ -2192,9 +2112,7 @@ func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool // InvalidateTiFlashComputeStores set needReload be true, // and will refresh tiflash_compute store cache next time. func (c *RegionCache) InvalidateTiFlashComputeStores() { - c.tiflashComputeStoreMu.Lock() - defer c.tiflashComputeStoreMu.Unlock() - c.tiflashComputeStoreMu.needReload = true + c.markTiflashComputeStoresNeedReload() } // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if @@ -2624,7 +2542,7 @@ func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, e var store *metapb.Store for { start := time.Now() - store, err = c.pdClient.GetStore(bo.GetCtx(), s.storeID) + store, err = c.fetchStore(bo.GetCtx(), s.storeID) metrics.LoadRegionCacheHistogramWithGetStore.Observe(time.Since(start).Seconds()) if err != nil { metrics.RegionCacheCounterWithGetStoreError.Inc() @@ -2672,7 +2590,7 @@ func isStoreNotFoundError(err error) bool { // deleted. func (s *Store) reResolve(c *RegionCache) (bool, error) { var addr string - store, err := c.pdClient.GetStore(context.Background(), s.storeID) + store, err := c.fetchStore(context.Background(), s.storeID) if err != nil { metrics.RegionCacheCounterWithGetStoreError.Inc() } else { @@ -2700,12 +2618,10 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { addr = store.GetAddress() if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} - c.storeMu.Lock() if s.addr == addr { newStore.slowScore = s.slowScore } - c.storeMu.stores[newStore.storeID] = newStore - c.storeMu.Unlock() + c.putStore(newStore) s.setResolveState(deleted) return false, nil } @@ -2749,16 +2665,6 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool { } } -// markNeedCheck marks resolved store to be async resolve to check store addr change. 
-func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) { - if s.changeResolveStateTo(resolved, needCheck) { - select { - case notifyCheckCh <- struct{}{}: - default: - } - } -} - // IsSameLabels returns whether the store have the same labels with target labels func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool { if len(s.labels) != len(labels) { @@ -2822,6 +2728,10 @@ func (s *Store) getLivenessState() livenessState { type livenessState uint32 +func (l livenessState) injectConstantLiveness(tk testingKnobs) { + tk.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { return l }) +} + var ( livenessSf singleflight.Group // storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance. @@ -2858,7 +2768,11 @@ func (s livenessState) String() string { } } -func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) { +func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, c *RegionCache) (liveness livenessState) { + liveness = s.requestLiveness(bo.GetCtx(), c) + if liveness == reachable { + return + } // This mechanism doesn't support non-TiKV stores currently. if s.storeType != tikvrpc.TiKV { logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store", @@ -2871,6 +2785,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt s.unreachableSince = time.Now() go s.checkUntilHealth(c) } + return } func (s *Store) checkUntilHealth(c *RegionCache) { @@ -2897,8 +2812,7 @@ func (s *Store) checkUntilHealth(c *RegionCache) { } } - bo := retry.NewNoopBackoff(c.ctx) - l := s.requestLiveness(bo, c) + l := s.requestLiveness(c.ctx, c) if l == reachable { logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) @@ -2909,7 +2823,7 @@ func (s *Store) checkUntilHealth(c *RegionCache) { } } -func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) { +func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) { // It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead. 
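The new requestLiveness signature depends only on the narrow testingKnobs interface rather than the whole *RegionCache, which keeps the mock hook small and easy to satisfy in tests. A rough, self-contained sketch of that shape (names and signatures below are illustrative, not the real ones):

    package main

    import "context"

    // probeFunc plays the role of livenessFunc: given a context and a target,
    // report whether it looks alive.
    type probeFunc func(ctx context.Context, addr string) bool

    // knobs is the narrow dependency: the caller only needs a way to read an
    // injected probe, not the whole cache.
    type knobs interface {
        mockProbe() probeFunc
    }

    // fakeKnobs is a test double that satisfies the interface with a fixed probe.
    type fakeKnobs struct{ probe probeFunc }

    func (f fakeKnobs) mockProbe() probeFunc { return f.probe }

    // requestLiveness prefers the injected probe and falls back to a real
    // health check (elided here) when none is installed.
    func requestLiveness(ctx context.Context, addr string, k knobs) bool {
        if k != nil {
            if p := k.mockProbe(); p != nil {
                return p(ctx, addr)
            }
        }
        return true // a real implementation would issue a health-check RPC here
    }

    func main() {
        down := fakeKnobs{probe: func(context.Context, string) bool { return false }}
        _ = requestLiveness(context.Background(), "127.0.0.1:20160", down)
    }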
if val, err := util.EvalFailpoint("injectLiveness"); err == nil { switch val.(string) { @@ -2922,10 +2836,10 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness } } - if c != nil { - livenessFunc := c.testingKnobs.mockRequestLiveness.Load() + if tk != nil { + livenessFunc := tk.getMockRequestLiveness() if livenessFunc != nil { - return (*livenessFunc)(s, bo) + return livenessFunc(ctx, s) } } @@ -2941,12 +2855,6 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness rsCh := livenessSf.DoChan(addr, func() (interface{}, error) { return invokeKVStatusAPI(addr, storeLivenessTimeout), nil }) - var ctx context.Context - if bo != nil { - ctx = bo.GetCtx() - } else { - ctx = context.Background() - } select { case rs := <-rsCh: l = rs.Val.(livenessState) @@ -3083,12 +2991,10 @@ func (c *RegionCache) checkAndUpdateStoreSlowScores() { } }() slowScoreMetrics := make(map[string]float64) - c.storeMu.RLock() - for _, store := range c.storeMu.stores { + c.forEachStore(func(store *Store) { store.updateSlowScoreStat() slowScoreMetrics[store.addr] = float64(store.getSlowScore()) - } - c.storeMu.RUnlock() + }) for store, score := range slowScoreMetrics { metrics.TiKVStoreSlowScoreGauge.WithLabelValues(store).Set(score) } @@ -3118,14 +3024,12 @@ func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) { case <-c.ctx.Done(): return case <-ticker.C: - c.storeMu.RLock() - for _, store := range c.storeMu.stores { + c.forEachStore(func(store *Store) { for destType := toLeader; destType < numReplicaFlowsType; destType++ { metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType))) store.resetReplicaFlowsStats(destType) } - } - c.storeMu.RUnlock() + }) } } } @@ -3184,3 +3088,116 @@ func contains(startKey, endKey, key []byte) bool { return bytes.Compare(startKey, key) <= 0 && (bytes.Compare(key, endKey) < 0 || len(endKey) == 0) } + +type testingKnobs interface { + getMockRequestLiveness() livenessFunc + setMockRequestLiveness(f livenessFunc) +} + +func (c *RegionCache) getMockRequestLiveness() livenessFunc { + f := c.testingKnobs.mockRequestLiveness.Load() + if f == nil { + return nil + } + return *f +} + +func (c *RegionCache) setMockRequestLiveness(f livenessFunc) { + c.testingKnobs.mockRequestLiveness.Store(&f) +} + +type storeRegistry interface { + fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) + fetchAllStores(ctx context.Context) ([]*metapb.Store, error) +} + +func (c *RegionCache) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) { + return c.pdClient.GetStore(ctx, id) +} + +func (c *RegionCache) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) { + return c.pdClient.GetAllStores(ctx) +} + +func (c *RegionCache) getStore(id uint64) (store *Store, exists bool) { + c.storeMu.RLock() + store, exists = c.storeMu.stores[id] + c.storeMu.RUnlock() + return +} + +func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store { + c.storeMu.Lock() + store, exists := c.storeMu.stores[id] + if !exists { + store = &Store{storeID: id} + c.storeMu.stores[id] = store + } + c.storeMu.Unlock() + return store +} + +func (c *RegionCache) putStore(store *Store) { + c.storeMu.Lock() + c.storeMu.stores[store.storeID] = store + c.storeMu.Unlock() +} + +func (c *RegionCache) clearStores() { + c.storeMu.Lock() + c.storeMu.stores = make(map[uint64]*Store) + c.storeMu.Unlock() +} + +func (c *RegionCache) forEachStore(f func(*Store)) { + 
c.storeMu.RLock() + defer c.storeMu.RUnlock() + for _, s := range c.storeMu.stores { + f(s) + } +} + +func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) []*Store { + c.storeMu.RLock() + for _, store := range c.storeMu.stores { + if predicate == nil || predicate(store) { + dst = append(dst, store) + } + } + c.storeMu.RUnlock() + return dst +} + +func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bool) { + c.tiflashComputeStoreMu.RLock() + needReload = c.tiflashComputeStoreMu.needReload + stores = c.tiflashComputeStoreMu.stores + c.tiflashComputeStoreMu.RUnlock() + return +} + +func (c *RegionCache) setTiflashComputeStores(stores []*Store) { + c.tiflashComputeStoreMu.Lock() + c.tiflashComputeStoreMu.stores = stores + c.tiflashComputeStoreMu.needReload = false + c.tiflashComputeStoreMu.Unlock() +} + +func (c *RegionCache) markTiflashComputeStoresNeedReload() { + c.tiflashComputeStoreMu.Lock() + c.tiflashComputeStoreMu.needReload = true + c.tiflashComputeStoreMu.Unlock() +} + +func (c *RegionCache) markStoreNeedCheck(store *Store) { + if store.changeResolveStateTo(resolved, needCheck) { + select { + case c.notifyCheckCh <- struct{}{}: + default: + } + } +} + +func (c *RegionCache) getCheckStoreEvents() <-chan struct{} { + return c.notifyCheckCh +} diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index c84c4e77..440e624c 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -170,7 +170,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() { } for _, testcase := range testcases { s.T().Log(testcase.storeID) - store := s.cache.getStoreByStoreID(testcase.storeID) + store := s.cache.getStoreOrInsertDefault(testcase.storeID) _, err := store.initResolve(s.bo, s.cache) s.Nil(err) labels := []*metapb.StoreLabel{ @@ -179,7 +179,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() { Value: fmt.Sprintf("%v", testcase.storeID), }, } - stores := s.cache.getStoresByLabels(labels) + stores := s.cache.filterStores(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) }) s.Equal(len(stores), 1) s.Equal(stores[0].labels, labels) } @@ -209,7 +209,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Check resolving normal stores. The resolve state should be resolved. for _, storeMeta := range s.cluster.GetAllStores() { - store := cache.getStoreByStoreID(storeMeta.GetId()) + store := cache.getStoreOrInsertDefault(storeMeta.GetId()) s.Equal(store.getResolveState(), unresolved) addr, err := store.initResolve(bo, cache) s.Nil(err) @@ -227,26 +227,26 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { } // Mark the store needCheck. The resolve state should be resolved soon. - store := cache.getStoreByStoreID(s.store1) - store.markNeedCheck(cache.notifyCheckCh) + store := cache.getStoreOrInsertDefault(s.store1) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), resolved) // Mark the store needCheck and it becomes a tombstone. The resolve state should be tombstone. s.cluster.MarkTombstone(s.store1) - store.markNeedCheck(cache.notifyCheckCh) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), tombstone) s.cluster.StartStore(s.store1) // Mark the store needCheck and it's deleted from PD. The resolve state should be tombstone. 
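markStoreNeedCheck keeps the old non-blocking notify: the send into the check channel is simply dropped when a wakeup is already pending, because the resolver rescans every store anyway. The pattern in isolation (this sketch assumes a channel with capacity 1):

    package main

    import "fmt"

    // notifier coalesces wakeups: any number of notify calls while the worker
    // is busy collapse into a single pending event.
    type notifier struct {
        ch chan struct{}
    }

    func newNotifier() *notifier { return &notifier{ch: make(chan struct{}, 1)} }

    // notify never blocks the caller; if an event is already pending, the new
    // one is dropped.
    func (n *notifier) notify() {
        select {
        case n.ch <- struct{}{}:
        default:
        }
    }

    // events is the receive-only view handed to the background loop, in the
    // same spirit as getCheckStoreEvents.
    func (n *notifier) events() <-chan struct{} { return n.ch }

    func main() {
        n := newNotifier()
        n.notify()
        n.notify() // coalesced with the first
        <-n.events()
        fmt.Println("woke up once for two notifications")
    }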
cache.clear() - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) store.initResolve(bo, cache) s.Equal(store.getResolveState(), resolved) storeMeta := s.cluster.GetStore(s.store1) s.cluster.RemoveStore(s.store1) - store.markNeedCheck(cache.notifyCheckCh) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), tombstone) s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...) @@ -254,14 +254,14 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Mark the store needCheck and its address and labels are changed. // The resolve state should be deleted and a new store is added to the cache. cache.clear() - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) store.initResolve(bo, cache) s.Equal(store.getResolveState(), resolved) s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"}) - store.markNeedCheck(cache.notifyCheckCh) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), deleted) - newStore := cache.getStoreByStoreID(s.store1) + newStore := cache.getStoreOrInsertDefault(s.store1) s.Equal(newStore.getResolveState(), resolved) s.Equal(newStore.addr, store.addr+"0") s.Equal(newStore.labels, []*metapb.StoreLabel{{Key: "k", Value: "v"}}) @@ -269,7 +269,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Check initResolve()ing a tombstone store. The resolve state should be tombstone. cache.clear() s.cluster.MarkTombstone(s.store1) - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) for i := 0; i < 2; i++ { addr, err := store.initResolve(bo, cache) s.Nil(err) @@ -283,7 +283,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { cache.clear() storeMeta = s.cluster.GetStore(s.store1) s.cluster.RemoveStore(s.store1) - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) for i := 0; i < 2; i++ { addr, err := store.initResolve(bo, cache) s.Nil(err) @@ -1542,7 +1542,7 @@ func (s *testRegionCacheSuite) TestBuckets() { newMeta := proto.Clone(cachedRegion.meta).(*metapb.Region) newMeta.RegionEpoch.Version++ newMeta.RegionEpoch.ConfVer++ - _, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreByStoreID(s.store1)}, []*metapb.Region{newMeta}) + _, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreOrInsertDefault(s.store1)}, []*metapb.Region{newMeta}) s.Nil(err) cachedRegion = s.getRegion([]byte("a")) s.Equal(newBuckets, cachedRegion.getStore().buckets) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 89bd1a2c..4ddd1316 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1145,12 +1145,7 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo } func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { - store := accessReplica.store - liveness := store.requestLiveness(bo, s.regionCache) - if liveness != reachable { - store.startHealthCheckLoopIfNeeded(s.regionCache, liveness) - } - return liveness + return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache) } func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error) { @@ -1164,7 +1159,7 @@ func (s 
*replicaSelector) invalidateReplicaStore(replica *replica, cause error) ) metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() // schedule a store addr resolve. - store.markNeedCheck(s.regionCache.notifyCheckCh) + s.regionCache.markStoreNeedCheck(store) store.markAlreadySlow() } } @@ -2197,7 +2192,7 @@ func (s *RegionRequestSender) onRegionError( zap.Stringer("storeNotMatch", storeNotMatch), zap.Stringer("ctx", ctx), ) - ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh) + s.regionCache.markStoreNeedCheck(ctx.Store) s.regionCache.InvalidateCachedRegion(ctx.Region) // It's possible the address of store is not changed but the DNS resolves to a different address in k8s environment, // so we always reconnect in this case. diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index e4e08651..9c45e47b 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -102,7 +102,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() { s.NotNil(region) oldStoreLimit := kv.StoreLimit.Load() kv.StoreLimit.Store(500) - s.cache.getStoreByStoreID(s.storeIDs[0]).tokenCount.Store(500) + s.cache.getStoreOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500) // cause there is only one region in this cluster, regionID maps this leader. resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.NotNil(err) @@ -247,13 +247,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { return innerClient.SendRequest(ctx, addr, req, timeout) }} var storeState = uint32(unreachable) - tf := func(s *Store, bo *retry.Backoffer) livenessState { + s.regionRequestSender.regionCache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { if s.addr == leaderAddr { return livenessState(atomic.LoadUint32(&storeState)) } return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + }) loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k")) s.Nil(err) @@ -521,10 +520,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return unreachable - } - cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + unreachable.injectConstantLiveness(cache) s.IsType(&accessKnownLeader{}, replicaSelector.state) _, err = replicaSelector.next(s.bo) s.Nil(err) @@ -560,11 +556,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Do not try to use proxy if livenessState is unknown instead of unreachable. 
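The TestForwarding change keeps the mock per-address: only the leader consults an atomic flag that the test can flip while requests are in flight, while every other store stays reachable. Stripped of the suite plumbing, the shape is roughly this (constants and names here are illustrative):

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"
    )

    type liveness uint32

    const (
        reachable liveness = iota
        unreachable
    )

    func main() {
        leaderAddr := "store1:20160"
        var leaderState atomic.Uint32 // toggled by the test at runtime
        leaderState.Store(uint32(unreachable))

        // The injected probe closes over leaderAddr and the atomic state, so
        // the test can flip the leader between reachable and unreachable
        // without reinstalling the mock.
        probe := func(ctx context.Context, addr string) liveness {
            if addr == leaderAddr {
                return liveness(leaderState.Load())
            }
            return reachable
        }

        fmt.Println(probe(context.Background(), leaderAddr)) // 1 (unreachable)
        leaderState.Store(uint32(reachable))
        fmt.Println(probe(context.Background(), leaderAddr)) // 0 (reachable)
    }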
refreshEpochs(regionStore) cache.enableForwarding = true - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return unknown - } - cache.testingKnobs.mockRequestLiveness.Store( - (*livenessFunc)(&tf)) + unknown.injectConstantLiveness(cache) replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) @@ -586,10 +578,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return unreachable - } - cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + unreachable.injectConstantLiveness(cache) s.Eventually(func() bool { return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable }, 3*time.Second, 200*time.Millisecond) @@ -849,11 +838,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.cluster.ChangeLeader(s.regionID, s.peerIDs[0]) // The leader store is alive but can't provide service. - - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(s.cache) s.Eventually(func() bool { stores := s.regionRequestSender.replicaSelector.regionStore.stores return stores[0].getLivenessState() == reachable && @@ -979,10 +964,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { } // Runs out of all replicas and then returns a send error. - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return unreachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + unreachable.injectConstantLiveness(s.cache) reloadRegion() for _, store := range s.storeIDs { s.cluster.StopStore(store) @@ -999,10 +981,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { // Verify switch to the leader immediately when stale read requests with global txn scope meet region errors. s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0]) - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(s.cache) s.Eventually(func() bool { stores := s.regionRequestSender.replicaSelector.regionStore.stores return stores[0].getLivenessState() == reachable && @@ -1367,10 +1346,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { } // Test for write request. 
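Several of these tests then wait for the background health checker to observe the new state with testify's polling assertion rather than a fixed sleep. A stand-alone sketch of that wait (the atomic flag is a hypothetical stand-in for the store's liveness state):

    package sketch // sketch_test.go

    import (
        "sync/atomic"
        "testing"
        "time"

        "github.com/stretchr/testify/require"
    )

    func TestWaitUntilReachable(t *testing.T) {
        var state atomic.Uint32 // 0 = unreachable, 1 = reachable in this sketch

        // Pretend a background health-check loop marks the store reachable later.
        go func() {
            time.Sleep(50 * time.Millisecond)
            state.Store(1)
        }()

        // Poll the condition every 20ms and fail if it is still false after one
        // second, the same pattern as the s.Eventually calls in these tests.
        require.Eventually(t, func() bool {
            return state.Load() == 1
        }, time.Second, 20*time.Millisecond)
    }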
- tf := func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(s.cache) resetStats() req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{}) req.ReplicaReadType = kv.ReplicaReadLeader diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index f012452b..3636355e 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -84,7 +84,7 @@ func (s *testRegionCacheStaleReadSuite) SetupTest() { } func (s *testRegionCacheStaleReadSuite) TearDownTest() { - s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(nil)) + s.cache.setMockRequestLiveness(nil) s.cache.Close() s.mvccStore.Close() } @@ -222,14 +222,13 @@ func (s *testRegionCacheStaleReadSuite) setClient() { return }} - tf := func(store *Store, bo *retry.Backoffer) livenessState { + s.cache.setMockRequestLiveness(func(ctx context.Context, store *Store) livenessState { _, ok := s.injection.unavailableStoreIDs[store.storeID] if ok { return unreachable } return reachable - } - s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + }) } func (s *testRegionCacheStaleReadSuite) extractResp(resp *tikvrpc.Response) (uint64, string, SuccessReadType) { diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 53f9fe1c..b79689fb 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -748,9 +748,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) }} - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } defer func() { rpcClient.Close() @@ -775,7 +772,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { }() req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1}) regionRequestSender := NewRegionRequestSender(s.cache, fnClient) - regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(regionRequestSender.regionCache) regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) } }() From fd2fc84032b14e70bec4e6d6a26626b055b91650 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 23 Jan 2024 11:24:15 +0800 Subject: [PATCH 2/3] use slices.Sort to eliminate bounds check (#1128) Signed-off-by: Weizhen Wang --- .github/workflows/test.yml | 8 ++++---- .golangci.yml | 2 +- internal/mockstore/mocktikv/cluster.go | 5 +++-- internal/unionstore/memdb.go | 9 ++------- txnkv/transaction/txn.go | 5 +++-- util/misc.go | 7 +------ 6 files changed, 14 insertions(+), 22 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 50c49097..1dc50b8b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: 1.21.0 + go-version: 1.21.6 - name: Test run: go test ./... 
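Beyond the CI version bumps, the substance of this patch is swapping hand-rolled patterns for standard-library ones: slices.SortFunc (Go 1.21) in place of sort.Slice closures, and unsafe.String / unsafe.SliceData (Go 1.20) in place of reflect header manipulation. A small sketch mirroring the deduplicateKeys and util.String rewrites that appear further down:

    package main

    import (
        "bytes"
        "fmt"
        "slices"
        "unsafe"
    )

    // sortDedup mirrors the deduplicateKeys rewrite: slices.SortFunc takes the
    // elements plus a three-way comparator, so there is no index-based closure
    // and none of the bounds checks it forces.
    func sortDedup(keys [][]byte) [][]byte {
        if len(keys) < 2 {
            return keys
        }
        slices.SortFunc(keys, func(i, j []byte) int { return bytes.Compare(i, j) })
        deduped := keys[:1]
        for i := 1; i < len(keys); i++ {
            if !bytes.Equal(deduped[len(deduped)-1], keys[i]) {
                deduped = append(deduped, keys[i])
            }
        }
        return deduped
    }

    // zeroCopyString is the trick util.String now uses: unsafe.String over
    // unsafe.SliceData instead of poking reflect.SliceHeader/StringHeader.
    func zeroCopyString(b []byte) string {
        if len(b) == 0 {
            return ""
        }
        return unsafe.String(unsafe.SliceData(b), len(b))
    }

    func main() {
        keys := [][]byte{[]byte("c"), []byte("a"), []byte("b"), []byte("a")}
        for _, k := range sortDedup(keys) {
            fmt.Println(zeroCopyString(k)) // a, b, c
        }
    }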
@@ -28,7 +28,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: 1.21.0 + go-version: 1.21.6 - name: Test with race run: go test -race ./... @@ -42,10 +42,10 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: 1.21.0 + go-version: 1.21.6 - name: Lint uses: golangci/golangci-lint-action@v3 with: - version: v1.51.2 + version: v1.55.2 diff --git a/.golangci.yml b/.golangci.yml index 2929ea46..371fb92d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,7 +4,7 @@ linters: disable-all: true enable: - bodyclose - - depguard + #- depguard - exportloopref - gofmt - goimports diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index 7b5f1cc3..78b59fe6 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -38,6 +38,7 @@ import ( "bytes" "context" "math" + "slices" "sort" "sync" "time" @@ -358,8 +359,8 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int, opts ...pd.Get regions = append(regions, region) } - sort.Slice(regions, func(i, j int) bool { - return bytes.Compare(regions[i].Meta.GetStartKey(), regions[j].Meta.GetStartKey()) < 0 + slices.SortFunc(regions, func(i, j *Region) int { + return bytes.Compare(i.Meta.GetStartKey(), j.Meta.GetStartKey()) }) startPos := sort.Search(len(regions), func(i int) bool { diff --git a/internal/unionstore/memdb.go b/internal/unionstore/memdb.go index 1aca2fa2..ae4826c2 100644 --- a/internal/unionstore/memdb.go +++ b/internal/unionstore/memdb.go @@ -38,7 +38,6 @@ import ( "bytes" "fmt" "math" - "reflect" "sync" "unsafe" @@ -837,12 +836,8 @@ func (n *memdbNode) setBlack() { } func (n *memdbNode) getKey() []byte { - var ret []byte - hdr := (*reflect.SliceHeader)(unsafe.Pointer(&ret)) - hdr.Data = uintptr(unsafe.Pointer(&n.flags)) + kv.FlagBytes - hdr.Len = int(n.klen) - hdr.Cap = int(n.klen) - return ret + base := unsafe.Add(unsafe.Pointer(&n.flags), kv.FlagBytes) + return unsafe.Slice((*byte)(base), int(n.klen)) } const ( diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 27e747e7..782ed53e 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -43,6 +43,7 @@ import ( "math" "math/rand" "runtime/trace" + "slices" "sort" "sync" "sync/atomic" @@ -1314,8 +1315,8 @@ func deduplicateKeys(keys [][]byte) [][]byte { return keys } - sort.Slice(keys, func(i, j int) bool { - return bytes.Compare(keys[i], keys[j]) < 0 + slices.SortFunc(keys, func(i, j []byte) int { + return bytes.Compare(i, j) }) deduped := keys[:1] for i := 1; i < len(keys); i++ { diff --git a/util/misc.go b/util/misc.go index e324bf79..0bfdb072 100644 --- a/util/misc.go +++ b/util/misc.go @@ -38,7 +38,6 @@ import ( "context" "encoding/hex" "fmt" - "reflect" "strconv" "strings" "time" @@ -168,11 +167,7 @@ func String(b []byte) (s string) { if len(b) == 0 { return "" } - pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - pstring := (*reflect.StringHeader)(unsafe.Pointer(&s)) - pstring.Data = pbytes.Data - pstring.Len = pbytes.Len - return + return unsafe.String(unsafe.SliceData(b), len(b)) } // ToUpperASCIIInplace bytes.ToUpper but zero-cost From 3480b5ed7ce131ed39f2e6ffcdec603f4166e225 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Tue, 23 Jan 2024 13:54:05 +0800 Subject: [PATCH 3/3] Revert "fix: check kill signal against 0 (#1102)" (#1129) This reverts commit 057c479dd8c27ab6b2b4175321166a5d5228d750. 
Signed-off-by: MyonKeminta Co-authored-by: MyonKeminta --- config/retry/backoff.go | 21 ++++----------------- integration_tests/2pc_test.go | 10 ---------- internal/client/retry/backoff.go | 27 ++++++--------------------- internal/locate/region_request.go | 5 +++-- kv/variables.go | 4 ---- 5 files changed, 13 insertions(+), 54 deletions(-) diff --git a/config/retry/backoff.go b/config/retry/backoff.go index 6122e300..a2723e05 100644 --- a/config/retry/backoff.go +++ b/config/retry/backoff.go @@ -217,9 +217,10 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e atomic.AddInt64(&detail.BackoffCount, 1) } - err2 := b.CheckKilled() - if err2 != nil { - return err2 + if b.vars != nil && b.vars.Killed != nil { + if atomic.LoadUint32(b.vars.Killed) == 1 { + return errors.WithStack(tikverr.ErrQueryInterrupted) + } } var startTs interface{} @@ -381,17 +382,3 @@ func (b *Backoffer) longestSleepCfg() (*Config, int) { } return nil, 0 } - -func (b *Backoffer) CheckKilled() error { - if b.vars != nil && b.vars.Killed != nil { - killed := atomic.LoadUint32(b.vars.Killed) - if killed != 0 { - logutil.BgLogger().Info( - "backoff stops because a killed signal is received", - zap.Uint32("signal", killed), - ) - return errors.WithStack(tikverr.ErrQueryInterrupted) - } - } - return nil -} diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 9369bfb6..355dafea 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -2502,13 +2502,3 @@ func (s *testCommitterSuite) TestExtractKeyExistsErr() { s.True(txn.GetMemBuffer().TryLock()) txn.GetMemBuffer().Unlock() } - -func (s *testCommitterSuite) TestKillSignal() { - txn := s.begin() - err := txn.Set([]byte("key"), []byte("value")) - s.Nil(err) - var killed uint32 = 2 - txn.SetVars(kv.NewVariables(&killed)) - err = txn.Commit(context.Background()) - s.ErrorContains(err, "query interrupted") -} diff --git a/internal/client/retry/backoff.go b/internal/client/retry/backoff.go index 7a35a74b..a2723e05 100644 --- a/internal/client/retry/backoff.go +++ b/internal/client/retry/backoff.go @@ -217,38 +217,23 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e atomic.AddInt64(&detail.BackoffCount, 1) } - err2 := b.checkKilled() - if err2 != nil { - return err2 + if b.vars != nil && b.vars.Killed != nil { + if atomic.LoadUint32(b.vars.Killed) == 1 { + return errors.WithStack(tikverr.ErrQueryInterrupted) + } } var startTs interface{} if ts := b.ctx.Value(TxnStartKey); ts != nil { startTs = ts } - logutil.Logger(b.ctx).Debug( - "retry later", + logutil.Logger(b.ctx).Debug("retry later", zap.Error(err), zap.Int("totalSleep", b.totalSleep), zap.Int("excludedSleep", b.excludedSleep), zap.Int("maxSleep", b.maxSleep), zap.Stringer("type", cfg), - zap.Reflect("txnStartTS", startTs), - ) - return nil -} - -func (b *Backoffer) checkKilled() error { - if b.vars != nil && b.vars.Killed != nil { - killed := atomic.LoadUint32(b.vars.Killed) - if killed != 0 { - logutil.BgLogger().Info( - "backoff stops because a killed signal is received", - zap.Uint32("signal", killed), - ) - return errors.WithStack(tikverr.ErrQueryInterrupted) - } - } + zap.Reflect("txnStartTS", startTs)) return nil } diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 4ddd1316..b5638eef 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1498,8 +1498,9 @@ func (s *RegionRequestSender) SendReqCtx( } // recheck whether the session/query 
is killed during the Next() - if err2 := bo.CheckKilled(); err2 != nil { - return nil, nil, retryTimes, err2 + boVars := bo.GetVars() + if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 { + return nil, nil, retryTimes, errors.WithStack(tikverr.ErrQueryInterrupted) } if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil { if val.(bool) { diff --git a/kv/variables.go b/kv/variables.go index cae78c9c..581be54d 100644 --- a/kv/variables.go +++ b/kv/variables.go @@ -44,10 +44,6 @@ type Variables struct { // Pointer to SessionVars.Killed // Killed is a flag to indicate that this query is killed. - // This is an enum value rather than a boolean. See sqlkiller.go - // in TiDB for its definition. - // When its value is 0, it's not killed - // When its value is not 0, it's killed, the value indicates concrete reason. Killed *uint32 }
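After this revert, the kill check is back to comparing the Killed flag against the literal value 1 inline, both in the two backoffers and in SendReqCtx, instead of going through CheckKilled with logging. The pattern in isolation (a sketch, not the real Backoffer):

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
        "time"
    )

    var errQueryInterrupted = errors.New("query interrupted")

    // backoffLoop consults the shared kill flag between retries; as in the
    // reverted state, only the literal value 1 aborts the loop.
    func backoffLoop(killed *uint32, attempt func() error) error {
        for {
            if err := attempt(); err == nil {
                return nil
            }
            if killed != nil && atomic.LoadUint32(killed) == 1 {
                return errQueryInterrupted
            }
            time.Sleep(10 * time.Millisecond) // real code sleeps with exponential backoff
        }
    }

    func main() {
        var killed uint32
        go func() {
            time.Sleep(30 * time.Millisecond)
            atomic.StoreUint32(&killed, 1) // another goroutine marks the query killed
        }()
        err := backoffLoop(&killed, func() error { return errors.New("not ready") })
        fmt.Println(err) // query interrupted
    }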