Merge branch 'master' into update-dep

lance6716 2024-01-23 15:23:15 +08:00 committed by GitHub
commit 3d7ff4c3c8
16 changed files with 225 additions and 290 deletions

View File

@ -15,7 +15,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.21.0
go-version: 1.21.6
- name: Test
run: go test ./...
@ -28,7 +28,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.21.0
go-version: 1.21.6
- name: Test with race
run: go test -race ./...
@ -42,10 +42,10 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.21.0
go-version: 1.21.6
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
version: v1.55.2

View File

@ -4,7 +4,7 @@ linters:
disable-all: true
enable:
- bodyclose
- depguard
#- depguard
- exportloopref
- gofmt
- goimports

View File

@ -217,9 +217,10 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
atomic.AddInt64(&detail.BackoffCount, 1)
}
err2 := b.CheckKilled()
if err2 != nil {
return err2
if b.vars != nil && b.vars.Killed != nil {
if atomic.LoadUint32(b.vars.Killed) == 1 {
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
var startTs interface{}
@ -381,17 +382,3 @@ func (b *Backoffer) longestSleepCfg() (*Config, int) {
}
return nil, 0
}
func (b *Backoffer) CheckKilled() error {
if b.vars != nil && b.vars.Killed != nil {
killed := atomic.LoadUint32(b.vars.Killed)
if killed != 0 {
logutil.BgLogger().Info(
"backoff stops because a killed signal is received",
zap.Uint32("signal", killed),
)
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
return nil
}

View File

@ -2502,13 +2502,3 @@ func (s *testCommitterSuite) TestExtractKeyExistsErr() {
s.True(txn.GetMemBuffer().TryLock())
txn.GetMemBuffer().Unlock()
}
func (s *testCommitterSuite) TestKillSignal() {
txn := s.begin()
err := txn.Set([]byte("key"), []byte("value"))
s.Nil(err)
var killed uint32 = 2
txn.SetVars(kv.NewVariables(&killed))
err = txn.Commit(context.Background())
s.ErrorContains(err, "query interrupted")
}
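The removed test relied on the old behavior where any non-zero Killed value interrupts the backoffer; the inlined checks in this change only treat the value 1 as a kill signal, which is why a test using the value 2 no longer applies. A hedged sketch of an equivalent test under the new semantics (the test name is hypothetical; it assumes the same suite helpers as the removed test and that Commit reaches one of the kill checks):
func (s *testCommitterSuite) TestKillSignalValueOne() {
    txn := s.begin()
    err := txn.Set([]byte("key"), []byte("value"))
    s.Nil(err)
    // Only the value 1 is treated as a kill signal by the new inlined checks.
    var killed uint32 = 1
    txn.SetVars(kv.NewVariables(&killed))
    err = txn.Commit(context.Background())
    s.ErrorContains(err, "query interrupted")
}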

View File

@ -217,38 +217,23 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
atomic.AddInt64(&detail.BackoffCount, 1)
}
err2 := b.checkKilled()
if err2 != nil {
return err2
if b.vars != nil && b.vars.Killed != nil {
if atomic.LoadUint32(b.vars.Killed) == 1 {
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
var startTs interface{}
if ts := b.ctx.Value(TxnStartKey); ts != nil {
startTs = ts
}
logutil.Logger(b.ctx).Debug(
"retry later",
logutil.Logger(b.ctx).Debug("retry later",
zap.Error(err),
zap.Int("totalSleep", b.totalSleep),
zap.Int("excludedSleep", b.excludedSleep),
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", cfg),
zap.Reflect("txnStartTS", startTs),
)
return nil
}
func (b *Backoffer) checkKilled() error {
if b.vars != nil && b.vars.Killed != nil {
killed := atomic.LoadUint32(b.vars.Killed)
if killed != 0 {
logutil.BgLogger().Info(
"backoff stops because a killed signal is received",
zap.Uint32("signal", killed),
)
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
zap.Reflect("txnStartTS", startTs))
return nil
}

View File

@ -287,11 +287,9 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
var leaderAccessIdx AccessIndex
availablePeers := r.meta.GetPeers()[:0]
for _, p := range r.meta.Peers {
c.storeMu.RLock()
store, exists := c.storeMu.stores[p.StoreId]
c.storeMu.RUnlock()
store, exists := c.getStore(p.StoreId)
if !exists {
store = c.getStoreByStoreID(p.StoreId)
store = c.getStoreOrInsertDefault(p.StoreId)
}
addr, err := store.initResolve(bo, c)
if err != nil {
@ -334,11 +332,9 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
}
for _, p := range pdRegion.DownPeers {
c.storeMu.RLock()
store, exists := c.storeMu.stores[p.StoreId]
c.storeMu.RUnlock()
store, exists := c.getStore(p.StoreId)
if !exists {
store = c.getStoreByStoreID(p.StoreId)
store = c.getStoreOrInsertDefault(p.StoreId)
}
addr, err := store.initResolve(bo, c)
if err != nil {
@ -468,7 +464,7 @@ func (mu *regionIndexMu) refresh(r []*Region) {
mu.sorted = newMu.sorted
}
type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState
type livenessFunc func(ctx context.Context, s *Store) livenessState
// RegionCache caches Regions loaded from PD.
// All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
@ -564,9 +560,7 @@ func newTestRegionCache() *RegionCache {
// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu = *newRegionIndexMu(nil)
c.storeMu.Lock()
c.storeMu.stores = make(map[uint64]*Store)
c.storeMu.Unlock()
c.clearStores()
}
// thread unsafe, should use with lock
@ -596,7 +590,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
select {
case <-c.ctx.Done():
return
case <-c.notifyCheckCh:
case <-c.getCheckStoreEvents():
c.checkAndResolve(needCheckStores, func(s *Store) bool {
return s.getResolveState() == needCheck
})
@ -639,15 +633,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
}
}()
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
if needCheck(store) {
needCheckStores = append(needCheckStores, store)
}
}
c.storeMu.RUnlock()
for _, store := range needCheckStores {
for _, store := range c.filterStores(needCheckStores, needCheck) {
_, err := store.reResolve(c)
tikverr.Log(err)
}
@ -655,16 +641,14 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
c.storeMu.Lock()
defer c.storeMu.Unlock()
c.storeMu.stores[id] = &Store{
c.putStore(&Store{
storeID: id,
storeType: storeType,
state: state,
labels: labels,
addr: addr,
peerAddr: peerAddr,
}
})
}
// SetPDClient replaces pd client, for testing only
@ -1224,7 +1208,7 @@ func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionS
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
// schedule a store addr resolve.
s.markNeedCheck(c.notifyCheckCh)
c.markStoreNeedCheck(s)
return incEpochStoreIdx
}
@ -1625,41 +1609,17 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
}
// GetStoresByType gets stores by type `typ`
// TODO: revise it by get store by closure.
func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
c.storeMu.Lock()
defer c.storeMu.Unlock()
stores := make([]*Store, 0)
for _, store := range c.storeMu.stores {
if store.getResolveState() != resolved {
continue
}
if store.storeType == typ {
//TODO: revise it with store.clone()
storeLabel := make([]*metapb.StoreLabel, 0)
for _, label := range store.labels {
storeLabel = append(storeLabel, &metapb.StoreLabel{
Key: label.Key,
Value: label.Value,
})
}
stores = append(stores, &Store{
addr: store.addr,
peerAddr: store.peerAddr,
storeID: store.storeID,
labels: storeLabel,
storeType: typ,
})
}
}
return stores
return c.filterStores(nil, func(s *Store) bool {
return s.getResolveState() == resolved && s.storeType == typ
})
}
// GetAllStores gets TiKV and TiFlash stores.
func (c *RegionCache) GetAllStores() []*Store {
stores := c.GetStoresByType(tikvrpc.TiKV)
tiflashStores := c.GetStoresByType(tikvrpc.TiFlash)
return append(stores, tiflashStores...)
return c.filterStores(nil, func(s *Store) bool {
return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash)
})
}
func filterUnavailablePeers(region *pd.Region) {
@ -1927,7 +1887,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
addr, err = store.initResolve(bo, c)
return
case deleted:
addr = c.changeToActiveStore(region, store)
addr = c.changeToActiveStore(region, store.storeID)
return
case tombstone:
return "", nil
@ -1980,10 +1940,8 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor
// changeToActiveStore replaces the deleted store in the region by an up-to-date store in the stores map.
// The order is guaranteed by reResolve() which adds the new store before marking old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, store *Store) (addr string) {
c.storeMu.RLock()
store = c.storeMu.stores[store.storeID]
c.storeMu.RUnlock()
func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) {
store, _ := c.getStore(storeID)
for {
oldRegionStore := region.getStore()
newRegionStore := oldRegionStore.clone()
@ -2003,32 +1961,6 @@ func (c *RegionCache) changeToActiveStore(region *Region, store *Store) (addr st
return
}
func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) {
var ok bool
c.storeMu.Lock()
store, ok = c.storeMu.stores[storeID]
if ok {
c.storeMu.Unlock()
return
}
store = &Store{storeID: storeID}
c.storeMu.stores[storeID] = store
c.storeMu.Unlock()
return
}
func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store {
c.storeMu.RLock()
defer c.storeMu.RUnlock()
s := make([]*Store, 0)
for _, store := range c.storeMu.stores {
if store.IsLabelsMatch(labels) {
s = append(s, store)
}
}
return s
}
// OnBucketVersionNotMatch removes the old buckets meta if the version is stale.
func (c *RegionCache) OnBucketVersionNotMatch(ctx *RPCContext, version uint64, keys [][]byte) {
r := c.GetCachedRegionWithRLock(ctx.Region)
@ -2118,35 +2050,27 @@ func (c *RegionCache) PDClient() pd.Client {
// GetTiFlashStores returns the information of all tiflash nodes.
func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store {
c.storeMu.RLock()
defer c.storeMu.RUnlock()
var stores []*Store
for _, s := range c.storeMu.stores {
if s.storeType == tikvrpc.TiFlash {
if !labelFilter(s.labels) {
continue
}
stores = append(stores, s)
}
}
return stores
return c.filterStores(nil, func(s *Store) bool {
return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels)
})
}
// GetTiFlashComputeStores returns all stores with label <engine, tiflash_compute>.
func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) {
c.tiflashComputeStoreMu.RLock()
needReload := c.tiflashComputeStoreMu.needReload
stores := c.tiflashComputeStoreMu.stores
c.tiflashComputeStoreMu.RUnlock()
stores, needReload := c.listTiflashComputeStores()
if needReload {
return c.reloadTiFlashComputeStores(bo)
stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c)
if err == nil {
c.setTiflashComputeStores(stores)
}
return stores, err
}
return stores, nil
}
func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, _ error) {
stores, err := c.pdClient.GetAllStores(bo.GetCtx())
func reloadTiFlashComputeStores(ctx context.Context, registry storeRegistry) (res []*Store, _ error) {
stores, err := registry.fetchAllStores(ctx)
if err != nil {
return nil, err
}
@ -2163,10 +2087,6 @@ func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*St
})
}
}
c.tiflashComputeStoreMu.Lock()
c.tiflashComputeStoreMu.stores = res
c.tiflashComputeStoreMu.Unlock()
return res, nil
}
@ -2192,9 +2112,7 @@ func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool
// InvalidateTiFlashComputeStores sets needReload to true,
// and will refresh tiflash_compute store cache next time.
func (c *RegionCache) InvalidateTiFlashComputeStores() {
c.tiflashComputeStoreMu.Lock()
defer c.tiflashComputeStoreMu.Unlock()
c.tiflashComputeStoreMu.needReload = true
c.markTiflashComputeStoresNeedReload()
}
// UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if
@ -2624,7 +2542,7 @@ func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, e
var store *metapb.Store
for {
start := time.Now()
store, err = c.pdClient.GetStore(bo.GetCtx(), s.storeID)
store, err = c.fetchStore(bo.GetCtx(), s.storeID)
metrics.LoadRegionCacheHistogramWithGetStore.Observe(time.Since(start).Seconds())
if err != nil {
metrics.RegionCacheCounterWithGetStoreError.Inc()
@ -2672,7 +2590,7 @@ func isStoreNotFoundError(err error) bool {
// deleted.
func (s *Store) reResolve(c *RegionCache) (bool, error) {
var addr string
store, err := c.pdClient.GetStore(context.Background(), s.storeID)
store, err := c.fetchStore(context.Background(), s.storeID)
if err != nil {
metrics.RegionCacheCounterWithGetStoreError.Inc()
} else {
@ -2700,12 +2618,10 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
c.storeMu.Lock()
if s.addr == addr {
newStore.slowScore = s.slowScore
}
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
c.putStore(newStore)
s.setResolveState(deleted)
return false, nil
}
@ -2749,16 +2665,6 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool {
}
}
// markNeedCheck marks resolved store to be async resolve to check store addr change.
func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) {
if s.changeResolveStateTo(resolved, needCheck) {
select {
case notifyCheckCh <- struct{}{}:
default:
}
}
}
// IsSameLabels returns whether the store have the same labels with target labels
func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool {
if len(s.labels) != len(labels) {
@ -2822,6 +2728,10 @@ func (s *Store) getLivenessState() livenessState {
type livenessState uint32
func (l livenessState) injectConstantLiveness(tk testingKnobs) {
tk.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { return l })
}
var (
livenessSf singleflight.Group
// storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance.
@ -2858,7 +2768,11 @@ func (s livenessState) String() string {
}
}
func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, c *RegionCache) (liveness livenessState) {
liveness = s.requestLiveness(bo.GetCtx(), c)
if liveness == reachable {
return
}
// This mechanism doesn't support non-TiKV stores currently.
if s.storeType != tikvrpc.TiKV {
logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store",
@ -2871,6 +2785,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt
s.unreachableSince = time.Now()
go s.checkUntilHealth(c)
}
return
}
func (s *Store) checkUntilHealth(c *RegionCache) {
@ -2897,8 +2812,7 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
}
}
bo := retry.NewNoopBackoff(c.ctx)
l := s.requestLiveness(bo, c)
l := s.requestLiveness(c.ctx, c)
if l == reachable {
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
@ -2909,7 +2823,7 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
}
}
func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) {
// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
switch val.(string) {
@ -2922,10 +2836,10 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness
}
}
if c != nil {
livenessFunc := c.testingKnobs.mockRequestLiveness.Load()
if tk != nil {
livenessFunc := tk.getMockRequestLiveness()
if livenessFunc != nil {
return (*livenessFunc)(s, bo)
return livenessFunc(ctx, s)
}
}
@ -2941,12 +2855,6 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness
rsCh := livenessSf.DoChan(addr, func() (interface{}, error) {
return invokeKVStatusAPI(addr, storeLivenessTimeout), nil
})
var ctx context.Context
if bo != nil {
ctx = bo.GetCtx()
} else {
ctx = context.Background()
}
select {
case rs := <-rsCh:
l = rs.Val.(livenessState)
@ -3083,12 +2991,10 @@ func (c *RegionCache) checkAndUpdateStoreSlowScores() {
}
}()
slowScoreMetrics := make(map[string]float64)
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
c.forEachStore(func(store *Store) {
store.updateSlowScoreStat()
slowScoreMetrics[store.addr] = float64(store.getSlowScore())
}
c.storeMu.RUnlock()
})
for store, score := range slowScoreMetrics {
metrics.TiKVStoreSlowScoreGauge.WithLabelValues(store).Set(score)
}
@ -3118,14 +3024,12 @@ func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
case <-c.ctx.Done():
return
case <-ticker.C:
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
c.forEachStore(func(store *Store) {
for destType := toLeader; destType < numReplicaFlowsType; destType++ {
metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
store.resetReplicaFlowsStats(destType)
}
}
c.storeMu.RUnlock()
})
}
}
}
@ -3184,3 +3088,116 @@ func contains(startKey, endKey, key []byte) bool {
return bytes.Compare(startKey, key) <= 0 &&
(bytes.Compare(key, endKey) < 0 || len(endKey) == 0)
}
type testingKnobs interface {
getMockRequestLiveness() livenessFunc
setMockRequestLiveness(f livenessFunc)
}
func (c *RegionCache) getMockRequestLiveness() livenessFunc {
f := c.testingKnobs.mockRequestLiveness.Load()
if f == nil {
return nil
}
return *f
}
func (c *RegionCache) setMockRequestLiveness(f livenessFunc) {
c.testingKnobs.mockRequestLiveness.Store(&f)
}
type storeRegistry interface {
fetchStore(ctx context.Context, id uint64) (*metapb.Store, error)
fetchAllStores(ctx context.Context) ([]*metapb.Store, error)
}
func (c *RegionCache) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) {
return c.pdClient.GetStore(ctx, id)
}
func (c *RegionCache) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) {
return c.pdClient.GetAllStores(ctx)
}
func (c *RegionCache) getStore(id uint64) (store *Store, exists bool) {
c.storeMu.RLock()
store, exists = c.storeMu.stores[id]
c.storeMu.RUnlock()
return
}
func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store {
c.storeMu.Lock()
store, exists := c.storeMu.stores[id]
if !exists {
store = &Store{storeID: id}
c.storeMu.stores[id] = store
}
c.storeMu.Unlock()
return store
}
func (c *RegionCache) putStore(store *Store) {
c.storeMu.Lock()
c.storeMu.stores[store.storeID] = store
c.storeMu.Unlock()
}
func (c *RegionCache) clearStores() {
c.storeMu.Lock()
c.storeMu.stores = make(map[uint64]*Store)
c.storeMu.Unlock()
}
func (c *RegionCache) forEachStore(f func(*Store)) {
c.storeMu.RLock()
defer c.storeMu.RUnlock()
for _, s := range c.storeMu.stores {
f(s)
}
}
func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) []*Store {
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
if predicate == nil || predicate(store) {
dst = append(dst, store)
}
}
c.storeMu.RUnlock()
return dst
}
func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bool) {
c.tiflashComputeStoreMu.RLock()
needReload = c.tiflashComputeStoreMu.needReload
stores = c.tiflashComputeStoreMu.stores
c.tiflashComputeStoreMu.RUnlock()
return
}
func (c *RegionCache) setTiflashComputeStores(stores []*Store) {
c.tiflashComputeStoreMu.Lock()
c.tiflashComputeStoreMu.stores = stores
c.tiflashComputeStoreMu.needReload = false
c.tiflashComputeStoreMu.Unlock()
}
func (c *RegionCache) markTiflashComputeStoresNeedReload() {
c.tiflashComputeStoreMu.Lock()
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.Unlock()
}
func (c *RegionCache) markStoreNeedCheck(store *Store) {
if store.changeResolveStateTo(resolved, needCheck) {
select {
case c.notifyCheckCh <- struct{}{}:
default:
}
}
}
func (c *RegionCache) getCheckStoreEvents() <-chan struct{} {
return c.notifyCheckCh
}
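Most of the hunk above replaces direct storeMu/pdClient access with the small accessor and interface layer added at the bottom of the file (getStore, putStore, filterStores, forEachStore, storeRegistry, testingKnobs, and friends). One payoff is that reloadTiFlashComputeStores is now a free function over a storeRegistry, so it can be exercised without a real PD client. A hedged sketch of a fake registry for such a test (the fake type is hypothetical; the interface and function signatures come from the hunk above, and the snippet assumes the package's existing imports such as context, metapb, and errors):
type fakeRegistry struct {
    stores []*metapb.Store
}

func (f *fakeRegistry) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) {
    // Linear scan is fine for a test double.
    for _, s := range f.stores {
        if s.GetId() == id {
            return s, nil
        }
    }
    return nil, errors.New("store not found")
}

func (f *fakeRegistry) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) {
    return f.stores, nil
}
A test could then call reloadTiFlashComputeStores(ctx, &fakeRegistry{stores: ...}) and assert on the returned stores directly, without constructing a RegionCache.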

View File

@ -170,7 +170,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() {
}
for _, testcase := range testcases {
s.T().Log(testcase.storeID)
store := s.cache.getStoreByStoreID(testcase.storeID)
store := s.cache.getStoreOrInsertDefault(testcase.storeID)
_, err := store.initResolve(s.bo, s.cache)
s.Nil(err)
labels := []*metapb.StoreLabel{
@ -179,7 +179,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() {
Value: fmt.Sprintf("%v", testcase.storeID),
},
}
stores := s.cache.getStoresByLabels(labels)
stores := s.cache.filterStores(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) })
s.Equal(len(stores), 1)
s.Equal(stores[0].labels, labels)
}
@ -209,7 +209,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
// Check resolving normal stores. The resolve state should be resolved.
for _, storeMeta := range s.cluster.GetAllStores() {
store := cache.getStoreByStoreID(storeMeta.GetId())
store := cache.getStoreOrInsertDefault(storeMeta.GetId())
s.Equal(store.getResolveState(), unresolved)
addr, err := store.initResolve(bo, cache)
s.Nil(err)
@ -227,26 +227,26 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
}
// Mark the store needCheck. The resolve state should be resolved soon.
store := cache.getStoreByStoreID(s.store1)
store.markNeedCheck(cache.notifyCheckCh)
store := cache.getStoreOrInsertDefault(s.store1)
cache.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), resolved)
// Mark the store needCheck and it becomes a tombstone. The resolve state should be tombstone.
s.cluster.MarkTombstone(s.store1)
store.markNeedCheck(cache.notifyCheckCh)
cache.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), tombstone)
s.cluster.StartStore(s.store1)
// Mark the store needCheck and it's deleted from PD. The resolve state should be tombstone.
cache.clear()
store = cache.getStoreByStoreID(s.store1)
store = cache.getStoreOrInsertDefault(s.store1)
store.initResolve(bo, cache)
s.Equal(store.getResolveState(), resolved)
storeMeta := s.cluster.GetStore(s.store1)
s.cluster.RemoveStore(s.store1)
store.markNeedCheck(cache.notifyCheckCh)
cache.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), tombstone)
s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
@ -254,14 +254,14 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
// Mark the store needCheck and its address and labels are changed.
// The resolve state should be deleted and a new store is added to the cache.
cache.clear()
store = cache.getStoreByStoreID(s.store1)
store = cache.getStoreOrInsertDefault(s.store1)
store.initResolve(bo, cache)
s.Equal(store.getResolveState(), resolved)
s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
store.markNeedCheck(cache.notifyCheckCh)
cache.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), deleted)
newStore := cache.getStoreByStoreID(s.store1)
newStore := cache.getStoreOrInsertDefault(s.store1)
s.Equal(newStore.getResolveState(), resolved)
s.Equal(newStore.addr, store.addr+"0")
s.Equal(newStore.labels, []*metapb.StoreLabel{{Key: "k", Value: "v"}})
@ -269,7 +269,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
// Check initResolve()ing a tombstone store. The resolve state should be tombstone.
cache.clear()
s.cluster.MarkTombstone(s.store1)
store = cache.getStoreByStoreID(s.store1)
store = cache.getStoreOrInsertDefault(s.store1)
for i := 0; i < 2; i++ {
addr, err := store.initResolve(bo, cache)
s.Nil(err)
@ -283,7 +283,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
cache.clear()
storeMeta = s.cluster.GetStore(s.store1)
s.cluster.RemoveStore(s.store1)
store = cache.getStoreByStoreID(s.store1)
store = cache.getStoreOrInsertDefault(s.store1)
for i := 0; i < 2; i++ {
addr, err := store.initResolve(bo, cache)
s.Nil(err)
@ -1542,7 +1542,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
newMeta := proto.Clone(cachedRegion.meta).(*metapb.Region)
newMeta.RegionEpoch.Version++
newMeta.RegionEpoch.ConfVer++
_, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreByStoreID(s.store1)}, []*metapb.Region{newMeta})
_, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreOrInsertDefault(s.store1)}, []*metapb.Region{newMeta})
s.Nil(err)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)

View File

@ -1145,12 +1145,7 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo
}
func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
store := accessReplica.store
liveness := store.requestLiveness(bo, s.regionCache)
if liveness != reachable {
store.startHealthCheckLoopIfNeeded(s.regionCache, liveness)
}
return liveness
return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache)
}
func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error) {
@ -1164,7 +1159,7 @@ func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error)
)
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
// schedule a store addr resolve.
store.markNeedCheck(s.regionCache.notifyCheckCh)
s.regionCache.markStoreNeedCheck(store)
store.markAlreadySlow()
}
}
@ -1503,8 +1498,9 @@ func (s *RegionRequestSender) SendReqCtx(
}
// recheck whether the session/query is killed during the Next()
if err2 := bo.CheckKilled(); err2 != nil {
return nil, nil, retryTimes, err2
boVars := bo.GetVars()
if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 {
return nil, nil, retryTimes, errors.WithStack(tikverr.ErrQueryInterrupted)
}
if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil {
if val.(bool) {
@ -2197,7 +2193,7 @@ func (s *RegionRequestSender) onRegionError(
zap.Stringer("storeNotMatch", storeNotMatch),
zap.Stringer("ctx", ctx),
)
ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh)
s.regionCache.markStoreNeedCheck(ctx.Store)
s.regionCache.InvalidateCachedRegion(ctx.Region)
// It's possible the address of store is not changed but the DNS resolves to a different address in k8s environment,
// so we always reconnect in this case.

View File

@ -102,7 +102,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
s.NotNil(region)
oldStoreLimit := kv.StoreLimit.Load()
kv.StoreLimit.Store(500)
s.cache.getStoreByStoreID(s.storeIDs[0]).tokenCount.Store(500)
s.cache.getStoreOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500)
// cause there is only one region in this cluster, regionID maps this leader.
resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.NotNil(err)
@ -247,13 +247,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
return innerClient.SendRequest(ctx, addr, req, timeout)
}}
var storeState = uint32(unreachable)
tf := func(s *Store, bo *retry.Backoffer) livenessState {
s.regionRequestSender.regionCache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
if s.addr == leaderAddr {
return livenessState(atomic.LoadUint32(&storeState))
}
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
})
loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k"))
s.Nil(err)
@ -521,10 +520,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
tf := func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
unreachable.injectConstantLiveness(cache)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
_, err = replicaSelector.next(s.bo)
s.Nil(err)
@ -560,11 +556,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Do not try to use proxy if livenessState is unknown instead of unreachable.
refreshEpochs(regionStore)
cache.enableForwarding = true
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unknown
}
cache.testingKnobs.mockRequestLiveness.Store(
(*livenessFunc)(&tf))
unknown.injectConstantLiveness(cache)
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
@ -586,10 +578,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
unreachable.injectConstantLiveness(cache)
s.Eventually(func() bool {
return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
}, 3*time.Second, 200*time.Millisecond)
@ -849,11 +838,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
// The leader store is alive but can't provide service.
tf := func(s *Store, bo *retry.Backoffer) livenessState {
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
reachable.injectConstantLiveness(s.cache)
s.Eventually(func() bool {
stores := s.regionRequestSender.replicaSelector.regionStore.stores
return stores[0].getLivenessState() == reachable &&
@ -979,10 +964,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}
// Runs out of all replicas and then returns a send error.
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
unreachable.injectConstantLiveness(s.cache)
reloadRegion()
for _, store := range s.storeIDs {
s.cluster.StopStore(store)
@ -999,10 +981,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
// Verify switch to the leader immediately when stale read requests with global txn scope meet region errors.
s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0])
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
reachable.injectConstantLiveness(s.cache)
s.Eventually(func() bool {
stores := s.regionRequestSender.replicaSelector.regionStore.stores
return stores[0].getLivenessState() == reachable &&
@ -1367,10 +1346,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
}
// Test for write request.
tf := func(s *Store, bo *retry.Backoffer) livenessState {
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
reachable.injectConstantLiveness(s.cache)
resetStats()
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
req.ReplicaReadType = kv.ReplicaReadLeader

View File

@ -84,7 +84,7 @@ func (s *testRegionCacheStaleReadSuite) SetupTest() {
}
func (s *testRegionCacheStaleReadSuite) TearDownTest() {
s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(nil))
s.cache.setMockRequestLiveness(nil)
s.cache.Close()
s.mvccStore.Close()
}
@ -222,14 +222,13 @@ func (s *testRegionCacheStaleReadSuite) setClient() {
return
}}
tf := func(store *Store, bo *retry.Backoffer) livenessState {
s.cache.setMockRequestLiveness(func(ctx context.Context, store *Store) livenessState {
_, ok := s.injection.unavailableStoreIDs[store.storeID]
if ok {
return unreachable
}
return reachable
}
s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
})
}
func (s *testRegionCacheStaleReadSuite) extractResp(resp *tikvrpc.Response) (uint64, string, SuccessReadType) {

View File

@ -748,9 +748,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() {
fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
}}
tf := func(s *Store, bo *retry.Backoffer) livenessState {
return reachable
}
defer func() {
rpcClient.Close()
@ -775,7 +772,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() {
}()
req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1})
regionRequestSender := NewRegionRequestSender(s.cache, fnClient)
regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
reachable.injectConstantLiveness(regionRequestSender.regionCache)
regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
}
}()

View File

@ -38,6 +38,7 @@ import (
"bytes"
"context"
"math"
"slices"
"sort"
"sync"
"time"
@ -358,8 +359,8 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int, opts ...pd.Get
regions = append(regions, region)
}
sort.Slice(regions, func(i, j int) bool {
return bytes.Compare(regions[i].Meta.GetStartKey(), regions[j].Meta.GetStartKey()) < 0
slices.SortFunc(regions, func(i, j *Region) int {
return bytes.Compare(i.Meta.GetStartKey(), j.Meta.GetStartKey())
})
startPos := sort.Search(len(regions), func(i int) bool {
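The sort.Slice to slices.SortFunc migration here (and in deduplicateKeys further down) is mostly mechanical: sort.Slice takes an index-based "less" closure returning a bool, while Go 1.21's slices.SortFunc takes a three-way comparator returning an int, so bytes.Compare can be passed straight through. A small standalone illustration:
package main

import (
    "bytes"
    "fmt"
    "slices"
)

func main() {
    keys := [][]byte{[]byte("b"), []byte("c"), []byte("a")}
    // bytes.Compare already has the func(a, b []byte) int shape SortFunc expects.
    slices.SortFunc(keys, bytes.Compare)
    for _, k := range keys {
        fmt.Println(string(k)) // a, b, c
    }
}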

View File

@ -38,7 +38,6 @@ import (
"bytes"
"fmt"
"math"
"reflect"
"sync"
"unsafe"
@ -837,12 +836,8 @@ func (n *memdbNode) setBlack() {
}
func (n *memdbNode) getKey() []byte {
var ret []byte
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
hdr.Data = uintptr(unsafe.Pointer(&n.flags)) + kv.FlagBytes
hdr.Len = int(n.klen)
hdr.Cap = int(n.klen)
return ret
base := unsafe.Add(unsafe.Pointer(&n.flags), kv.FlagBytes)
return unsafe.Slice((*byte)(base), int(n.klen))
}
const (
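getKey previously forged a slice by writing into a reflect.SliceHeader; the new body uses unsafe.Add plus unsafe.Slice (both available since Go 1.17), the supported way to build a byte view over the memory that follows the node header. A hedged toy reproduction of the pattern (the node layout below is invented for illustration and is not memdbNode's real layout):
package main

import (
    "fmt"
    "unsafe"
)

// toy node: header fields followed by an in-struct key buffer.
type node struct {
    flags uint16
    klen  uint16
    buf   [8]byte
}

func (n *node) key() []byte {
    // Step from the first header field to the key bytes, then view exactly
    // klen bytes of that memory as a []byte without copying.
    base := unsafe.Add(unsafe.Pointer(&n.flags), unsafe.Offsetof(n.buf))
    return unsafe.Slice((*byte)(base), int(n.klen))
}

func main() {
    n := node{klen: 3}
    copy(n.buf[:], "abc")
    fmt.Println(string(n.key())) // abc
}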

View File

@ -44,10 +44,6 @@ type Variables struct {
// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
// This is an enum value rather than a boolean. See sqlkiller.go
// in TiDB for its definition.
// When its value is 0, it's not killed
// When its value is not 0, it's killed, the value indicates concrete reason.
Killed *uint32
}

View File

@ -43,6 +43,7 @@ import (
"math"
"math/rand"
"runtime/trace"
"slices"
"sort"
"sync"
"sync/atomic"
@ -1314,8 +1315,8 @@ func deduplicateKeys(keys [][]byte) [][]byte {
return keys
}
sort.Slice(keys, func(i, j int) bool {
return bytes.Compare(keys[i], keys[j]) < 0
slices.SortFunc(keys, func(i, j []byte) int {
return bytes.Compare(i, j)
})
deduped := keys[:1]
for i := 1; i < len(keys); i++ {
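Same comparator change as in the mockstore hunk above. As an aside (not something this patch does), once the keys are sorted with a three-way comparator, Go 1.21's slices.CompactFunc could also replace the manual dedup loop; a hedged standalone sketch:
package main

import (
    "bytes"
    "fmt"
    "slices"
)

// sortAndDedup is a hypothetical helper, not the patch's deduplicateKeys.
func sortAndDedup(keys [][]byte) [][]byte {
    slices.SortFunc(keys, bytes.Compare)
    return slices.CompactFunc(keys, bytes.Equal)
}

func main() {
    keys := [][]byte{[]byte("b"), []byte("a"), []byte("b")}
    for _, k := range sortAndDedup(keys) {
        fmt.Println(string(k)) // a, b
    }
}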

View File

@ -38,7 +38,6 @@ import (
"context"
"encoding/hex"
"fmt"
"reflect"
"strconv"
"strings"
"time"
@ -168,11 +167,7 @@ func String(b []byte) (s string) {
if len(b) == 0 {
return ""
}
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
pstring.Data = pbytes.Data
pstring.Len = pbytes.Len
return
return unsafe.String(unsafe.SliceData(b), len(b))
}
// ToUpperASCIIInplace bytes.ToUpper but zero-cost
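The reflect.StringHeader trick is replaced by unsafe.String over unsafe.SliceData, the Go 1.20 helpers for a zero-copy []byte-to-string view; the usual caveat applies that the byte slice must not be modified while the resulting string is in use. A minimal standalone sketch of the same conversion:
package main

import (
    "fmt"
    "unsafe"
)

// zeroCopyString mirrors the String helper above; safe only if b is never
// mutated while the returned string is alive.
func zeroCopyString(b []byte) string {
    if len(b) == 0 {
        return ""
    }
    return unsafe.String(unsafe.SliceData(b), len(b))
}

func main() {
    b := []byte("hello")
    fmt.Println(zeroCopyString(b)) // hello
}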