diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go
index de2b1523..f3e91e0d 100644
--- a/integration_tests/2pc_test.go
+++ b/integration_tests/2pc_test.go
@@ -82,9 +82,11 @@ type testCommitterSuite struct {
 func (s *testCommitterSuite) SetupSuite() {
 	atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // 3s
 	atomic.StoreUint64(&transaction.CommitMaxBackoff, 1000)
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`))
 }
 
 func (s *testCommitterSuite) TearDownSuite() {
+	s.Nil(failpoint.Disable("tikvclient/injectLiveness"))
 	atomic.StoreUint64(&transaction.CommitMaxBackoff, 20000)
 }
 
@@ -98,7 +100,6 @@ func (s *testCommitterSuite) SetupTest() {
 	store, err := tikv.NewKVStore("mocktikv-store", pdCli, spkv, client)
 	store.EnableTxnLocalLatches(8096)
 	s.Require().Nil(err)
-
 	s.store = tikv.StoreProbe{KVStore: store}
 }
 
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index fa8c38fe..9f3b38e4 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -381,7 +381,10 @@ type RegionCache struct {
 		stores     []*Store
 	}
 	notifyCheckCh chan struct{}
-	closeCh       chan struct{}
+
+	// Context for background jobs
+	ctx        context.Context
+	cancelFunc context.CancelFunc
 
 	testingKnobs struct {
 		// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
@@ -402,7 +405,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.tiflashMPPStoreMu.needReload = true
 	c.tiflashMPPStoreMu.stores = make([]*Store, 0)
 	c.notifyCheckCh = make(chan struct{}, 1)
-	c.closeCh = make(chan struct{})
+	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
 	interval := config.GetGlobalConfig().StoresRefreshInterval
 	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
 	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
@@ -423,7 +426,7 @@ func (c *RegionCache) clear() {
 
 // Close releases region cache's resource.
 func (c *RegionCache) Close() {
-	close(c.closeCh)
+	c.cancelFunc()
 }
 
 // asyncCheckAndResolveLoop with
@@ -434,7 +437,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
 	for {
 		needCheckStores = needCheckStores[:0]
 		select {
-		case <-c.closeCh:
+		case <-c.ctx.Done():
 			return
 		case <-c.notifyCheckCh:
 			c.checkAndResolve(needCheckStores, func(s *Store) bool {
@@ -613,7 +616,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
 		proxyAddr  string
 	)
 	if c.enableForwarding && isLeaderReq {
-		if atomic.LoadInt32(&store.unreachable) == 0 {
+		if store.getLivenessState() == reachable {
 			regionStore.unsetProxyStoreIfNeeded(cachedRegion)
 		} else {
 			proxyStore, _, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
@@ -1627,7 +1630,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
 }
 
 func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
-	if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.unreachable) == 0 {
+	if !c.enableForwarding || store.storeType != tikvrpc.TiKV || store.getLivenessState() == reachable {
		return
 	}
 
@@ -1657,7 +1660,7 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor
 		}
 		storeIdx, store := rs.accessStore(tiKVOnly, AccessIndex(index))
 		// Skip unreachable stores.
-		if atomic.LoadInt32(&store.unreachable) != 0 {
+		if store.getLivenessState() == unreachable {
 			continue
 		}
@@ -2159,7 +2162,7 @@ type Store struct {
 	// whether the store is unreachable due to some reason, therefore requests to the store needs to be
 	// forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store.
 	// this mechanism is currently only applicable for TiKV stores.
-	unreachable      int32
+	livenessState    uint32
 	unreachableSince time.Time
 }
 
@@ -2362,12 +2365,19 @@ func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (r
 	return res
 }
 
+// getLivenessState gets the cached liveness state of the store.
+// When it's not reachable, a goroutine will keep updating the state in the background.
+// To get the accurate liveness state, use checkLiveness instead.
+func (s *Store) getLivenessState() livenessState {
+	return livenessState(atomic.LoadUint32(&s.livenessState))
+}
+
 type livenessState uint32
 
 var (
 	livenessSf singleflight.Group
 	// storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance.
-	storeLivenessTimeout time.Duration
+	storeLivenessTimeout = time.Second
 )
 
 // SetStoreLivenessTimeout sets storeLivenessTimeout to t.
@@ -2381,12 +2391,12 @@ func GetStoreLivenessTimeout() time.Duration {
 }
 
 const (
-	unknown livenessState = iota
-	reachable
+	reachable livenessState = iota
 	unreachable
+	unknown
 )
 
-func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache) {
+func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
 	// This mechanism doesn't support non-TiKV stores currently.
 	if s.storeType != tikvrpc.TiKV {
 		logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store",
@@ -2395,24 +2405,21 @@
 	}
 
 	// It may be already started by another thread.
-	if atomic.CompareAndSwapInt32(&s.unreachable, 0, 1) {
+	if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
 		s.unreachableSince = time.Now()
 		go s.checkUntilHealth(c)
 	}
 }
 
 func (s *Store) checkUntilHealth(c *RegionCache) {
-	defer atomic.CompareAndSwapInt32(&s.unreachable, 1, 0)
+	defer atomic.StoreUint32(&s.livenessState, uint32(reachable))
 
 	ticker := time.NewTicker(time.Second)
 	lastCheckPDTime := time.Now()
 
-	// TODO(MyonKeminta): Set a more proper ctx here so that it can be interrupted immediately when the RegionCache is
-	// shutdown.
-	ctx := context.Background()
 	for {
 		select {
-		case <-c.closeCh:
+		case <-c.ctx.Done():
 			return
 		case <-ticker.C:
 			if time.Since(lastCheckPDTime) > time.Second*30 {
@@ -2427,18 +2434,30 @@
 				}
 			}
 
-			bo := retry.NewNoopBackoff(ctx)
+			bo := retry.NewNoopBackoff(c.ctx)
 			l := s.requestLiveness(bo, c)
 			if l == reachable {
 				logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
 
 				return
 			}
+			atomic.StoreUint32(&s.livenessState, uint32(l))
 		}
 	}
 }
 
 func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
+	// It's not convenient to mock liveness in integration tests. Use a failpoint to achieve that instead.
+	if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
+		switch val.(string) {
+		case "unreachable":
+			return unreachable
+		case "reachable":
+			return reachable
+		case "unknown":
+			return unknown
+		}
+	}
 	if c != nil && c.testingKnobs.mockRequestLiveness != nil {
 		return c.testingKnobs.mockRequestLiveness(s, bo)
 	}
@@ -2511,7 +2530,7 @@ func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
 	}
 
 	status := resp.GetStatus()
-	if status == healthpb.HealthCheckResponse_UNKNOWN {
+	if status == healthpb.HealthCheckResponse_UNKNOWN || status == healthpb.HealthCheckResponse_SERVICE_UNKNOWN {
 		logutil.BgLogger().Info("[health check] check health returns unknown", zap.String("store", addr))
 		l = unknown
 		return
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index c9c4c8d4..bb8fb32c 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -322,7 +322,16 @@ type accessKnownLeader struct {
 
 func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
 	leader := selector.replicas[state.leaderIdx]
-	if leader.isExhausted(maxReplicaAttempt) {
+	liveness := leader.store.getLivenessState()
+	if liveness == unreachable && selector.regionCache.enableForwarding {
+		selector.state = &tryNewProxy{leaderIdx: state.leaderIdx}
+		return nil, stateChanged{}
+	}
+	// If hibernate region is enabled and the leader is not reachable, the raft
+	// group will not be woken up to re-elect a leader until a follower receives
+	// a request. So, before a new leader is elected, we should not send requests
+	// to the unreachable old leader, to avoid unnecessary timeouts.
+	if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
 		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
 		return nil, stateChanged{}
 	}
@@ -332,7 +341,8 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 
 func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
 	liveness := selector.checkLiveness(bo, selector.targetReplica())
-	if liveness != reachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding {
+	// Only enable forwarding when unreachable, to avoid using a proxy to access a TiKV that cannot serve.
+	if liveness == unreachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding {
 		selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx}
 		return
 	}
@@ -407,7 +417,7 @@ type accessByKnownProxy struct {
 
 func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
 	leader := selector.replicas[state.leaderIdx]
-	if atomic.LoadInt32(&leader.store.unreachable) == 0 {
+	if leader.store.getLivenessState() == reachable {
 		selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
 		selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx}
 		return nil, stateChanged{}
@@ -442,7 +452,7 @@ type tryNewProxy struct {
 
 func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
 	leader := selector.replicas[state.leaderIdx]
-	if atomic.LoadInt32(&leader.store.unreachable) == 0 {
+	if leader.store.getLivenessState() == reachable {
 		selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
 		selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx}
 		return nil, stateChanged{}
@@ -770,11 +780,8 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
 	store := accessReplica.store
 	liveness := store.requestLiveness(bo, s.regionCache)
-	// We only check health in loop if forwarding is enabled now.
-	// The restriction might be relaxed if necessary, but the implementation
-	// may be checked carefully again.
-	if liveness != reachable && s.regionCache.enableForwarding {
-		store.startHealthCheckLoopIfNeeded(s.regionCache)
+	if liveness != reachable {
+		store.startHealthCheckLoopIfNeeded(s.regionCache, liveness)
 	}
 	return liveness
 }
@@ -815,6 +822,13 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 	}
 	for i, replica := range s.replicas {
 		if isSamePeer(replica.peer, leader) {
+			// If hibernate region is enabled and the leader is not reachable, the raft
+			// group will not be woken up to re-elect a leader until a follower receives
+			// a request. So, before a new leader is elected, we should not send requests
+			// to the unreachable old leader, to avoid unnecessary timeouts.
+			if replica.store.getLivenessState() != reachable {
+				return
+			}
 			if replica.isExhausted(maxReplicaAttempt) {
 				// Give the replica one more chance and because each follower is tried only once,
 				// it won't result in infinite retry.
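
The region_cache.go and region_request.go changes above replace the boolean unreachable flag with a cached livenessState and tie the background health checker to a cancellable, cache-level context. The following standalone sketch (not part of the patch; all names are simplified stand-ins, not the real client-go types) illustrates that pattern in isolation:

// Standalone sketch: a store keeps its last observed liveness in an atomic word;
// the first caller that observes a non-reachable state CAS-starts exactly one
// background checker, which exits when the store recovers or the context is cancelled.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type livenessState uint32

const (
	reachable livenessState = iota
	unreachable
	unknown
)

type store struct {
	// Holds a livenessState; reachable (0) also means no checker goroutine is running.
	liveness uint32
}

func (s *store) getLivenessState() livenessState {
	return livenessState(atomic.LoadUint32(&s.liveness))
}

// startHealthCheckLoopIfNeeded mirrors the CAS guard in the patch: only the
// transition from reachable to a non-reachable state spawns a checker goroutine.
func (s *store) startHealthCheckLoopIfNeeded(ctx context.Context, l livenessState, probe func() livenessState) {
	if atomic.CompareAndSwapUint32(&s.liveness, uint32(reachable), uint32(l)) {
		go s.checkUntilHealth(ctx, probe)
	}
}

func (s *store) checkUntilHealth(ctx context.Context, probe func() livenessState) {
	// Reset to reachable on exit so a later failure can start a new checker.
	defer atomic.StoreUint32(&s.liveness, uint32(reachable))
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			l := probe()
			if l == reachable {
				return
			}
			// Keep the cached state current (e.g. unknown vs. unreachable).
			atomic.StoreUint32(&s.liveness, uint32(l))
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Pretend the store recovers after 200ms.
	recoverAt := time.Now().Add(200 * time.Millisecond)
	probe := func() livenessState {
		if time.Now().After(recoverAt) {
			return reachable
		}
		return unreachable
	}

	s := &store{}
	s.startHealthCheckLoopIfNeeded(ctx, unreachable, probe)
	for s.getLivenessState() != reachable {
		time.Sleep(20 * time.Millisecond)
	}
	fmt.Println("store became reachable again")
}

The CAS against the reachable value is what guarantees at most one checker goroutine per store, which is the same role the CompareAndSwapUint32 plays in startHealthCheckLoopIfNeeded in the patch.
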
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index c3de16a1..a9f495bf 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -163,9 +163,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
 		}
 		return innerClient.SendRequest(ctx, addr, req, timeout)
 	}}
-	var storeState uint32 = uint32(unreachable)
+	var storeState = uint32(unreachable)
 	s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
-		return livenessState(atomic.LoadUint32(&storeState))
+		if s.addr == leaderAddr {
+			return livenessState(atomic.LoadUint32(&storeState))
+		}
+		return reachable
 	}
 
 	loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k"))
@@ -191,7 +194,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
 	atomic.StoreUint32(&storeState, uint32(reachable))
 	start := time.Now()
 	for {
-		if atomic.LoadInt32(&leaderStore.unreachable) == 0 {
+		if leaderStore.getLivenessState() == reachable {
 			break
 		}
 		if time.Since(start) > 3*time.Second {
@@ -386,6 +389,47 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx)
 	assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), nil)
 	s.Equal(replicaSelector.targetReplica().attempts, 1)
+	// If the NotLeader error provides an unreachable leader, do not switch to it.
+	replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{
+		RegionId: region.GetID(), Leader: &metapb.Peer{Id: s.peerIDs[regionStore.workTiKVIdx], StoreId: s.storeIDs[regionStore.workTiKVIdx]},
+	})
+	s.IsType(&tryFollower{}, replicaSelector.state)
+
+	// If the leader is unreachable and forwarding is not enabled, just do not try
+	// the unreachable leader.
+	refreshEpochs(regionStore)
+	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
+	s.Nil(err)
+	s.NotNil(replicaSelector)
+	s.IsType(&accessKnownLeader{}, replicaSelector.state)
+	// Now, livenessState is unreachable, so it will try a reachable follower instead of the unreachable leader.
+	rpcCtx, err = replicaSelector.next(s.bo)
+	s.Nil(err)
+	s.NotNil(rpcCtx)
+	_, ok := replicaSelector.state.(*tryFollower)
+	s.True(ok)
+	s.NotEqual(regionStore.workTiKVIdx, replicaSelector.targetIdx)
+
+	// Do not try to use a proxy if livenessState is unknown instead of unreachable.
+	refreshEpochs(regionStore)
+	cache.enableForwarding = true
+	cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
+		return unknown
+	}
+	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
+	s.Nil(err)
+	s.NotNil(replicaSelector)
+	s.Eventually(func() bool {
+		return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unknown
+	}, 3*time.Second, 200*time.Millisecond)
+	s.IsType(&accessKnownLeader{}, replicaSelector.state)
+	// Now, livenessState is unknown. Even if forwarding is enabled, it should try followers
+	// instead of using the proxy.
+	rpcCtx, err = replicaSelector.next(s.bo)
+	s.Nil(err)
+	s.NotNil(rpcCtx)
+	_, ok = replicaSelector.state.(*tryFollower)
+	s.True(ok)
 
 	// Test switching to tryNewProxy if leader is unreachable and forwarding is enabled
 	refreshEpochs(regionStore)
@@ -396,20 +440,21 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
 		return unreachable
 	}
+	s.Eventually(func() bool {
+		return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
+	}, 3*time.Second, 200*time.Millisecond)
 	s.IsType(&accessKnownLeader{}, replicaSelector.state)
-	_, err = replicaSelector.next(s.bo)
-	s.Nil(err)
-	replicaSelector.onSendFailure(s.bo, nil)
+	// Now, livenessState is unreachable, so it will try a new proxy instead of the leader.
 	rpcCtx, err = replicaSelector.next(s.bo)
-	s.NotNil(rpcCtx)
 	s.Nil(err)
+	s.NotNil(rpcCtx)
 	state, ok := replicaSelector.state.(*tryNewProxy)
 	s.True(ok)
 	s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
 	s.Equal(AccessIndex(2), replicaSelector.targetIdx)
 	s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx)
 	assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
-	s.Equal(replicaSelector.targetReplica().attempts, 2)
+	s.Equal(replicaSelector.targetReplica().attempts, 1)
 	s.Equal(replicaSelector.proxyReplica().attempts, 1)
 
 	// When the current proxy node fails, it should try another one.
@@ -423,7 +468,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
 	s.Equal(AccessIndex(2), replicaSelector.targetIdx)
 	s.NotEqual(lastProxy, replicaSelector.proxyIdx)
-	s.Equal(replicaSelector.targetReplica().attempts, 3)
+	s.Equal(replicaSelector.targetReplica().attempts, 2)
 	s.Equal(replicaSelector.proxyReplica().attempts, 1)
 
 	// Test proxy store is saves when proxy is enabled
@@ -643,15 +688,21 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	resp, err = sender.SendReq(bo, req, region.Region, time.Second)
 	s.Nil(err)
 	s.True(hasFakeRegionError(resp))
-	s.Equal(bo.GetTotalBackoffTimes(), 3)
+	s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped
 	s.False(sender.replicaSelector.region.isValid())
 	s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
 
 	// The leader store is alive but can't provide service.
-	// Region will be invalidated due to running out of all replicas.
 	s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
 		return reachable
 	}
+	s.Eventually(func() bool {
+		stores := s.regionRequestSender.replicaSelector.regionStore.stores
+		return stores[0].getLivenessState() == reachable &&
+			stores[1].getLivenessState() == reachable &&
+			stores[2].getLivenessState() == reachable
+	}, 3*time.Second, 200*time.Millisecond)
+	// Region will be invalidated due to running out of all replicas.
 	reloadRegion()
 	s.cluster.StopStore(s.storeIDs[0])
 	bo = retry.NewBackoffer(context.Background(), -1)
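
As a reading aid for the selector-side changes, the sketch below condenses the decision order that accessKnownLeader.next follows after this patch into a small pure function plus a table-driven check. nextAccessPattern and the surrounding program are invented for illustration and are not part of the client-go API:

package main

import "fmt"

type livenessState uint32

const (
	reachable livenessState = iota
	unreachable
	unknown
)

// nextAccessPattern mirrors the decision order in accessKnownLeader.next after the patch:
// an unreachable leader with forwarding enabled goes through a proxy (tryNewProxy);
// any non-reachable leader, or an exhausted one, falls back to followers (tryFollower);
// otherwise the request keeps targeting the known leader.
func nextAccessPattern(leaderLiveness livenessState, forwardingEnabled, leaderExhausted bool) string {
	if leaderLiveness == unreachable && forwardingEnabled {
		return "tryNewProxy"
	}
	if leaderLiveness != reachable || leaderExhausted {
		return "tryFollower"
	}
	return "accessKnownLeader"
}

func main() {
	cases := []struct {
		liveness   livenessState
		forwarding bool
		exhausted  bool
	}{
		{reachable, false, false},   // healthy leader: keep using it
		{reachable, true, true},     // exhausted leader: try followers
		{unknown, true, false},      // unknown liveness: prefer followers over a proxy
		{unreachable, true, false},  // unreachable + forwarding: go through a proxy
		{unreachable, false, false}, // unreachable, no forwarding: try followers
	}
	for _, c := range cases {
		fmt.Printf("liveness=%d forwarding=%v exhausted=%v -> %s\n",
			c.liveness, c.forwarding, c.exhausted, nextAccessPattern(c.liveness, c.forwarding, c.exhausted))
	}
}

The unknown case is why the new test asserts tryFollower even with forwarding enabled: only a store that is positively unreachable justifies routing through a proxy.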