// Copyright 2021 TiKV Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // NOTE: The code in this file is based on code from the // TiDB project, licensed under the Apache License v 2.0 // // https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/locate/region_request_test.go // // Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package locate import ( "context" "fmt" "strconv" "sync" "sync/atomic" "testing" "time" "unsafe" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/apicodec" "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" "github.com/tikv/client-go/v2/util/async" "go.uber.org/zap" ) func TestRegionRequestToThreeStores(t *testing.T) { suite.Run(t, new(testRegionRequestToThreeStoresSuite)) } type testRegionRequestToThreeStoresSuite struct { suite.Suite cluster *mocktikv.Cluster storeIDs []uint64 peerIDs []uint64 regionID uint64 leaderPeer uint64 cache *RegionCache bo *retry.Backoffer regionRequestSender *RegionRequestSender mvccStore mocktikv.MVCCStore onClosed func() } func (s *testRegionRequestToThreeStoresSuite) SetupTest() { s.mvccStore = mocktikv.MustNewMVCCStore() s.cluster = mocktikv.NewCluster(s.mvccStore) s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(s.cluster, 3) pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} s.cache = NewRegionCache(pdCli) s.bo = retry.NewNoopBackoff(context.Background()) client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil) s.regionRequestSender = NewRegionRequestSender(s.cache, client, oracle.NoopReadTSValidator{}) s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return")) } func (s *testRegionRequestToThreeStoresSuite) TearDownTest() { s.cache.Close() s.mvccStore.Close() if s.onClosed != nil { 
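		// Run the per-test cleanup hook registered by tests that mutate global
		// state (e.g. restoring the region cache TTL).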
		s.onClosed()
	}
	s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
}

func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
	req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	s.Nil(err)
	s.NotNil(region)
	oldStoreLimit := kv.StoreLimit.Load()
	kv.StoreLimit.Store(500)
	s.cache.stores.getOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500)
	// Because there is only one region in this cluster, the regionID maps to its leader.
	resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.NotNil(err)
	s.Nil(resp)
	e, ok := errors.Cause(err).(*tikverr.ErrTokenLimit)
	s.True(ok)
	s.Equal(e.StoreID, uint64(1))
	kv.StoreLimit.Store(oldStoreLimit)
}

func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeader() {
	var leaderAddr string
	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		if leaderAddr == "" {
			leaderAddr = addr
		}
		// Return OK when the request switches to a different peer.
		if leaderAddr != addr {
			return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{}}, nil
		}
		return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{
			RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}},
		}}, nil
	}}

	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})

	bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
	s.Nil(err)
	resp, _, err := s.regionRequestSender.SendReq(bo, req, loc.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp)
	s.Nil(resp.GetRegionError())
}

func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeaderErrorWithNewLeaderInfo() {
	cnt := 0
	var location *KeyLocation
	cli := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		cnt++
		switch cnt {
		case 1:
			region := s.cache.GetCachedRegionWithRLock(location.Region)
			s.NotNil(region)
			leaderPeerIdx := int(region.getStore().workTiKVIdx)
			peers := region.meta.Peers
			// Return a NotLeader error that carries the new leader info.
			response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
				RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{
					RegionId: req.RegionId,
					Leader:   peers[(leaderPeerIdx+1)%len(peers)],
				}},
			}}
		case 2:
			response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
				Value: []byte("a"),
			}}
		default:
			return nil, fmt.Errorf("unexpected request")
		}
		return response, err
	}}

	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
	req.ReplicaReadType = kv.ReplicaReadLeader
	var err error
	location, err = s.cache.LocateKey(s.bo, []byte("a"))
	s.Nil(err)
	s.NotNil(location)
	bo := retry.NewBackoffer(context.Background(), 1000)
	resp, _, _, err := NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{}).SendReqCtx(bo, req, location.Region, time.Second, tikvrpc.TiKV)
	s.Nil(err)
	s.NotNil(resp)
	regionErr, err := resp.GetRegionError()
	s.Nil(err)
	s.Nil(regionErr)
	// It's unreasonable to retry in the upper layer: for a cop request, for example, the upper
	// layer would have to rebuild the cop request and retry it, incurring unnecessary overhead.
s.Equal(cnt, 2) r := s.cache.GetCachedRegionWithRLock(location.Region) s.True(r.isValid()) } func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore() (*Store, string) { region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, []byte("a"), false) s.Nil(err) leaderStore, leaderPeer, _, _ := region.WorkStorePeer(region.getStore()) s.Equal(leaderPeer.Id, s.leaderPeer) leaderAddr, err := s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore) s.Nil(err) return leaderStore, leaderAddr } func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { sender := NewRegionRequestSender(s.cache, s.regionRequestSender.client, oracle.NoopReadTSValidator{}) sender.regionCache.enableForwarding = true // First get the leader's addr from region cache leaderStore, leaderAddr := s.loadAndGetLeaderStore() bo := retry.NewBackoffer(context.Background(), 10000) // Simulate that the leader is network-partitioned but can be accessed by forwarding via a follower innerClient := sender.client sender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if addr == leaderAddr { return nil, errors.New("simulated rpc error") } // MockTiKV doesn't support forwarding. Simulate forwarding here. if len(req.ForwardedHost) != 0 { addr = req.ForwardedHost } return innerClient.SendRequest(ctx, addr, req, timeout) }} var storeState = uint32(unreachable) sender.regionCache.stores.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { if s.addr == leaderAddr { return livenessState(atomic.LoadUint32(&storeState)) } return reachable }) loc, err := sender.regionCache.LocateKey(bo, []byte("k")) s.Nil(err) s.Equal(loc.Region.GetID(), s.regionID) req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("k"), Value: []byte("v1"), }) resp, ctx, _, err := sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) s.Nil(err) regionErr, err := resp.GetRegionError() s.Nil(err) s.Nil(regionErr) s.Equal(resp.Resp.(*kvrpcpb.RawPutResponse).Error, "") s.Equal(ctx.Addr, leaderAddr) s.NotNil(ctx.ProxyStore) s.NotEqual(ctx.ProxyAddr, leaderAddr) s.Nil(err) // Simulate recovering to normal sender.client = innerClient atomic.StoreUint32(&storeState, uint32(reachable)) start := time.Now() for { if leaderStore.getLivenessState() == reachable { break } if time.Since(start) > 3*time.Second { s.FailNow("store didn't recover to normal in time") } time.Sleep(time.Millisecond * 200) } atomic.StoreUint32(&storeState, uint32(unreachable)) req = tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: []byte("k")}) resp, ctx, _, err = sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) s.Nil(err) regionErr, err = resp.GetRegionError() s.Nil(err) s.Nil(regionErr) s.Equal(resp.Resp.(*kvrpcpb.RawGetResponse).Value, []byte("v1")) s.Nil(ctx.ProxyStore) // Simulate server down sender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if addr == leaderAddr || req.ForwardedHost == leaderAddr { return nil, errors.New("simulated rpc error") } // MockTiKV doesn't support forwarding. Simulate forwarding here. if len(req.ForwardedHost) != 0 { addr = req.ForwardedHost } return innerClient.SendRequest(ctx, addr, req, timeout) }} // The leader is changed after a store is down. 
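	// Pick an arbitrary peer other than the old leader and promote it.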
	newLeaderPeerID := s.peerIDs[0]
	if newLeaderPeerID == s.leaderPeer {
		newLeaderPeerID = s.peerIDs[1]
	}
	s.NotEqual(newLeaderPeerID, s.leaderPeer)
	s.cluster.ChangeLeader(s.regionID, newLeaderPeerID)

	req = tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("k"),
		Value: []byte("v2"),
	})
	resp, ctx, _, err = sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
	s.Nil(err)
	regionErr, err = resp.GetRegionError()
	s.Nil(err)
	// After several retries, the region will be marked as needReload.
	// Then SendReqCtx will throw a pseudo EpochNotMatch to tell the caller to reload the region.
	s.NotNil(regionErr.EpochNotMatch)
	s.Nil(ctx)
	s.Equal(len(sender.failStoreIDs), 0)
	s.Equal(len(sender.failProxyStoreIDs), 0)
	region := sender.regionCache.GetCachedRegionWithRLock(loc.Region)
	s.NotNil(region)
	s.False(region.isValid())

	loc, err = sender.regionCache.LocateKey(bo, []byte("k"))
	s.Nil(err)
	req = tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("k"),
		Value: []byte("v2"),
	})
	resp, ctx, _, err = sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
	s.Nil(err)
	regionErr, err = resp.GetRegionError()
	s.Nil(err)
	s.Nil(regionErr)
	s.Equal(resp.Resp.(*kvrpcpb.RawPutResponse).Error, "")
	// Leader changed
	s.NotEqual(ctx.Store.storeID, leaderStore.storeID)
	s.Nil(ctx.ProxyStore)
}

func refreshRegionTTL(region *Region) {
	atomic.StoreInt64(&region.ttl, nextTTLWithoutJitter(time.Now().Unix()))
	atomic.StoreInt32((*int32)(&region.invalidReason), int32(Ok))
}

func refreshEpochs(regionStore *regionStore) {
	for i, store := range regionStore.stores {
		atomic.StoreUint32(&store.epoch, regionStore.storeEpochs[i])
	}
}

func refreshLivenessStates(regionStore *regionStore) {
	for _, store := range regionStore.stores {
		atomic.StoreUint32(&store.livenessState, uint32(reachable))
	}
}

func refreshStoreHealthStatus(regionStore *regionStore) {
	for _, store := range regionStore.stores {
		store.healthStatus.clientSideSlowScore.resetSlowScore()
		store.healthStatus.ResetTiKVServerSideSlowScoreForTest(50)
		store.healthStatus.updateSlowFlag()
	}
}

func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) {
	s.Equal(rpcCtx.Store, target.store)
	s.Equal(rpcCtx.Peer, target.peer)
	s.Equal(rpcCtx.Addr, target.store.addr)
	s.Equal(rpcCtx.AccessMode, tiKVOnly)
	if proxy != nil {
		s.Equal(rpcCtx.ProxyStore, proxy.store)
		s.Equal(rpcCtx.ProxyAddr, proxy.store.addr)
	}
}

func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	s.Nil(err)
	s.NotNil(regionLoc)
	region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)
	regionStore := region.getStore()
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{})

	// Create a fake region and change its leader to the last peer.
	regionStore = regionStore.clone()
	regionStore.workTiKVIdx = AccessIndex(len(regionStore.stores) - 1)
	sidx, _ := regionStore.accessStore(tiKVOnly, regionStore.workTiKVIdx)
	regionStore.stores[sidx].epoch++

	// Add a TiKV learner peer to the region.
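	// The learner gets fresh store and peer IDs and is appended to both the
	// region meta and the cloned regionStore's tiKVOnly access index, so the
	// replica selector can route ReplicaReadLearner requests to it.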
	storeID := s.cluster.AllocID()
	s.cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
	tikvLearner := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: storeID, Role: metapb.PeerRole_Learner}
	tikvLearnerAccessIdx := len(regionStore.stores)
	regionStore.accessIndex[tiKVOnly] = append(regionStore.accessIndex[tiKVOnly], tikvLearnerAccessIdx)
	regionStore.stores = append(regionStore.stores, newUninitializedStore(tikvLearner.StoreId))
	regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

	region = &Region{
		meta: region.GetMeta(),
	}
	refreshRegionTTL(region)
	region.meta.Peers = append(region.meta.Peers, tikvLearner)
	atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

	cache := NewRegionCache(s.cache.pdClient)
	defer cache.Close()
	cache.mu.Lock()
	cache.insertRegionToCache(region, true, true)
	cache.mu.Unlock()

	// Test accessFollower state with kv.ReplicaReadLearner request type.
	refreshRegionTTL(region)
	refreshEpochs(regionStore)
	req.ReplicaReadType = kv.ReplicaReadLearner
	replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
	s.NotNil(replicaSelector)
	s.Nil(err)

	// The selector should pick the learner replica.
	refreshRegionTTL(region)
	rpcCtx, err := replicaSelector.next(s.bo, req)
	s.Nil(err)
	target := replicaSelector.target
	AssertRPCCtxEqual(s, rpcCtx, target, nil)
	s.Equal(target.peer.Role, metapb.PeerRole_Learner)
	s.Equal(target.peer.Id, tikvLearner.Id)
}

func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	s.Nil(err)
	s.NotNil(regionLoc)
	region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)
	regionStore := region.getStore()
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{})

	// Create a fake region and change its leader to the last peer.
	regionStore = regionStore.clone()
	regionStore.workTiKVIdx = AccessIndex(len(regionStore.stores) - 1)
	sidx, _ := regionStore.accessStore(tiKVOnly, regionStore.workTiKVIdx)
	regionStore.stores[sidx].epoch++
	regionStore.storeEpochs[sidx]++

	// Add a TiFlash peer to the region.
	tiflash := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
	regionStore.accessIndex[tiFlashOnly] = append(regionStore.accessIndex[tiFlashOnly], len(regionStore.stores))
	tiflashStore := newUninitializedStore(tiflash.StoreId)
	tiflashStore.storeType = tikvrpc.TiFlash
	regionStore.stores = append(regionStore.stores, tiflashStore)
	regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

	region = &Region{
		meta: region.GetMeta(),
	}
	refreshRegionTTL(region)
	region.meta.Peers = append(region.meta.Peers, tiflash)
	atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

	// Disable the tick on health status.
	cache := NewRegionCache(s.cache.pdClient, RegionCacheNoHealthTick)
	defer cache.Close()
	cache.mu.Lock()
	cache.insertRegionToCache(region, true, true)
	cache.mu.Unlock()

	// Verify creating the replicaSelector.
	replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
	s.NotNil(replicaSelector)
	s.Nil(err)
	s.Equal(replicaSelector.region, region)
	// Should only contain TiKV stores.
	s.Equal(len(replicaSelector.replicas), regionStore.accessStoreNum(tiKVOnly))
	s.Equal(len(replicaSelector.replicas), len(regionStore.stores)-1)
	// Verify that the store matches the peer and epoch.
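	// Each replica should pair its peer with the store the peer lives on, start
	// with zero attempts, and record the store epoch captured when the selector
	// was created.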
	for _, replica := range replicaSelector.replicas {
		s.Equal(replica.store.storeID, replica.peer.GetStoreId())
		s.Equal(replica.peer, region.getPeerOnStore(replica.store.storeID))
		s.True(replica.attempts == 0)
		for i, store := range regionStore.stores {
			if replica.store == store {
				s.Equal(replica.epoch, regionStore.storeEpochs[i])
			}
		}
	}

	// Try the leader for maxReplicaAttempt times
	for i := 1; i <= maxReplicaAttempt; i++ {
		rpcCtx, err := replicaSelector.next(s.bo, req)
		s.Nil(err)
		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
		s.Equal(replicaSelector.replicas[regionStore.workTiKVIdx].attempts, i)
	}

	// After that it should switch to tryFollower
	for i := 0; i < len(replicaSelector.replicas)-1; i++ {
		rpcCtx, err := replicaSelector.next(s.bo, req)
		s.Nil(err)
		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil)
		s.Equal(replicaSelector.target.attempts, 1)
		s.NotEqual(rpcCtx.Peer.Id, replicaSelector.replicas[regionStore.workTiKVIdx].peer.Id)
	}

	// If all replicas are tried, nil RPCContext should be returned
	rpcCtx, err := replicaSelector.next(s.bo, req)
	s.Nil(err)
	s.Nil(rpcCtx)
	// The region should be invalidated
	s.False(replicaSelector.region.isValid())

	// Test switching to tryFollower if leader is unreachable
	refreshRegionTTL(region)
	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
	s.Nil(err)
	s.NotNil(replicaSelector)
	unreachable.injectConstantLiveness(cache.stores)
	_, err = replicaSelector.next(s.bo, req)
	s.Nil(err)
	replicaSelector.onSendFailure(s.bo, nil)
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.NotNil(rpcCtx)
	s.Nil(err)
	s.NotEqual(replicaSelector.target.peer.Id, region.GetLeaderPeerID())
	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil)
	s.Equal(replicaSelector.target.attempts, 1)

	// If the NotLeader error provides an unreachable leader, do not switch to it.
	replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{
		RegionId: region.GetID(),
		Leader:   &metapb.Peer{Id: s.peerIDs[regionStore.workTiKVIdx], StoreId: s.storeIDs[regionStore.workTiKVIdx]},
	})

	// If the leader is unreachable and forwarding is not enabled, just do not try
	// the unreachable leader.
	refreshEpochs(regionStore)
	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
	s.Nil(err)
	s.NotNil(replicaSelector)
	// Now, livenessState is unreachable, so it will try a reachable follower instead of the unreachable leader.
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.Nil(err)
	s.NotNil(rpcCtx)
	s.NotEqual(replicaSelector.target.peer.Id, replicaSelector.replicas[regionStore.workTiKVIdx].peer.Id)

	// Do not try to use proxy if livenessState is unknown instead of unreachable.
	refreshEpochs(regionStore)
	cache.enableForwarding = true
	unknown.injectConstantLiveness(cache.stores)
	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
	s.Nil(err)
	s.NotNil(replicaSelector)
	s.Eventually(func() bool {
		return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unknown
	}, 3*time.Second, 200*time.Millisecond)
	// Now, livenessState is unknown. Even if forwarding is enabled, it should try followers
	// instead of using the proxy.
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.Nil(err)
	s.NotNil(rpcCtx)

	// Test switching to tryNewProxy if leader is unreachable and forwarding is enabled
	refreshEpochs(regionStore)
	cache.enableForwarding = true
	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
	s.Nil(err)
	s.NotNil(replicaSelector)
	unreachable.injectConstantLiveness(cache.stores)
	s.Eventually(func() bool {
		return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
	}, 3*time.Second, 200*time.Millisecond)
	// Now, livenessState is unreachable, so it will try a new proxy instead of the leader.
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.Nil(err)
	s.NotNil(rpcCtx)
	s.NotNil(replicaSelector.target)
	s.NotNil(replicaSelector.proxy)
	s.NotEqual(replicaSelector.target.peer.Id, replicaSelector.proxy.peer.Id)
	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, replicaSelector.proxy)
	s.Equal(replicaSelector.target.attempts, 1)
	s.Equal(replicaSelector.proxy.attempts, 1)

	// When the current proxy node fails, it should try another one.
	replicaSelector.onSendFailure(s.bo, nil)
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.NotNil(rpcCtx)
	s.Nil(err)
	s.Equal(replicaSelector.target.attempts, 2)
	s.Equal(replicaSelector.proxy.attempts, 1)

	// Test that the proxy store is saved when forwarding is enabled.
	replicaSelector.onSendSuccess(req)
	regionStore = region.getStore()
	s.Equal(replicaSelector.proxy.peer.Id, replicaSelector.replicas[regionStore.proxyTiKVIdx].peer.Id)

	// Test when proxyTiKVIdx is valid
	refreshEpochs(regionStore)
	cache.enableForwarding = true
	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
	s.Nil(err)
	s.NotNil(replicaSelector)
	_, err = replicaSelector.next(s.bo, req)
	s.Nil(err)
	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, replicaSelector.proxy)

	// Test accessFollower state with kv.ReplicaReadFollower request type.
	req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
	refreshEpochs(regionStore)
	refreshLivenessStates(regionStore)
	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
	s.Nil(err)
	s.NotNil(replicaSelector)
	for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
		rpcCtx, err := replicaSelector.next(s.bo, req)
		s.Nil(err)
		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil)
	}
	// Fall back to the leader once.
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.Nil(err)
	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
	// All replicas are exhausted.
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.Nil(rpcCtx)
	s.Nil(err)

	// Test accessFollower state filtering epoch-stale stores.
	refreshRegionTTL(region)
	refreshEpochs(regionStore)
	// Mark all followers as stale.
	tiKVNum := regionStore.accessStoreNum(tiKVOnly)
	for i := 1; i < tiKVNum; i++ {
		regionStore.storeEpochs[(regionStore.workTiKVIdx+AccessIndex(i))%AccessIndex(tiKVNum)]++
	}
	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
	s.NotNil(replicaSelector)
	s.Nil(err)
	// Should fall back to the leader immediately.
	rpcCtx, err = replicaSelector.next(s.bo, req)
	s.Nil(err)
	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)

	// Test accessFollower state filtering label-not-match stores.
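	// Attach a matching label to exactly one follower; WithMatchLabels should
	// then route every selection to that store.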
refreshRegionTTL(region) refreshEpochs(regionStore) refreshStoreHealthStatus(regionStore) labels := []*metapb.StoreLabel{ { Key: "a", Value: "b", }, } regionStore.workTiKVIdx = AccessIndex(0) accessIdx := AccessIndex(regionStore.accessStoreNum(tiKVOnly) - 1) _, store := regionStore.accessStore(tiKVOnly, accessIdx) store.labels = labels for i := 0; i < 5; i++ { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithMatchLabels(labels)) s.NotNil(replicaSelector) s.Nil(err) rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(err) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[accessIdx], nil) } // Test accessFollower state with leaderOnly option refreshRegionTTL(region) refreshEpochs(regionStore) for i := 0; i < 5; i++ { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithLeaderOnly()) s.NotNil(replicaSelector) s.Nil(err) rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(err) // Should always access the leader. AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) } // Test accessFollower state with kv.ReplicaReadMixed request type. refreshRegionTTL(region) refreshEpochs(regionStore) req.ReplicaReadType = kv.ReplicaReadMixed replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.NotNil(replicaSelector) s.Nil(err) // Invalidate the region if the leader is not in the region. refreshRegionTTL(region) replicaSelector.updateLeader(&metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}) s.False(region.isValid()) // Don't try next replica if the region is invalidated. rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(rpcCtx) s.Nil(err) } // TODO(youjiali1995): Remove duplicated tests. This test may be duplicated with other // tests but it's a dedicated one to test sending requests with the replica selector. func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`)) defer func() { s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep")) }() req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.regionID) s.Nil(err) s.NotNil(region) regionStore := s.cache.GetCachedRegionWithRLock(region.Region).getStore() s.NotNil(regionStore) reloadRegion := func() { s.regionRequestSender.replicaSelector.invalidateRegion() region, _ = s.cache.LocateRegionByID(s.bo, s.regionID) regionStore = s.cache.GetCachedRegionWithRLock(region.Region).getStore() } hasFakeRegionError := func(resp *tikvrpc.Response) bool { if resp == nil { return false } regionErr, err := resp.GetRegionError() if err != nil { return false } return retry.IsFakeRegionError(regionErr) } // Normal bo := retry.NewBackoffer(context.Background(), -1) sender := s.regionRequestSender resp, _, err := sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) s.True(bo.GetTotalBackoffTimes() == 0) // Switch to the next Peer due to store failure and the leader is on the next peer. 
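	// Stopping the old leader's store makes the first send fail, so exactly one
	// backoff is expected before the request succeeds on the new leader.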
	bo = retry.NewBackoffer(context.Background(), -1)
	s.cluster.ChangeLeader(s.regionID, s.peerIDs[1])
	s.cluster.StopStore(s.storeIDs[0])
	resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
	s.Nil(err)
	s.NotNil(resp)
	s.Equal(sender.replicaSelector.target.peer.Id, s.peerIDs[1])
	s.True(bo.GetTotalBackoffTimes() == 1)
	s.cluster.StartStore(s.storeIDs[0])
	atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))

	// Leader is updated because of send success, so no backoff.
	reloadRegion()
	bo = retry.NewBackoffer(context.Background(), -1)
	resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
	s.Nil(err)
	s.NotNil(resp)
	s.Equal(sender.replicaSelector.target.peer.Id, s.peerIDs[1])
	s.True(bo.GetTotalBackoffTimes() == 0)

	// Switch to the next peer due to leader failure, but the new leader is not elected yet.
	// The region will be invalidated because the store epoch changed.
	reloadRegion()
	s.cluster.StopStore(s.storeIDs[1])
	bo = retry.NewBackoffer(context.Background(), -1)
	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
	s.Nil(err)
	s.True(hasFakeRegionError(resp))
	s.Equal(bo.GetTotalBackoffTimes(), 1)
	s.cluster.StartStore(s.storeIDs[1])
	atomic.StoreUint32(&regionStore.stores[1].livenessState, uint32(reachable))

	// Leader is changed. No backoff.
	reloadRegion()
	s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
	bo = retry.NewBackoffer(context.Background(), -1)
	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp)
	s.Equal(bo.GetTotalBackoffTimes(), 0)

	// No leader. Back off for each replica and run out of all replicas.
	s.cluster.GiveUpLeader(s.regionID)
	bo = retry.NewBackoffer(context.Background(), -1)
	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
	s.Nil(err)
	s.True(hasFakeRegionError(resp))
	s.Equal(bo.GetTotalBackoffTimes(), 3)
	s.False(sender.replicaSelector.region.isValid())
	s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])

	// The leader store is alive but can't provide service.
	reachable.injectConstantLiveness(s.cache.stores)
	s.Eventually(func() bool {
		stores := sender.replicaSelector.region.getStore().stores
		return stores[0].getLivenessState() == reachable &&
			stores[1].getLivenessState() == reachable &&
			stores[2].getLivenessState() == reachable
	}, 3*time.Second, 200*time.Millisecond)
	// Region will be invalidated due to running out of all replicas.
	reloadRegion()
	s.cluster.StopStore(s.storeIDs[0])
	bo = retry.NewBackoffer(context.Background(), -1)
	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
	s.Nil(err)
	s.True(hasFakeRegionError(resp))
	s.False(sender.replicaSelector.region.isValid())
	s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
	s.cluster.StartStore(s.storeIDs[0])

	// Verify that the same replica is retried when meeting ServerIsBusy/MaxTimestampNotSynced/ReadIndexNotReady/ProposalInMergingMode.
	for _, regionErr := range []*errorpb.Error{
		// ServerIsBusy takes too much time to test.
		// {ServerIsBusy: &errorpb.ServerIsBusy{}},
		{MaxTimestampNotSynced: &errorpb.MaxTimestampNotSynced{}},
		{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}},
		{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}},
	} {
		func() {
			oc := sender.client
			defer func() {
				sender.client = oc
			}()
			s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
				// Return the specific region error when accessing the leader.
				if addr == s.cluster.GetStore(s.storeIDs[0]).Address {
					return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{RegionError: regionErr}}, nil
				}
				// Return the NotLeader error when accessing followers.
				return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{RegionError: &errorpb.Error{
					NotLeader: &errorpb.NotLeader{
						RegionId: region.Region.id,
						Leader:   &metapb.Peer{Id: s.peerIDs[0], StoreId: s.storeIDs[0]},
					}}}}, nil
			}}

			reloadRegion()
			bo = retry.NewBackoffer(context.Background(), -1)
			resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
			s.Nil(err)
			s.True(hasFakeRegionError(resp))
			s.False(sender.replicaSelector.region.isValid())
			s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
		}()
	}

	// Verify switching to the next peer immediately when meeting StaleCommand.
	reloadRegion()
	func() {
		oc := sender.client
		defer func() {
			sender.client = oc
		}()
		s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
			return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{RegionError: &errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}}}, nil
		}}
		reloadRegion()
		bo = retry.NewBackoffer(context.Background(), -1)
		resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
		s.Nil(err)
		s.True(hasFakeRegionError(resp))
		s.False(sender.replicaSelector.region.isValid())
		s.Equal(bo.GetTotalBackoffTimes(), 0)
	}()

	// Verify that unknown region errors don't trigger any backoff.
	reloadRegion()
	func() {
		oc := sender.client
		defer func() {
			sender.client = oc
		}()
		s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
			return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{RegionError: &errorpb.Error{Message: ""}}}, nil
		}}
		reloadRegion()
		bo = retry.NewBackoffer(context.Background(), -1)
		resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
		s.Nil(err)
		s.True(hasFakeRegionError(resp))
		s.False(sender.replicaSelector.region.isValid())
		s.Equal(bo.GetTotalBackoffTimes(), 0)
	}()

	// Verify that the region is invalidated when meeting StoreNotMatch/RegionNotFound/EpochNotMatch,
	// or NotLeader when the leader can't be found in the region.
	for i, regionErr := range []*errorpb.Error{
		{StoreNotMatch: &errorpb.StoreNotMatch{}},
		{RegionNotFound: &errorpb.RegionNotFound{}},
		{EpochNotMatch: &errorpb.EpochNotMatch{}},
		{NotLeader: &errorpb.NotLeader{Leader: &metapb.Peer{}}}} {
		func() {
			oc := sender.client
			defer func() {
				sender.client = oc
			}()
			s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
				return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{RegionError: regionErr}}, nil
			}}
			reloadRegion()
			bo = retry.NewBackoffer(context.Background(), -1)
			resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
			// Return a sendError when meeting NotLeader and the leader can't be found in the region.
			if i == 3 {
				s.Nil(err)
				s.True(hasFakeRegionError(resp))
			} else {
				s.Nil(err)
				s.NotNil(resp)
				regionErr, _ := resp.GetRegionError()
				s.NotNil(regionErr)
			}
			s.False(sender.replicaSelector.region.isValid())
			s.Equal(bo.GetTotalBackoffTimes(), 0)
		}()
	}

	// Run out of all replicas and then return a send error.
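	// All stores are stopped and marked unreachable, so each of the three
	// replicas fails once (three backoffs) and the region is invalidated.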
	unreachable.injectConstantLiveness(s.cache.stores)
	reloadRegion()
	for _, store := range s.storeIDs {
		s.cluster.StopStore(store)
	}
	bo = retry.NewBackoffer(context.Background(), -1)
	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
	s.Nil(err)
	s.True(hasFakeRegionError(resp))
	s.True(bo.GetTotalBackoffTimes() == 3)
	s.False(sender.replicaSelector.region.isValid())
	for _, store := range s.storeIDs {
		s.cluster.StartStore(store)
	}
}

func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	s.Nil(err)
	s.NotNil(regionLoc)
	region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)

	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{
		BusyThresholdMs: 50,
	})
	replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
	s.NotNil(replicaSelector)
	s.Nil(err)
	s.Equal(replicaSelector.region, region)
	// The busyThreshold in replicaSelector should be initialized with the request context.
	s.Equal(replicaSelector.busyThreshold, 50*time.Millisecond)

	bo := retry.NewBackoffer(context.Background(), -1)
	rpcCtx, err := replicaSelector.next(bo, req)
	s.Nil(err)
	s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
	s.False(req.ReplicaRead)
	s.Equal(req.BusyThresholdMs, uint32(50))

	// Receive a ServerIsBusy error
	replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{
		EstimatedWaitMs: 500,
	})
	rpcCtx, err = replicaSelector.next(bo, req)
	s.Nil(err)
	s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
	s.True(req.ReplicaRead)
	s.Equal(req.BusyThresholdMs, uint32(50))
	lastPeerID := rpcCtx.Peer.Id

	replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{
		EstimatedWaitMs: 800,
	})
	rpcCtx, err = replicaSelector.next(bo, req)
	s.Nil(err)
	// Should choose a peer different from before
	s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
	s.NotEqual(rpcCtx.Peer.Id, lastPeerID)
	s.True(req.ReplicaRead)
	s.Equal(req.BusyThresholdMs, uint32(50))

	// All peers are too busy
	replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{
		EstimatedWaitMs: 150,
	})
	lessBusyPeer := rpcCtx.Peer.Id
	// Then, send to the leader again with no threshold.
	rpcCtx, err = replicaSelector.next(bo, req)
	s.Nil(err)
	s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
	s.False(req.ReplicaRead)
	s.Equal(req.BusyThresholdMs, uint32(0))
	s.True(replicaSelector.region.isValid()) // Don't invalidate the region when an idle replica can't be found.

	time.Sleep(120 * time.Millisecond)
	// When a new request comes, it should skip the busy leader and choose a less busy store.
	req.BusyThresholdMs = 50
	replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
	s.NotNil(replicaSelector)
	s.Nil(err)
	rpcCtx, err = replicaSelector.next(bo, req)
	s.Nil(err)
	s.Equal(rpcCtx.Peer.Id, lessBusyPeer)
	s.True(req.ReplicaRead)
}

func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadWithFlashbackInProgress() {
	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	s.Nil(err)
	s.NotNil(regionLoc)

	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		// Return ServerIsBusy when accessing the leader with a busy threshold.
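		// Followers reply with FlashbackInProgress below, which should push the
		// request back to the leader with the busy threshold cleared.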
		if addr == s.cluster.GetStore(s.storeIDs[0]).Address {
			if req.BusyThresholdMs > 0 {
				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
					ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 500},
				}}}, nil
			} else {
				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
			}
		}
		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
			FlashbackInProgress: &errorpb.FlashbackInProgress{
				RegionId: regionLoc.Region.GetID(),
			},
		}}}, nil
	}}

	reqs := []*tikvrpc.Request{
		tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kvrpcpb.Context{
			BusyThresholdMs: 50,
		}),
		tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadFollower, nil),
	}
	for _, req := range reqs {
		bo := retry.NewBackoffer(context.Background(), -1)
		s.Nil(err)
		resp, retryTimes, err := s.regionRequestSender.SendReq(bo, req, regionLoc.Region, time.Second)
		s.Nil(err)
		s.GreaterOrEqual(retryTimes, 1)
		s.Equal(resp.Resp.(*kvrpcpb.GetResponse).Value, []byte("value"))
	}
}

func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() {
	var leaderAddr string
	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		// Return an error when accessing a non-leader peer.
		if leaderAddr != addr {
			return nil, context.DeadlineExceeded
		}
		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
			Value: []byte("value"),
		}}, nil
	}}

	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
		Key: []byte("key"),
	})
	req.ReplicaReadType = kv.ReplicaReadMixed

	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
	s.Nil(err)
	region := s.cache.GetCachedRegionWithRLock(loc.Region)
	s.NotNil(region)
	regionStore := region.getStore()
	leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
	s.NotEqual(leaderAddr, "")
	for i := 0; i < 30; i++ {
		bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
		s.Nil(err)
		s.NotNil(resp)

		// Since requests sent to followers fail, all followers will eventually be
		// marked as unreachable with stale epochs.
		allFollowerStoreEpochStale := true
		for i, store := range regionStore.stores {
			if i == int(regionStore.workTiKVIdx) {
				continue
			}
			if store.epoch == regionStore.storeEpochs[i] {
				allFollowerStoreEpochStale = false
				break
			} else {
				s.Equal(store.getLivenessState(), unreachable)
			}
		}
		if allFollowerStoreEpochStale {
			break
		}
	}

	// Mock the GC leader reloading all regions.
	bo := retry.NewBackofferWithVars(context.Background(), 10, nil)
	_, err = s.cache.BatchLoadRegionsWithKeyRange(bo, []byte(""), nil, 1)
	s.Nil(err)

	loc, err = s.cache.LocateKey(s.bo, []byte("key"))
	s.Nil(err)
	region = s.cache.GetCachedRegionWithRLock(loc.Region)
	s.NotNil(region)
	regionStore = region.getStore()
	for i, store := range regionStore.stores {
		if i == int(regionStore.workTiKVIdx) {
			continue
		}
		// After reloading the region, the region epoch is updated, but the store
		// liveness state is still unreachable.
		s.Equal(store.epoch, regionStore.storeEpochs[i])
		s.Equal(store.getLivenessState(), unreachable)
	}

	for i := 0; i < 100; i++ {
		bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
		resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
		s.Nil(err)
		s.NotNil(resp)
		// Since all follower stores are unreachable, the request is sent to the
		// leader, so the backoff times should be 0.
		s.Equal(0, bo.GetTotalBackoffTimes())
		s.Equal(0, retryTimes)
	}
}

func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
	leaderAddr := ""
	reqTargetAddrs := make(map[string]struct{})
	s.regionRequestSender.Stats = NewRegionRequestRuntimeStats()
	bo := retry.NewBackoffer(context.Background(), 10000)
	mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
		reqTargetAddrs[addr] = struct{}{}
		if req.Context.MaxExecutionDurationMs < 10 {
			return nil, context.DeadlineExceeded
		}
		if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead {
			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil
		}
		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
	}}
	getLocFn := func() *KeyLocation {
		loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a"))
		s.Nil(err)
		region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region)
		leaderStore, _, _, _ := region.WorkStorePeer(region.getStore())
		leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore)
		s.Nil(err)
		return loc
	}
	resetStats := func() {
		reqTargetAddrs = make(map[string]struct{})
		s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient, oracle.NoopReadTSValidator{})
		s.regionRequestSender.Stats = NewRegionRequestRuntimeStats()
	}

	// Test different read types.
	staleReadTypes := []bool{false, true}
	replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed}
	for _, staleRead := range staleReadTypes {
		for _, tp := range replicaReadTypes {
			log.Info("TestSendReqFirstTimeout", zap.Bool("stale-read", staleRead), zap.String("replica-read-type", tp.String()))
			resetStats()
			req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
			if staleRead {
				req.EnableStaleWithMixedReplicaRead()
			} else {
				req.ReplicaRead = tp.IsFollowerRead()
				req.ReplicaReadType = tp
			}
			loc := getLocFn()
			resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
			s.Nil(err)
			regionErr, err := resp.GetRegionError()
			s.Nil(err)
			s.True(retry.IsFakeRegionError(regionErr))
			s.Equal(1, s.regionRequestSender.Stats.GetRPCStatsCount())
			s.Equal(uint32(3), s.regionRequestSender.Stats.GetCmdRPCCount(tikvrpc.CmdGet)) // 3 rpc
			s.Equal(3, len(reqTargetAddrs))       // each rpc to a different store.
			s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.

			// Note: MaxExecutionDurationMs must be reset before the retry.
			resetStats()
			if staleRead {
				req.EnableStaleWithMixedReplicaRead()
			} else {
				req.ReplicaRead = tp.IsFollowerRead()
				req.ReplicaReadType = tp
			}
			req.Context.MaxExecutionDurationMs = 0
			resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
			s.Nil(err)
			regionErr, err = resp.GetRegionError()
			s.Nil(err)
			s.Nil(regionErr)
			s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
			s.Equal(1, s.regionRequestSender.Stats.GetRPCStatsCount())
			s.Equal(uint32(1), s.regionRequestSender.Stats.GetCmdRPCCount(tikvrpc.CmdGet)) // 1 rpc
			s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
		}
	}

	// Test a write request.
	reachable.injectConstantLiveness(s.cache.stores)
	resetStats()
	req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
	req.ReplicaReadType = kv.ReplicaReadLeader
	loc := getLocFn()
	bo = retry.NewBackoffer(context.Background(), 1000)
	resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
	s.Nil(resp)
	s.Equal(context.DeadlineExceeded, err)
	backoffTimes := bo.GetBackoffTimes()
	// A write request timeout doesn't fast-retry, so the backoff count should be greater than 0.
	s.True(backoffTimes["tikvRPC"] > 0)
}

func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderRegionError() {
	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	s.Nil(err)
	s.NotNil(regionLoc)

	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		select {
		case <-ctx.Done():
			return nil, errors.New("timeout")
		default:
		}
		// Return `mismatch peer id` when accessing the leader.
		if addr == s.cluster.GetStore(s.storeIDs[0]).Address {
			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
				MismatchPeerId: &errorpb.MismatchPeerId{
					RequestPeerId: 1,
					StorePeerId:   2,
				},
			}}}, nil
		}
		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
			DataIsNotReady: &errorpb.DataIsNotReady{},
		}}}, nil
	}}

	region, _ := s.cache.searchCachedRegionByID(regionLoc.Region.GetID())
	s.True(region.isValid())

	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
	req.ReadReplicaScope = oracle.GlobalTxnScope
	req.TxnScope = oracle.GlobalTxnScope
	req.EnableStaleWithMixedReplicaRead()
	req.ReplicaReadType = kv.ReplicaReadFollower
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	bo := retry.NewBackoffer(ctx, -1)
	s.Nil(err)
	resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV)
	s.Nil(err)
	regionErr, err := resp.GetRegionError()
	s.Nil(err)
	s.Equal(regionErrorToLabel(regionErr), "mismatch_peer_id")
	// Return a non-EpochNotMatch region error so that the upper layer can auto-retry.
	s.Nil(regionErr.GetEpochNotMatch())
	// After the region error is returned, the region should be invalidated.
s.False(region.isValid()) } func (s *testRegionRequestToThreeStoresSuite) TestLogging() { req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: []byte("key"), }) region, err := s.cache.LocateRegionByID(s.bo, s.regionID) s.Nil(err) s.NotNil(region) oc := s.regionRequestSender.client defer func() { s.regionRequestSender.client = oc }() s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}, }} return response, nil }} bo := retry.NewBackofferWithVars(context.Background(), 5, nil) resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) regionErr, _ := resp.GetRegionError() s.NotNil(regionErr) } func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() { leaderStore, _ := s.loadAndGetLeaderStore() regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) s.Nil(err) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: []byte("key"), }) req.InputRequestSource = "test" setReadType := func(req *tikvrpc.Request, readType string) { req.StaleRead = false req.ReplicaRead = false switch readType { case "leader": return case "follower": req.ReplicaRead = true req.ReplicaReadType = kv.ReplicaReadFollower case "stale_follower", "stale_leader": req.EnableStaleWithMixedReplicaRead() default: panic("unreachable") } } setTargetReplica := func(selector *replicaSelector, readType string) { var leader bool switch readType { case "leader", "stale_leader": leader = true case "follower", "stale_follower": leader = false default: panic("unreachable") } for _, replica := range selector.replicas { if replica.store.storeID == leaderStore.storeID && leader { selector.target = replica return } if replica.store.storeID != leaderStore.storeID && !leader { selector.target = replica return } } panic("unreachable") } firstReadReplicas := []string{"leader", "follower", "stale_follower", "stale_leader"} retryReadReplicas := []string{"leader", "follower"} for _, firstReplica := range firstReadReplicas { for _, retryReplica := range retryReadReplicas { bo := retry.NewBackoffer(context.Background(), -1) req.IsRetryRequest = false setReadType(req, firstReplica) replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req) s.Nil(err) setTargetReplica(replicaSelector, firstReplica) rpcCtx, err := replicaSelector.buildRPCContext(bo, replicaSelector.target, replicaSelector.proxy) s.Nil(err) s.NotNil(rpcCtx) patchRequestSource(req, replicaSelector.replicaType()) s.Equal(firstReplica+"_test", req.RequestSource) // retry setReadType(req, retryReplica) replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req) s.Nil(err) setTargetReplica(replicaSelector, retryReplica) rpcCtx, err = replicaSelector.buildRPCContext(bo, replicaSelector.target, replicaSelector.proxy) s.Nil(err) s.NotNil(rpcCtx) req.IsRetryRequest = true patchRequestSource(req, replicaSelector.replicaType()) s.Equal("retry_"+firstReplica+"_"+retryReplica+"_test", req.RequestSource) } } } func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeout() { var ( leaderAddr string leaderLabel []*metapb.StoreLabel ) bo := retry.NewBackoffer(context.Background(), 10000) mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { 
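		// Time out on the leader so the stale read gives up on it and
		// fast-retries a follower without backing off.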
if addr == leaderAddr { return nil, context.DeadlineExceeded } return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil }} s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient, oracle.NoopReadTSValidator{}) s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() getLocFn := func() *KeyLocation { loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a")) s.Nil(err) region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region) leaderStore, _, _, _ := region.WorkStorePeer(region.getStore()) leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore) s.Nil(err) leaderLabel = []*metapb.StoreLabel{{Key: "id", Value: strconv.FormatUint(leaderStore.StoreID(), 10)}} return loc } req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{}) req.EnableStaleWithMixedReplicaRead() loc := getLocFn() var ops []StoreSelectorOption ops = append(ops, WithMatchLabels(leaderLabel)) resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, ops...) s.Nil(err) regionErr, err := resp.GetRegionError() s.Nil(err) s.Nil(regionErr) s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) s.Equal(1, s.regionRequestSender.Stats.GetRPCStatsCount()) s.Equal(uint32(2), s.regionRequestSender.Stats.GetCmdRPCCount(tikvrpc.CmdGet)) // 2 rpc s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. } func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { key := []byte("key") region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false) s.Nil(err) regionStore := region.getStore() leader, _, _, _ := region.WorkStorePeer(regionStore) follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{}) s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ Value: []byte(addr), }}, nil }} atomic.StoreUint32(&leader.livenessState, uint32(unreachable)) req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope req.TxnScope = oracle.GlobalTxnScope req.EnableStaleWithMixedReplicaRead() bo := retry.NewBackoffer(context.Background(), -1) resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) s.Nil(err) // `tryFollower` always try the local peer firstly s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) } func (s *testRegionRequestToThreeStoresSuite) TestPreferLeader() { key := []byte("key") bo := retry.NewBackoffer(context.Background(), -1) // load region into cache loc, err := s.cache.LocateKey(bo, key) s.Require().NoError(err) region := s.cache.GetCachedRegionWithRLock(loc.Region) leader, _, _, _ := region.WorkStorePeer(region.getStore()) // make request req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadPreferLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope req.TxnScope = oracle.GlobalTxnScope // setup mock client s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { val := "follower" if addr == leader.addr { val = "leader" } return 
&tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(val)}}, nil }} // access leader when all peers are reachable resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) s.NoError(err) regionErr, err := resp.GetRegionError() s.NoError(err) s.Nil(regionErr) s.Equal("leader", string(resp.Resp.(*kvrpcpb.GetResponse).Value)) // access follower when leader is unreachable atomic.StoreUint32(&leader.livenessState, uint32(unreachable)) resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) s.NoError(err) regionErr, err = resp.GetRegionError() s.NoError(err) s.Nil(regionErr) s.Equal("follower", string(resp.Resp.(*kvrpcpb.GetResponse).Value)) // access the rest follower when leader and one follower are unreachable follower, _, _, _ := region.FollowerStorePeer(region.getStore(), 0, &storeSelectorOp{}) atomic.StoreUint32(&follower.livenessState, uint32(unreachable)) resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) s.NoError(err) regionErr, err = resp.GetRegionError() s.NoError(err) s.Nil(regionErr) s.Equal("follower", string(resp.Resp.(*kvrpcpb.GetResponse).Value)) // return fake error when all peers are unreachable follower, _, _, _ = region.FollowerStorePeer(region.getStore(), 1, &storeSelectorOp{}) atomic.StoreUint32(&follower.livenessState, uint32(unreachable)) resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) s.NoError(err) regionErr, err = resp.GetRegionError() s.NoError(err) s.True(retry.IsFakeRegionError(regionErr)) } func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() { key := []byte("key") value := []byte("value1") s.NoError(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`)) defer func() { s.NoError(failpoint.Disable("tikvclient/injectLiveness")) }() region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false) s.Nil(err) regionStore := region.getStore() oldLeader, oldLeaderPeer, _, _ := region.WorkStorePeer(regionStore) // The follower will become the new leader later follower, followerPeer, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{}) currLeader := struct { sync.Mutex addr string peer *metapb.Peer }{ addr: oldLeader.addr, peer: oldLeaderPeer, } requestHandled := false s.regionRequestSender.client = &fnClient{ fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if addr == oldLeader.addr { time.Sleep(timeout) return nil, context.DeadlineExceeded } currLeader.Lock() leaderAddr := currLeader.addr leaderPeer := currLeader.peer currLeader.Unlock() if addr != leaderAddr { return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{ RegionId: region.GetID(), Leader: leaderPeer, }}}}, nil } requestHandled = true return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil }, } // Simulate the attempted time is nearly reached so that the test won't take too much time to run. // But the `replicaSelector` of the request sender is not initialized yet before sending any request. // So try to control it by using a failpoint. 
s.NoError(failpoint.Enable("tikvclient/newReplicaSelectorInitialAttemptedTime", fmt.Sprintf(`return("%s")`, (maxReplicaAttemptTime-time.Second).String()))) defer func() { s.NoError(failpoint.Disable("tikvclient/newReplicaSelectorInitialAttemptedTime")) }() resCh := make(chan struct { resp *tikvrpc.Response err error }) startTime := time.Now() go func() { bo := retry.NewBackoffer(context.Background(), -1) req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{ Mutations: []*kvrpcpb.Mutation{{ Op: kvrpcpb.Op_Put, Key: key, Value: value, }}, StartVersion: 100, }) resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second*2, tikvrpc.TiKV) resCh <- struct { resp *tikvrpc.Response err error }{resp: resp, err: err} }() select { case res := <-resCh: s.Fail("request finished too early", fmt.Sprintf("resp: %s, error: %+q", res.resp, res.err)) case <-time.After(time.Millisecond * 200): } s.cluster.ChangeLeader(region.GetID(), followerPeer.GetId()) currLeader.Lock() currLeader.addr = follower.addr currLeader.peer = followerPeer currLeader.Unlock() res := <-resCh elapsed := time.Since(startTime) s.NoError(res.err) s.Nil(res.resp.GetRegionError()) s.IsType(&kvrpcpb.PrewriteResponse{}, res.resp.Resp) s.Less(elapsed, time.Millisecond*2500) s.True(requestHandled) } func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { s.onClosed = func() { SetRegionCacheTTLWithJitter(600, 60) } SetRegionCacheTTLWithJitter(2, 0) bo := retry.NewBackoffer(context.Background(), -1) key := []byte("key") req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil) req.ReadReplicaScope = oracle.GlobalTxnScope req.TxnScope = oracle.GlobalTxnScope downStore := s.cluster.GetStore(s.storeIDs[2]) s.cluster.MarkPeerDown(s.peerIDs[2]) s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { s.Require().NotEqual(addr, downStore.Address) return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil }} for i := 0; i < 15; i++ { time.Sleep(200 * time.Millisecond) loc, err := s.cache.LocateKey(bo, key) s.Require().Nil(err) resp, rpcCtx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(downStore.Labels)) s.Require().Nil(err) s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value), "should access other peers") } s.cluster.RemoveDownPeer(s.peerIDs[2]) s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil }} for i := 0; i < 15; i++ { time.Sleep(200 * time.Millisecond) loc, err := s.cache.LocateKey(bo, key) s.Require().Nil(err) _, rpcCtx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(downStore.Labels)) s.Require().Nil(err) if rpcCtx.Addr == downStore.Address { return } } s.Require().Fail("should access recovered peer after region reloading within RegionCacheTTL") } func (s *testRegionRequestToThreeStoresSuite) TestStaleReadMetrics() { readMetric := func(col prometheus.Collector) int { ch := make(chan prometheus.Metric, 1) col.Collect(ch) var m dto.Metric s.Nil((<-ch).Write(&m)) return int(*m.Counter.Value + 0.000001) // round to int and avoid floating point precision issues } // set the "zone" 
label for all stores. for _, storeID := range s.storeIDs { s.cluster.UpdateStoreLabels(storeID, []*metapb.StoreLabel{ { Key: "zone", Value: fmt.Sprintf("zone%d", storeID), }, }) } // set the global config zone label to the first store's zone. localZone := fmt.Sprintf("zone%d", s.storeIDs[0]) var oldZoneLabel string config.UpdateGlobal(func(cfg *config.Config) { oldZoneLabel = cfg.ZoneLabel cfg.ZoneLabel = localZone }) defer func() { config.UpdateGlobal(func(cfg *config.Config) { cfg.ZoneLabel = oldZoneLabel }) }() for _, staleReadHit := range []bool{false, true} { for _, asyncReq := range []bool{false, true} { caseName := fmt.Sprintf("async=%t, staleReadHit=%t", asyncReq, staleReadHit) // Delete all vectors and recreate them before each test case. metrics.TiKVStaleReadCounter.Reset() metrics.TiKVStaleReadReqCounter.Reset() metrics.TiKVStaleReadBytes.Reset() metrics.StaleReadHitCounter = metrics.TiKVStaleReadCounter.WithLabelValues("hit") metrics.StaleReadMissCounter = metrics.TiKVStaleReadCounter.WithLabelValues("miss") metrics.StaleReadReqLocalCounter = metrics.TiKVStaleReadReqCounter.WithLabelValues("local") metrics.StaleReadReqCrossZoneCounter = metrics.TiKVStaleReadReqCounter.WithLabelValues("cross-zone") metrics.StaleReadLocalInBytes = metrics.TiKVStaleReadBytes.WithLabelValues("local", "in") metrics.StaleReadLocalOutBytes = metrics.TiKVStaleReadBytes.WithLabelValues("local", "out") metrics.StaleReadRemoteInBytes = metrics.TiKVStaleReadBytes.WithLabelValues("cross-zone", "in") metrics.StaleReadRemoteOutBytes = metrics.TiKVStaleReadBytes.WithLabelValues("cross-zone", "out") key := []byte("key") value := []byte("value") s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) { if req.StaleRead && !staleReadHit { return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ DataIsNotReady: &errorpb.DataIsNotReady{}, }}}, nil } return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil }} req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope req.EnableStaleWithMixedReplicaRead() ctx := context.WithValue(context.Background(), util.ExecDetailsKey, &util.ExecDetails{}) bo := retry.NewBackoffer(ctx, -1) loc, err := s.cache.LocateKey(bo, key) s.Require().Nil(err) var resp *tikvrpc.Response if asyncReq { complete := false rl := async.NewRunLoop() s.regionRequestSender.SendReqAsync(bo, req, loc.Region, time.Second, async.NewCallback(rl, func(innerResp *tikvrpc.ResponseExt, innerErr error) { resp, err = &innerResp.Response, innerErr complete = true }), WithMatchLabels(s.cluster.GetStore(s.storeIDs[0]).Labels)) for !complete { _, err := rl.Exec(ctx) s.Nil(err) } } else { resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(s.cluster.GetStore(s.storeIDs[0]).Labels)) } s.Require().Nil(err) s.Equal(value, resp.Resp.(*kvrpcpb.GetResponse).Value) hits, misses := readMetric(metrics.StaleReadHitCounter), readMetric(metrics.StaleReadMissCounter) localReq, remoteReq := readMetric(metrics.StaleReadReqLocalCounter), readMetric(metrics.StaleReadReqCrossZoneCounter) localInBytes, localOutBytes := readMetric(metrics.StaleReadLocalInBytes), readMetric(metrics.StaleReadLocalOutBytes) remoteInBytes, remoteOutBytes := readMetric(metrics.StaleReadRemoteInBytes), readMetric(metrics.StaleReadRemoteOutBytes) if 
staleReadHit {
				// When the stale read hits, local metrics should be counted.
				s.Equal(1, hits, caseName)
				s.Equal(1, localReq, caseName)
				s.Greater(localInBytes, 0, caseName)
				s.Greater(localOutBytes, 0, caseName)
				// Remote metrics should be zero.
				s.Zero(misses, caseName)
				s.Zero(remoteReq, caseName)
				s.Zero(remoteInBytes, caseName)
				s.Zero(remoteOutBytes, caseName)
			} else {
				// When the stale read misses:
				s.Zero(hits, caseName)
				// The local replica is tried first, so local metrics are counted as well.
				s.Greater(localReq, 0, caseName)
				s.Greater(localInBytes, 0, caseName)
				s.Greater(localOutBytes, 0, caseName)
				// Remote metrics should be counted.
				s.Equal(1, misses, caseName)
				s.Equal(1, remoteReq, caseName)
				s.Greater(remoteInBytes, 0, caseName)
				s.Greater(remoteOutBytes, 0, caseName)
			}
		}
	}
}