mirror of https://github.com/tikv/client-go.git
Avoid sending requests to unhealthy leader (#503)
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
parent 2807409d49
commit be31f33ba0
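At a glance, this change replaces the boolean Store.unreachable flag with a cached tri-state livenessState (reachable, unreachable, unknown) and teaches the replica selector to stop sending requests to a leader whose cached state is not reachable: it falls back to a follower, and only routes through a proxy peer when forwarding is enabled and the leader is unreachable. The standalone sketch below is simplified and is not the repository's actual code; it only shows the shape of the mechanism the hunks implement: an atomically read cached state plus a compare-and-swap so that at most one health-check goroutine runs per store.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type livenessState uint32

const (
    // reachable is the zero value: a freshly created store is assumed healthy.
    reachable livenessState = iota
    unreachable
    unknown
)

type store struct {
    livenessState    uint32
    unreachableSince time.Time
}

// getLivenessState returns the cached state; while the store is not reachable,
// a background goroutine keeps the value up to date.
func (s *store) getLivenessState() livenessState {
    return livenessState(atomic.LoadUint32(&s.livenessState))
}

// startHealthCheckLoopIfNeeded starts at most one checker per store: only the first
// caller that observes a failure wins the CAS away from reachable.
func (s *store) startHealthCheckLoopIfNeeded(liveness livenessState, check func() livenessState) {
    if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
        s.unreachableSince = time.Now()
        go func() {
            defer atomic.StoreUint32(&s.livenessState, uint32(reachable))
            for l := check(); l != reachable; l = check() {
                atomic.StoreUint32(&s.livenessState, uint32(l))
                time.Sleep(time.Second)
            }
        }()
    }
}

func main() {
    s := &store{}
    fmt.Println(s.getLivenessState() == reachable) // true: the zero value means healthy
}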
@@ -82,9 +82,11 @@ type testCommitterSuite struct {
 func (s *testCommitterSuite) SetupSuite() {
     atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // 3s
     atomic.StoreUint64(&transaction.CommitMaxBackoff, 1000)
+    s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`))
 }
 
 func (s *testCommitterSuite) TearDownSuite() {
+    s.Nil(failpoint.Disable("tikvclient/injectLiveness"))
     atomic.StoreUint64(&transaction.CommitMaxBackoff, 20000)
 }
 
@@ -98,7 +100,6 @@ func (s *testCommitterSuite) SetupTest() {
     store, err := tikv.NewKVStore("mocktikv-store", pdCli, spkv, client)
-    store.EnableTxnLocalLatches(8096)
     s.Require().Nil(err)
 
     s.store = tikv.StoreProbe{KVStore: store}
 }
 
@@ -381,7 +381,10 @@ type RegionCache struct {
         stores []*Store
     }
     notifyCheckCh chan struct{}
-    closeCh       chan struct{}
 
+    // Context for background jobs
+    ctx        context.Context
+    cancelFunc context.CancelFunc
+
     testingKnobs struct {
         // Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
@@ -402,7 +405,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
     c.tiflashMPPStoreMu.needReload = true
     c.tiflashMPPStoreMu.stores = make([]*Store, 0)
     c.notifyCheckCh = make(chan struct{}, 1)
-    c.closeCh = make(chan struct{})
+    c.ctx, c.cancelFunc = context.WithCancel(context.Background())
     interval := config.GetGlobalConfig().StoresRefreshInterval
     go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
     c.enableForwarding = config.GetGlobalConfig().EnableForwarding
@@ -423,7 +426,7 @@ func (c *RegionCache) clear() {
 
 // Close releases region cache's resource.
 func (c *RegionCache) Close() {
-    close(c.closeCh)
+    c.cancelFunc()
 }
 
 // asyncCheckAndResolveLoop with
@@ -434,7 +437,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
     for {
         needCheckStores = needCheckStores[:0]
         select {
-        case <-c.closeCh:
+        case <-c.ctx.Done():
             return
         case <-c.notifyCheckCh:
             c.checkAndResolve(needCheckStores, func(s *Store) bool {
@@ -613,7 +616,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
         proxyAddr  string
     )
     if c.enableForwarding && isLeaderReq {
-        if atomic.LoadInt32(&store.unreachable) == 0 {
+        if store.getLivenessState() == reachable {
             regionStore.unsetProxyStoreIfNeeded(cachedRegion)
         } else {
             proxyStore, _, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
@@ -1627,7 +1630,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
 }
 
 func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
-    if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.unreachable) == 0 {
+    if !c.enableForwarding || store.storeType != tikvrpc.TiKV || store.getLivenessState() == reachable {
         return
     }
 
@@ -1657,7 +1660,7 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor
         }
         storeIdx, store := rs.accessStore(tiKVOnly, AccessIndex(index))
         // Skip unreachable stores.
-        if atomic.LoadInt32(&store.unreachable) != 0 {
+        if store.getLivenessState() == unreachable {
             continue
         }
 
@@ -2159,7 +2162,7 @@ type Store struct {
     // whether the store is unreachable due to some reason, therefore requests to the store needs to be
     // forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store.
    // this mechanism is currently only applicable for TiKV stores.
-    unreachable      int32
+    livenessState    uint32
     unreachableSince time.Time
 }
 
@@ -2362,12 +2365,19 @@ func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (r
     return res
 }
 
+// getLivenessState gets the cached liveness state of the store.
+// When it's not reachable, a goroutine will update the state in background.
+// To get the accurate liveness state, use checkLiveness instead.
+func (s *Store) getLivenessState() livenessState {
+    return livenessState(atomic.LoadUint32(&s.livenessState))
+}
+
 type livenessState uint32
 
 var (
     livenessSf singleflight.Group
     // storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance.
-    storeLivenessTimeout time.Duration
+    storeLivenessTimeout = time.Second
 )
 
 // SetStoreLivenessTimeout sets storeLivenessTimeout to t.
@@ -2381,12 +2391,12 @@ func GetStoreLivenessTimeout() time.Duration {
 }
 
 const (
-    unknown livenessState = iota
-    reachable
+    reachable livenessState = iota
     unreachable
+    unknown
 )
 
-func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache) {
+func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
     // This mechanism doesn't support non-TiKV stores currently.
     if s.storeType != tikvrpc.TiKV {
         logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store",
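A note on the reordered constants: Store.livenessState is now a plain uint32 field, so a brand-new Store holds the zero value. With reachable listed first, the zero value means healthy, which is also the state that the CompareAndSwap in startHealthCheckLoopIfNeeded (next hunk) swaps away from. A tiny sketch of that reasoning, not repository code:

package main

import "fmt"

type livenessState uint32

const (
    reachable   livenessState = iota // 0: the zero value of Store.livenessState
    unreachable                      // 1
    unknown                          // 2
)

func main() {
    var defaultState livenessState         // zero value of the field
    fmt.Println(defaultState == reachable) // true: an untouched store counts as healthy
}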
@@ -2395,24 +2405,21 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache) {
     }
 
     // It may be already started by another thread.
-    if atomic.CompareAndSwapInt32(&s.unreachable, 0, 1) {
+    if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
         s.unreachableSince = time.Now()
         go s.checkUntilHealth(c)
     }
 }
 
 func (s *Store) checkUntilHealth(c *RegionCache) {
-    defer atomic.CompareAndSwapInt32(&s.unreachable, 1, 0)
+    defer atomic.StoreUint32(&s.livenessState, uint32(reachable))
 
     ticker := time.NewTicker(time.Second)
     lastCheckPDTime := time.Now()
 
-    // TODO(MyonKeminta): Set a more proper ctx here so that it can be interrupted immediately when the RegionCache is
-    // shutdown.
-    ctx := context.Background()
     for {
         select {
-        case <-c.closeCh:
+        case <-c.ctx.Done():
             return
         case <-ticker.C:
             if time.Since(lastCheckPDTime) > time.Second*30 {
@@ -2427,18 +2434,30 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
                 }
             }
 
-            bo := retry.NewNoopBackoff(ctx)
+            bo := retry.NewNoopBackoff(c.ctx)
             l := s.requestLiveness(bo, c)
             if l == reachable {
                 logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
 
                 return
             }
+            atomic.StoreUint32(&s.livenessState, uint32(l))
         }
     }
 }
 
 func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
+    // It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
+    if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
+        switch val.(string) {
+        case "unreachable":
+            return unreachable
+        case "reachable":
+            return reachable
+        case "unknown":
+            return unknown
+        }
+    }
     if c != nil && c.testingKnobs.mockRequestLiveness != nil {
         return c.testingKnobs.mockRequestLiveness(s, bo)
     }
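The failpoint evaluated in requestLiveness above is the same one the committer test suite enables in its SetupSuite (tikvclient/injectLiveness with return("reachable")). As a hedged illustration, a test helper along these lines could flip the injected value around a block of assertions; it assumes the github.com/pingcap/failpoint package and a failpoint-enabled build, and the helper name is invented here, not part of the repository:

package livenesstest

import "github.com/pingcap/failpoint"

// withInjectedLiveness enables the injectLiveness failpoint for the duration of f and
// disables it afterwards. state should be "reachable", "unreachable" or "unknown",
// matching the cases handled in requestLiveness above.
func withInjectedLiveness(state string, f func()) error {
    if err := failpoint.Enable("tikvclient/injectLiveness", `return("`+state+`")`); err != nil {
        return err
    }
    defer failpoint.Disable("tikvclient/injectLiveness")
    f()
    return nil
}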
@@ -2511,7 +2530,7 @@ func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
     }
 
     status := resp.GetStatus()
-    if status == healthpb.HealthCheckResponse_UNKNOWN {
+    if status == healthpb.HealthCheckResponse_UNKNOWN || status == healthpb.HealthCheckResponse_SERVICE_UNKNOWN {
         logutil.BgLogger().Info("[health check] check health returns unknown", zap.String("store", addr))
         l = unknown
         return
 
@@ -322,7 +322,16 @@ type accessKnownLeader struct {
 
 func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
     leader := selector.replicas[state.leaderIdx]
-    if leader.isExhausted(maxReplicaAttempt) {
+    liveness := leader.store.getLivenessState()
+    if liveness == unreachable && selector.regionCache.enableForwarding {
+        selector.state = &tryNewProxy{leaderIdx: state.leaderIdx}
+        return nil, stateChanged{}
+    }
+    // If hibernate region is enabled and the leader is not reachable, the raft group
+    // will not be wakened up and re-elect the leader until the follower receives
+    // a request. So, before the new leader is elected, we should not send requests
+    // to the unreachable old leader to avoid unnecessary timeout.
+    if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
         selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
         return nil, stateChanged{}
     }
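The new branches in accessKnownLeader.next read as a small decision table: an unreachable leader with forwarding enabled is accessed through a proxy; any leader that is not reachable (including unknown), or one whose attempts are exhausted, is skipped in favour of a follower. A simplified sketch of that table, using illustrative types rather than the selector's real ones:

package main

import "fmt"

type livenessState int

const (
    reachable livenessState = iota
    unreachable
    unknown
)

// nextState mirrors the branch order in the hunk above: proxy first, then follower.
func nextState(leaderLiveness livenessState, leaderExhausted, enableForwarding bool) string {
    if leaderLiveness == unreachable && enableForwarding {
        return "tryNewProxy"
    }
    if leaderLiveness != reachable || leaderExhausted {
        return "tryFollower"
    }
    return "accessKnownLeader"
}

func main() {
    fmt.Println(nextState(unreachable, false, true)) // tryNewProxy
    fmt.Println(nextState(unknown, false, true))     // tryFollower: unknown never uses the proxy
    fmt.Println(nextState(reachable, true, false))   // tryFollower: leader attempts exhausted
    fmt.Println(nextState(reachable, false, false))  // accessKnownLeader
}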
@@ -332,7 +341,8 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 
 func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
     liveness := selector.checkLiveness(bo, selector.targetReplica())
-    if liveness != reachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding {
+    // Only enable forwarding when unreachable to avoid using proxy to access a TiKV that cannot serve.
+    if liveness == unreachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding {
         selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx}
         return
     }
@@ -407,7 +417,7 @@ type accessByKnownProxy struct {
 
 func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
     leader := selector.replicas[state.leaderIdx]
-    if atomic.LoadInt32(&leader.store.unreachable) == 0 {
+    if leader.store.getLivenessState() == reachable {
         selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
         selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx}
         return nil, stateChanged{}
@@ -442,7 +452,7 @@ type tryNewProxy struct {
 
 func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
     leader := selector.replicas[state.leaderIdx]
-    if atomic.LoadInt32(&leader.store.unreachable) == 0 {
+    if leader.store.getLivenessState() == reachable {
         selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
         selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx}
         return nil, stateChanged{}
@@ -770,11 +780,8 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
     store := accessReplica.store
     liveness := store.requestLiveness(bo, s.regionCache)
-    // We only check health in loop if forwarding is enabled now.
-    // The restriction might be relaxed if necessary, but the implementation
-    // may be checked carefully again.
-    if liveness != reachable && s.regionCache.enableForwarding {
-        store.startHealthCheckLoopIfNeeded(s.regionCache)
+    if liveness != reachable {
+        store.startHealthCheckLoopIfNeeded(s.regionCache, liveness)
     }
     return liveness
 }
@@ -815,6 +822,13 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
     }
     for i, replica := range s.replicas {
         if isSamePeer(replica.peer, leader) {
+            // If hibernate region is enabled and the leader is not reachable, the raft group
+            // will not be wakened up and re-elect the leader until the follower receives
+            // a request. So, before the new leader is elected, we should not send requests
+            // to the unreachable old leader to avoid unnecessary timeout.
+            if replica.store.getLivenessState() != reachable {
+                return
+            }
             if replica.isExhausted(maxReplicaAttempt) {
                 // Give the replica one more chance and because each follower is tried only once,
                 // it won't result in infinite retry.
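The guard added to updateLeader means a NotLeader hint that points at a store whose cached liveness is not reachable is ignored rather than followed, for the hibernate-region reason given in the comment. A simplified sketch of the check, with illustrative names that are not the repository's:

package main

import "fmt"

type livenessState int

const (
    reachable livenessState = iota
    unreachable
    unknown
)

type replica struct {
    storeID  uint64
    liveness livenessState
}

// acceptLeaderHint reports whether a selector should switch to the hinted leader.
func acceptLeaderHint(replicas []replica, hintedStoreID uint64) bool {
    for _, r := range replicas {
        if r.storeID == hintedStoreID {
            // Do not send to a not-reachable "leader": with hibernate regions, a real
            // re-election only happens after a follower receives a request, so retrying
            // the dead leader would just time out.
            return r.liveness == reachable
        }
    }
    return false
}

func main() {
    rs := []replica{{1, unreachable}, {2, reachable}, {3, reachable}}
    fmt.Println(acceptLeaderHint(rs, 1)) // false: hinted leader is unreachable
    fmt.Println(acceptLeaderHint(rs, 2)) // true
}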
@@ -163,9 +163,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
         }
         return innerClient.SendRequest(ctx, addr, req, timeout)
     }}
-    var storeState uint32 = uint32(unreachable)
+    var storeState = uint32(unreachable)
     s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
-        return livenessState(atomic.LoadUint32(&storeState))
+        if s.addr == leaderAddr {
+            return livenessState(atomic.LoadUint32(&storeState))
+        }
+        return reachable
     }
 
     loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k"))
@@ -191,7 +194,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
     atomic.StoreUint32(&storeState, uint32(reachable))
     start := time.Now()
     for {
-        if atomic.LoadInt32(&leaderStore.unreachable) == 0 {
+        if leaderStore.getLivenessState() == reachable {
             break
         }
         if time.Since(start) > 3*time.Second {
@@ -386,6 +389,47 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
     s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx)
     assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), nil)
     s.Equal(replicaSelector.targetReplica().attempts, 1)
+    // If the NotLeader errors provides an unreachable leader, do not switch to it.
+    replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{
+        RegionId: region.GetID(), Leader: &metapb.Peer{Id: s.peerIDs[regionStore.workTiKVIdx], StoreId: s.storeIDs[regionStore.workTiKVIdx]},
+    })
+    s.IsType(&tryFollower{}, replicaSelector.state)
+
+    // If the leader is unreachable and forwarding is not enabled, just do not try
+    // the unreachable leader.
+    refreshEpochs(regionStore)
+    replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
+    s.Nil(err)
+    s.NotNil(replicaSelector)
+    s.IsType(&accessKnownLeader{}, replicaSelector.state)
+    // Now, livenessState is unreachable, so it will try a reachable follower instead of the unreachable leader.
+    rpcCtx, err = replicaSelector.next(s.bo)
+    s.Nil(err)
+    s.NotNil(rpcCtx)
+    _, ok := replicaSelector.state.(*tryFollower)
+    s.True(ok)
+    s.NotEqual(regionStore.workTiKVIdx, replicaSelector.targetIdx)
+
+    // Do not try to use proxy if livenessState is unknown instead of unreachable.
+    refreshEpochs(regionStore)
+    cache.enableForwarding = true
+    cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
+        return unknown
+    }
+    replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
+    s.Nil(err)
+    s.NotNil(replicaSelector)
+    s.Eventually(func() bool {
+        return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unknown
+    }, 3*time.Second, 200*time.Millisecond)
+    s.IsType(&accessKnownLeader{}, replicaSelector.state)
+    // Now, livenessState is unknown. Even if forwarding is enabled, it should try followers
+    // instead of using the proxy.
+    rpcCtx, err = replicaSelector.next(s.bo)
+    s.Nil(err)
+    s.NotNil(rpcCtx)
+    _, ok = replicaSelector.state.(*tryFollower)
+    s.True(ok)
 
     // Test switching to tryNewProxy if leader is unreachable and forwarding is enabled
     refreshEpochs(regionStore)
@@ -396,20 +440,21 @@
     cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
         return unreachable
     }
+    s.Eventually(func() bool {
+        return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
+    }, 3*time.Second, 200*time.Millisecond)
     s.IsType(&accessKnownLeader{}, replicaSelector.state)
-    _, err = replicaSelector.next(s.bo)
-    s.Nil(err)
-    replicaSelector.onSendFailure(s.bo, nil)
+    // Now, livenessState is unreachable, so it will try a new proxy instead of the leader.
     rpcCtx, err = replicaSelector.next(s.bo)
-    s.NotNil(rpcCtx)
     s.Nil(err)
+    s.NotNil(rpcCtx)
     state, ok := replicaSelector.state.(*tryNewProxy)
     s.True(ok)
     s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
     s.Equal(AccessIndex(2), replicaSelector.targetIdx)
     s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx)
     assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
-    s.Equal(replicaSelector.targetReplica().attempts, 2)
+    s.Equal(replicaSelector.targetReplica().attempts, 1)
     s.Equal(replicaSelector.proxyReplica().attempts, 1)
 
     // When the current proxy node fails, it should try another one.
@@ -423,7 +468,7 @@
     s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
     s.Equal(AccessIndex(2), replicaSelector.targetIdx)
     s.NotEqual(lastProxy, replicaSelector.proxyIdx)
-    s.Equal(replicaSelector.targetReplica().attempts, 3)
+    s.Equal(replicaSelector.targetReplica().attempts, 2)
     s.Equal(replicaSelector.proxyReplica().attempts, 1)
 
     // Test proxy store is saves when proxy is enabled
@@ -643,15 +688,21 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
     resp, err = sender.SendReq(bo, req, region.Region, time.Second)
     s.Nil(err)
     s.True(hasFakeRegionError(resp))
-    s.Equal(bo.GetTotalBackoffTimes(), 3)
+    s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped
     s.False(sender.replicaSelector.region.isValid())
     s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
 
     // The leader store is alive but can't provide service.
-    // Region will be invalidated due to running out of all replicas.
     s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
         return reachable
     }
+    s.Eventually(func() bool {
+        stores := s.regionRequestSender.replicaSelector.regionStore.stores
+        return stores[0].getLivenessState() == reachable &&
+            stores[1].getLivenessState() == reachable &&
+            stores[2].getLivenessState() == reachable
+    }, 3*time.Second, 200*time.Millisecond)
+    // Region will be invalidated due to running out of all replicas.
     reloadRegion()
     s.cluster.StopStore(s.storeIDs[0])
     bo = retry.NewBackoffer(context.Background(), -1)