From 1ea3d66418d048cfa69beb9a099773d8ebc6b310 Mon Sep 17 00:00:00 2001
From: you06
Date: Tue, 22 Aug 2023 14:45:13 +0800
Subject: [PATCH 1/5] use leader read when `tryFollower` falls back from
 `accessKnownLeader` (#952)

* fix tryFollower

Signed-off-by: you06

* address comment

Signed-off-by: you06

---------

Signed-off-by: you06
---
 internal/locate/region_request.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 47fac7b0..96d259aa 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -347,7 +347,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 	// a request. So, before the new leader is elected, we should not send requests
 	// to the unreachable old leader to avoid unnecessary timeout.
 	if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
-		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
 		return nil, stateChanged{}
 	}
 	if selector.busyThreshold > 0 {
@@ -371,7 +371,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 		return
 	}
 	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
-		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
 	}
 	if liveness != reachable {
 		selector.invalidateReplicaStore(selector.targetReplica(), cause)
@@ -379,7 +379,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 }

 func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
-	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true}
+	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
 }

 // tryFollower is the state where we cannot access the known leader
@@ -393,9 +393,9 @@ type tryFollower struct {
 	stateBase
 	leaderIdx AccessIndex
 	lastIdx   AccessIndex
-	// fromOnNotLeader indicates whether the state is changed from onNotLeader.
-	fromOnNotLeader bool
-	labels          []*metapb.StoreLabel
+	// fromAccessKnownLeader indicates whether the state is changed from `accessKnownLeader`.
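+	// When it is true, the request is still treated as a leader read (the
+	// replica-read flag is not patched in next()), and a successful response
+	// makes onSendSuccess() switch the cached leader to the replying peer.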
+	fromAccessKnownLeader bool
+	labels                []*metapb.StoreLabel
 }

 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -454,7 +454,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if err != nil || rpcCtx == nil {
 		return rpcCtx, err
 	}
-	if !state.fromOnNotLeader {
+	if !state.fromAccessKnownLeader {
 		replicaRead := true
 		rpcCtx.contextPatcher.replicaRead = &replicaRead
 	}
@@ -464,7 +464,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 }

 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
-	if state.fromOnNotLeader {
+	if state.fromAccessKnownLeader {
 		peer := selector.targetReplica().peer
 		if !selector.region.switchWorkLeaderToPeer(peer) {
 			logutil.BgLogger().Warn("the store must exist",

From ff39b4af09f180ff5034fdc36dbcb4a90d14eab2 Mon Sep 17 00:00:00 2001
From: buffer
Date: Tue, 22 Aug 2023 17:25:20 +0800
Subject: [PATCH 2/5] error: handle bucket version not match (#918)

---
 go.mod                               |  2 +-
 go.sum                               |  4 +--
 integration_tests/go.mod             |  2 +-
 integration_tests/go.sum             |  4 +--
 internal/locate/region_cache.go      | 41 +++++++++++++++++++++-------
 internal/locate/region_cache_test.go |  8 ++++++
 internal/locate/region_request.go    | 11 ++++++++
 7 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/go.mod b/go.mod
index eedb3276..7fd8fd58 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
 	github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
 	github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
 	github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
-	github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
+	github.com/pingcap/kvproto v0.0.0-20230818065851-7b612d935bf9
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.15.1
diff --git a/go.sum b/go.sum
index 2df15720..570bd743 100644
--- a/go.sum
+++ b/go.sum
@@ -136,8 +136,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230818065851-7b612d935bf9 h1:VDoZ18CAXoTUNTCxfl4BjQSD5rJQri8QlH8nu0ZuHeg=
+github.com/pingcap/kvproto v0.0.0-20230818065851-7b612d935bf9/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/integration_tests/go.mod b/integration_tests/go.mod
index 97636a2f..01e508d6 100644
--- a/integration_tests/go.mod
+++ b/integration_tests/go.mod
@@ -6,7 +6,7 @@ require (
 	github.com/ninedraft/israce v0.0.3
 	github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
 	github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
-	github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
+	github.com/pingcap/kvproto v0.0.0-20230818065851-7b612d935bf9
 	github.com/pingcap/tidb v1.1.0-beta.0.20230619015310-8b1006f1af04
 	github.com/pkg/errors v0.9.1
 	github.com/stretchr/testify v1.8.4
diff --git a/integration_tests/go.sum b/integration_tests/go.sum
index ac10fa09..134ec393 100644
--- a/integration_tests/go.sum
+++ b/integration_tests/go.sum
@@ -363,8 +363,8 @@ github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230818065851-7b612d935bf9 h1:VDoZ18CAXoTUNTCxfl4BjQSD5rJQri8QlH8nu0ZuHeg=
+github.com/pingcap/kvproto v0.0.0-20230818065851-7b612d935bf9/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index cf57b521..56444ac0 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -627,16 +627,17 @@ func (c *RegionCache) SetPDClient(client pd.Client) {

 // RPCContext contains data that is needed to send RPC to a region.
 type RPCContext struct {
-	Region     RegionVerID
-	Meta       *metapb.Region
-	Peer       *metapb.Peer
-	AccessIdx  AccessIndex
-	Store      *Store
-	Addr       string
-	AccessMode accessMode
-	ProxyStore *Store // nil means proxy is not used
-	ProxyAddr  string // valid when ProxyStore is not nil
-	TiKVNum    int    // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
+	Region        RegionVerID
+	Meta          *metapb.Region
+	Peer          *metapb.Peer
+	AccessIdx     AccessIndex
+	Store         *Store
+	Addr          string
+	AccessMode    accessMode
+	ProxyStore    *Store // nil means proxy is not used
+	ProxyAddr     string // valid when ProxyStore is not nil
+	TiKVNum       int    // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
+	BucketVersion uint64

 	contextPatcher contextPatcher // kvrpcpb.Context fields that need to be overridden
 }
@@ -1947,6 +1948,26 @@ func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store {
 	return s
 }

+// OnBucketVersionNotMatch removes the old buckets meta if the version is stale.
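+// It only takes effect when the reported version is newer than the cached one.
+// The buckets are replaced copy-on-write style: the store is cloned and swapped
+// in via compareAndSwapStore, so a lost race is simply left for a later report.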
+func (c *RegionCache) OnBucketVersionNotMatch(ctx *RPCContext, version uint64, keys [][]byte) {
+	r := c.GetCachedRegionWithRLock(ctx.Region)
+	if r == nil {
+		return
+	}
+
+	buckets := r.getStore().buckets
+	if buckets == nil || buckets.GetVersion() < version {
+		oldStore := r.getStore()
+		store := oldStore.clone()
+		store.buckets = &metapb.Buckets{
+			Version:  version,
+			Keys:     keys,
+			RegionId: r.meta.GetId(),
+		}
+		r.compareAndSwapStore(oldStore, store)
+	}
+}
+
 // OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
 // It returns whether to retry the request because it's possible the region epoch is ahead of TiKV's due to slow applying.
 func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) (bool, error) {
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 619da2d2..6226a1c6 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -1646,6 +1646,14 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() {
 	shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackNotPrepared: &errorpb.FlashbackNotPrepared{}})
 	s.Error(err)
 	s.False(shouldRetry)
+
+	shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{BucketVersionNotMatch: &errorpb.BucketVersionNotMatch{Keys: [][]byte{[]byte("a")}, Version: 1}})
+	s.Nil(err)
+	s.False(shouldRetry)
+	key, err := s.cache.LocateKey(s.bo, []byte("a"))
+	s.Nil(err)
+	s.Equal(key.Buckets.Keys, [][]byte{[]byte("a")})
+	s.Equal(key.Buckets.Version, uint64(1))
 }

 func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 96d259aa..3bf19cb9 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -2038,6 +2038,17 @@ func (s *RegionRequestSender) onRegionError(
 		return retry, err
 	}

+	if bucketVersionNotMatch := regionErr.GetBucketVersionNotMatch(); bucketVersionNotMatch != nil {
+		logutil.Logger(bo.GetCtx()).Debug(
+			"tikv reports `BucketVersionNotMatch` retry later",
+			zap.Stringer("bucketVersionNotMatch", bucketVersionNotMatch),
+			zap.Stringer("ctx", ctx),
+		)
+		// bucket version does not match, we should split this cop request again.
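+		// Returning (false, nil) hands control back to the caller without a
+		// backoff: the refreshed bucket meta is already in the region cache, so
+		// the caller is expected to re-split the ranges and resend.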
+		s.regionCache.OnBucketVersionNotMatch(ctx, bucketVersionNotMatch.Version, bucketVersionNotMatch.Keys)
+		return false, nil
+	}
+
 	if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
 		if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
 			s.replicaSelector.onDeadlineExceeded()

From fc88757771f9838d8606553873444b896e016000 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Thu, 24 Aug 2023 16:00:22 +0800
Subject: [PATCH 3/5] log: group replica selector logging and split not leader
 errors (#929)

* group replica selector logging and split not leader errors

Signed-off-by: cfzjywxk

* use atomic to read epoch

Signed-off-by: cfzjywxk

---------

Signed-off-by: cfzjywxk
---
 internal/locate/region_request.go       | 134 +++++++++++++++---------
 internal/locate/region_request3_test.go |  28 +++++
 2 files changed, 112 insertions(+), 50 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 3bf19cb9..c22f782e 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -112,6 +112,10 @@ type RegionRequestSender struct {
 	RegionRequestRuntimeStats
 }

+func (s *RegionRequestSender) String() string {
+	return fmt.Sprintf("{replicaSelector: %v}", s.replicaSelector.String())
+}
+
 // RegionRequestRuntimeStats records the runtime stats of send region requests.
 type RegionRequestRuntimeStats struct {
 	Stats map[tikvrpc.CmdType]*RPCRuntimeStats
@@ -249,6 +253,10 @@ type replica struct {
 	deadlineErrUsingConfTimeout bool
 }

+func (r *replica) getEpoch() uint32 {
+	return atomic.LoadUint32(&r.epoch)
+}
+
 func (r *replica) isEpochStale() bool {
 	return r.epoch != atomic.LoadUint32(&r.store.epoch)
 }
@@ -273,6 +281,64 @@ type replicaSelector struct {
 	busyThreshold time.Duration
 }

+func selectorStateToString(state selectorState) string {
+	replicaSelectorState := "nil"
+	if state != nil {
+		switch state.(type) {
+		case *accessKnownLeader:
+			replicaSelectorState = "accessKnownLeader"
+		case *accessFollower:
+			replicaSelectorState = "accessFollower"
+		case *accessByKnownProxy:
+			replicaSelectorState = "accessByKnownProxy"
+		case *tryFollower:
+			replicaSelectorState = "tryFollower"
+		case *tryNewProxy:
+			replicaSelectorState = "tryNewProxy"
+		case *invalidLeader:
+			replicaSelectorState = "invalidLeader"
+		case *invalidStore:
+			replicaSelectorState = "invalidStore"
+		case *stateBase:
+			replicaSelectorState = "stateBase"
+		case nil:
+			replicaSelectorState = "nil"
+		}
+	}
+	return replicaSelectorState
+}
+
+func (s *replicaSelector) String() string {
+	var replicaStatus []string
+	cacheRegionIsValid := "unknown"
+	selectorStateStr := "nil"
+	if s != nil {
+		selectorStateStr = selectorStateToString(s.state)
+		if s.region != nil {
+			if s.region.isValid() {
+				cacheRegionIsValid = "true"
+			} else {
+				cacheRegionIsValid = "false"
+			}
+		}
+		for _, replica := range s.replicas {
+			replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, "+
+				"attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
+				replica.peer.GetId(),
+				replica.store.storeID,
+				replica.isEpochStale(),
+				replica.attempts,
+				replica.getEpoch(),
+				atomic.LoadUint32(&replica.store.epoch),
+				replica.store.getResolveState(),
+				replica.store.getLivenessState(),
+			))
+		}
+	}
+
+	return fmt.Sprintf("replicaSelector{selectorStateStr: %v, cacheRegionIsValid: %v, replicaStatus: %v}", selectorStateStr, cacheRegionIsValid, replicaStatus)
+}
+
 // selectorState is the interface of states of the replicaSelector.
 // Here is the main state transition diagram:
 //
@@ -1401,8 +1467,8 @@ func (s *RegionRequestSender) SendReqCtx(
 			return nil, nil, retryTimes, err
 		}
 		if regionErr != nil {
-			regionErrLabel := regionErrorToLabel(regionErr)
-			totalErrors[regionErrLabel]++
+			regionErrLogging := regionErrorToLogging(rpcCtx.Peer.GetId(), regionErr)
+			totalErrors[regionErrLogging]++
 			retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
 			if err != nil {
 				msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
@@ -1427,50 +1493,6 @@ func (s *RegionRequestSender) SendReqCtx(
 }

 func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) {
-	var replicaStatus []string
-	replicaSelectorState := "nil"
-	cacheRegionIsValid := "unknown"
-	if s.replicaSelector != nil {
-		switch s.replicaSelector.state.(type) {
-		case *accessKnownLeader:
-			replicaSelectorState = "accessKnownLeader"
-		case *accessFollower:
-			replicaSelectorState = "accessFollower"
-		case *accessByKnownProxy:
-			replicaSelectorState = "accessByKnownProxy"
-		case *tryFollower:
-			replicaSelectorState = "tryFollower"
-		case *tryNewProxy:
-			replicaSelectorState = "tryNewProxy"
-		case *invalidLeader:
-			replicaSelectorState = "invalidLeader"
-		case *invalidStore:
-			replicaSelectorState = "invalidStore"
-		case *stateBase:
-			replicaSelectorState = "stateBase"
-		case nil:
-			replicaSelectorState = "nil"
-		}
-		if s.replicaSelector.region != nil {
-			if s.replicaSelector.region.isValid() {
-				cacheRegionIsValid = "true"
-			} else {
-				cacheRegionIsValid = "false"
-			}
-		}
-		for _, replica := range s.replicaSelector.replicas {
-			replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
-				replica.peer.GetId(),
-				replica.store.storeID,
-				replica.isEpochStale(),
-				replica.attempts,
-				replica.epoch,
-				atomic.LoadUint32(&replica.store.epoch),
-				replica.store.getResolveState(),
-				replica.store.getLivenessState(),
-			))
-		}
-	}
 	var totalErrorStr bytes.Buffer
 	for err, cnt := range totalErrors {
 		if totalErrorStr.Len() > 0 {
@@ -1484,12 +1506,10 @@ func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, r
 		zap.Uint64("req-ts", req.GetStartTS()),
 		zap.String("req-type", req.Type.String()),
 		zap.String("region", regionID.String()),
-		zap.String("region-is-valid", cacheRegionIsValid),
-		zap.Int("retry-times", retryTimes),
 		zap.String("replica-read-type", req.ReplicaReadType.String()),
-		zap.String("replica-selector-state", replicaSelectorState),
 		zap.Bool("stale-read", req.StaleRead),
-		zap.String("replica-status", strings.Join(replicaStatus, "; ")),
+		zap.Stringer("request-sender", s),
+		zap.Int("retry-times", retryTimes),
 		zap.Int("total-backoff-ms", bo.GetTotalSleep()),
 		zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
 		zap.String("total-region-errors", totalErrorStr.String()))
@@ -1840,6 +1860,20 @@ func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) {
 	return
 }

+// regionErrorToLogging constructs the logging content with extra information like returned leader peer id.
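+// For example, "5-not_leader-6" means peer 5 returned NotLeader with peer 6 as
+// the new leader hint, while "5-not_leader-nil" means no leader was returned.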
+func regionErrorToLogging(peerID uint64, e *errorpb.Error) string {
+	str := regionErrorToLabel(e)
+	if e.GetNotLeader() != nil {
+		notLeader := e.GetNotLeader()
+		if notLeader.GetLeader() != nil {
+			str = fmt.Sprintf("%v-%v", str, notLeader.GetLeader().GetId())
+		} else {
+			str = fmt.Sprintf("%v-nil", str)
+		}
+	}
+	return fmt.Sprintf("%v-%v", peerID, str)
+}
+
 func regionErrorToLabel(e *errorpb.Error) string {
 	if e.GetNotLeader() != nil {
 		return "not_leader"
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 28bc43e8..d1549b94 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1425,3 +1425,31 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg
 	// after region error returned, the region should be invalidated.
 	s.False(region.isValid())
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestLogging() {
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key: []byte("key"),
+	})
+	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	s.Nil(err)
+	s.NotNil(region)
+
+	oc := s.regionRequestSender.client
+	defer func() {
+		s.regionRequestSender.client = oc
+	}()
+
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
+			RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}},
+		}}
+		return response, nil
+	}}
+
+	bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
+	resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
+	s.Nil(err)
+	s.NotNil(resp)
+	regionErr, _ := resp.GetRegionError()
+	s.NotNil(regionErr)
+}

From 295094e5b534f67446155c5125f9e57fb584d72e Mon Sep 17 00:00:00 2001
From: you06
Date: Tue, 29 Aug 2023 08:28:46 +0800
Subject: [PATCH 4/5] add retry info to request source (#953)

* add retry info to request source

Signed-off-by: you06

* handle upper layer retry

Signed-off-by: you06

* stabilize test

Signed-off-by: you06

* retry in 3 dimensions

Signed-off-by: you06

* record and restore req.ReadType

Signed-off-by: you06

---------

Signed-off-by: you06
---
 internal/client/client_test.go          |  2 +-
 internal/locate/region_request.go       | 64 ++++++++++++++++++--
 internal/locate/region_request3_test.go | 77 +++++++++++++++++++++++++
 tikvrpc/tikvrpc.go                      |  4 ++
 txnkv/txnsnapshot/scan.go               |  9 ++-
 txnkv/txnsnapshot/snapshot.go           | 11 +++-
 6 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/internal/client/client_test.go b/internal/client/client_test.go
index 20cae1f0..e872643f 100644
--- a/internal/client/client_test.go
+++ b/internal/client/client_test.go
@@ -708,7 +708,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
 			cli.unlockForSend()
 			break
 		}
-		if time.Since(start) > time.Second*5 {
+		if time.Since(start) > time.Second*10 {
 			// It shouldn't take too long for batch_client to reconnect.
 			require.Fail(t, "wait batch client reconnect timeout")
 		}
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index c22f782e..af0b748f 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -1378,7 +1378,6 @@ func (s *RegionRequestSender) SendReqCtx(
 	totalErrors := make(map[string]int)
 	for {
 		if retryTimes > 0 {
-			req.IsRetryRequest = true
 			if retryTimes%100 == 0 {
 				logutil.Logger(bo.GetCtx()).Warn(
 					"retry",
@@ -1431,8 +1430,17 @@ func (s *RegionRequestSender) SendReqCtx(
 			}
 		}

+		if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
+			return nil, nil, retryTimes, e
+		}
+		rpcCtx.contextPatcher.applyTo(&req.Context)
+		if req.InputRequestSource != "" && s.replicaSelector != nil {
+			s.replicaSelector.patchRequestSource(req, rpcCtx)
+		}
+
 		var retry bool
 		resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
+		req.IsRetryRequest = true
 		if err != nil {
 			msg := fmt.Sprintf("send request failed, err: %v", err.Error())
 			s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
@@ -1582,10 +1590,6 @@ func fetchRespInfo(resp *tikvrpc.Response) string {
 func (s *RegionRequestSender) sendReqToRegion(
 	bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration,
 ) (resp *tikvrpc.Response, retry bool, err error) {
-	if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
-		return nil, false, err
-	}
-	rpcCtx.contextPatcher.applyTo(&req.Context)
 	// judge the store limit switch.
 	if limit := kv.StoreLimit.Load(); limit > 0 {
 		if err := s.getStoreToken(rpcCtx.Store, limit); err != nil {
@@ -2302,3 +2306,53 @@ func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Res
 		metrics.StaleReadRemoteInBytes.Add(float64(size))
 	}
 }
+
+func (s *replicaSelector) replicaType(rpcCtx *RPCContext) string {
+	leaderIdx := -1
+	switch v := s.state.(type) {
+	case *accessKnownLeader:
+		return "leader"
+	case *tryFollower:
+		return "follower"
+	case *accessFollower:
+		leaderIdx = int(v.leaderIdx)
+	case *tryIdleReplica:
+		leaderIdx = int(v.leaderIdx)
+	}
+	if leaderIdx > -1 && rpcCtx != nil && rpcCtx.Peer != nil {
+		for idx, replica := range s.replicas {
+			if replica.peer.Id == rpcCtx.Peer.Id {
+				if idx == leaderIdx {
+					return "leader"
+				}
+				return "follower"
+			}
+		}
+	}
+	return "unknown"
+}
+
+func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCContext) {
+	var sb strings.Builder
+	sb.WriteString(req.InputRequestSource)
+	sb.WriteByte('-')
+	defer func() {
+		req.RequestSource = sb.String()
+	}()
+
+	replicaType := s.replicaType(rpcCtx)
+
+	if req.IsRetryRequest {
+		sb.WriteString("retry_")
+		sb.WriteString(req.ReadType)
+		sb.WriteByte('_')
+		sb.WriteString(replicaType)
+		return
+	}
+	if req.StaleRead {
+		req.ReadType = "stale_" + replicaType
+	} else {
+		req.ReadType = replicaType
+	}
+	sb.WriteString(req.ReadType)
+}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index d1549b94..7a92c58f 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1453,3 +1453,80 @@ func (s *testRegionRequestToThreeStoresSuite) TestLogging() {
 	regionErr, _ := resp.GetRegionError()
 	s.NotNil(regionErr)
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
+	leaderStore, _ := s.loadAndGetLeaderStore()
+	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	s.Nil(err)
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key: []byte("key"),
+	})
+	req.InputRequestSource = "test"
+
+	setReadType := func(req *tikvrpc.Request, readType string) {
+		req.StaleRead = false
+		req.ReplicaRead = false
+		switch readType {
+		case "leader":
+			return
+		case "follower":
+			req.ReplicaRead = true
+			req.ReplicaReadType = kv.ReplicaReadFollower
+		case "stale_follower", "stale_leader":
+			req.EnableStaleRead()
+		default:
+			panic("unreachable")
+		}
+	}
+
+	setTargetReplica := func(selector *replicaSelector, readType string) {
+		var leader bool
+		switch readType {
+		case "leader", "stale_leader":
+			leader = true
+		case "follower", "stale_follower":
+			leader = false
+		default:
+			panic("unreachable")
+		}
+		for idx, replica := range selector.replicas {
+			if replica.store.storeID == leaderStore.storeID && leader {
+				selector.targetIdx = AccessIndex(idx)
+				return
+			}
+			if replica.store.storeID != leaderStore.storeID && !leader {
+				selector.targetIdx = AccessIndex(idx)
+				return
+			}
+		}
+		panic("unreachable")
+	}
+
+	firstReadReplicas := []string{"leader", "follower", "stale_follower", "stale_leader"}
+	retryReadReplicas := []string{"leader", "follower"}
+	for _, firstReplica := range firstReadReplicas {
+		for _, retryReplica := range retryReadReplicas {
+			bo := retry.NewBackoffer(context.Background(), -1)
+			req.IsRetryRequest = false
+			setReadType(req, firstReplica)
+			replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
+			s.Nil(err)
+			setTargetReplica(replicaSelector, firstReplica)
+			rpcCtx, err := replicaSelector.buildRPCContext(bo)
+			s.Nil(err)
+			replicaSelector.patchRequestSource(req, rpcCtx)
+			s.Equal("test-"+firstReplica, req.RequestSource)
+
+			// retry
+			setReadType(req, retryReplica)
+			replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
+			s.Nil(err)
+			setTargetReplica(replicaSelector, retryReplica)
+			rpcCtx, err = replicaSelector.buildRPCContext(bo)
+			s.Nil(err)
+			req.IsRetryRequest = true
+			replicaSelector.patchRequestSource(req, rpcCtx)
+			s.Equal("test-retry_"+firstReplica+"_"+retryReplica, req.RequestSource)
+		}
+	}
+}
diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go
index 4d81e148..76f4e614 100644
--- a/tikvrpc/tikvrpc.go
+++ b/tikvrpc/tikvrpc.go
@@ -234,6 +234,10 @@ type Request struct {
 	ForwardedHost string
 	// ReplicaNumber is the number of current replicas, which is used to calculate the RU cost.
 	ReplicaNumber int64
+	// ReadType is the initial read type. Note it is assigned on the first try and need not be set outside the client.
+	ReadType string
+	// InputRequestSource is the input source of the request; if it is not empty, the final RequestSource sent to the store will be attached with the retry info.
+	InputRequestSource string
 }

 // NewRequest returns new kv rpc request.
diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go
index 01662f1d..59c8fca8 100644
--- a/txnkv/txnsnapshot/scan.go
+++ b/txnkv/txnsnapshot/scan.go
@@ -202,6 +202,8 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 	var loc *locate.KeyLocation
 	var resolvingRecordToken *int
 	var err error
+	// The states in the request need to be kept when retrying the request.
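+	// readType records the read type resolved on the first attempt (e.g.
+	// "leader" or "stale_follower"), so patchRequestSource can report retries
+	// as retry_<firstReadType>_<currentReplicaType> in the request source.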
+	var readType string
 	for {
 		if !s.reverse {
 			loc, err = s.snapshot.store.GetRegionCache().LocateKey(bo, s.nextStartKey)
@@ -245,12 +247,16 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 			TaskId:           s.snapshot.mu.taskID,
 			ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
 			IsolationLevel:   s.snapshot.isolationLevel.ToPB(),
-			RequestSource:    s.snapshot.GetRequestSource(),
 			ResourceControlContext: &kvrpcpb.ResourceControlContext{
 				ResourceGroupName: s.snapshot.mu.resourceGroupName,
 			},
 			BusyThresholdMs: uint32(s.snapshot.mu.busyThreshold.Milliseconds()),
 		})
+		if readType != "" {
+			req.ReadType = readType
+			req.IsRetryRequest = true
+		}
+		req.InputRequestSource = s.snapshot.GetRequestSource()
 		if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil {
 			s.snapshot.mu.resourceGroupTagger(req)
 		}
@@ -263,6 +269,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 		if err != nil {
 			return err
 		}
+		readType = req.ReadType
 		if regionErr != nil {
 			logutil.BgLogger().Debug("scanner getData failed",
 				zap.Stringer("regionErr", regionErr))
diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go
index ab296553..624dc690 100644
--- a/txnkv/txnsnapshot/snapshot.go
+++ b/txnkv/txnsnapshot/snapshot.go
@@ -389,6 +389,8 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 	pending := batch.keys
 	var resolvingRecordToken *int
 	useConfigurableKVTimeout := true
+	// The states in the request need to be kept when retrying the request.
+	var readType string
 	for {
 		s.mu.RLock()
 		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@@ -400,12 +402,16 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 			TaskId:           s.mu.taskID,
 			ResourceGroupTag: s.mu.resourceGroupTag,
 			IsolationLevel:   s.isolationLevel.ToPB(),
-			RequestSource:    s.GetRequestSource(),
 			ResourceControlContext: &kvrpcpb.ResourceControlContext{
 				ResourceGroupName: s.mu.resourceGroupName,
 			},
 			BusyThresholdMs: uint32(busyThresholdMs),
 		})
+		req.InputRequestSource = s.GetRequestSource()
+		if readType != "" {
+			req.ReadType = readType
+			req.IsRetryRequest = true
+		}
 		if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
 			s.mu.resourceGroupTagger(req)
 		}
@@ -443,6 +449,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 		if err != nil {
 			return err
 		}
+		readType = req.ReadType
 		if regionErr != nil {
 			// For other region error and the fake region error, backoff because
 			// there's something wrong.
@@ -626,12 +633,12 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
 		TaskId:           s.mu.taskID,
 		ResourceGroupTag: s.mu.resourceGroupTag,
 		IsolationLevel:   s.isolationLevel.ToPB(),
-		RequestSource:    s.GetRequestSource(),
 		ResourceControlContext: &kvrpcpb.ResourceControlContext{
 			ResourceGroupName: s.mu.resourceGroupName,
 		},
 		BusyThresholdMs: uint32(s.mu.busyThreshold.Milliseconds()),
 	})
+	req.InputRequestSource = s.GetRequestSource()
 	if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
 		s.mu.resourceGroupTagger(req)
 	}

From a8860a98018e5532d6f3635c67188e236a6e717d Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Tue, 29 Aug 2023 13:43:23 +0800
Subject: [PATCH 5/5] gc: add resolve locks interface for tidb gc_worker (#945)

* gc: add GCResolver interface for resolving locks

Signed-off-by: 3pointer

* adapt scanlimit

Signed-off-by: 3pointer

* rename GCLockResolver to RegionLockResolver

Signed-off-by: 3pointer

* update

Signed-off-by: 3pointer

* address comments

Signed-off-by: 3pointer

---------

Signed-off-by: 3pointer
---
 tikv/gc.go         | 114 +++++++++++++++++++++++++++++++++++----------
 tikv/test_probe.go |   2 +-
 2 files changed, 91 insertions(+), 25 deletions(-)

diff --git a/tikv/gc.go b/tikv/gc.go
index 2b47e6bc..0ce3bbc5 100644
--- a/tikv/gc.go
+++ b/tikv/gc.go
@@ -35,6 +35,9 @@ import (
 	zap "go.uber.org/zap"
 )

+// We don't want gc to sweep out the cached info belonging to other processes, like coprocessor.
+const GCScanLockLimit = txnlock.ResolvedCacheSize / 2
+
 // GC does garbage collection (GC) of the TiKV cluster.
 // GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
 //
@@ -81,8 +84,9 @@ func WithConcurrency(concurrency int) GCOpt {
 }

 func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
+	lockResolver := NewRegionLockResolver("gc-client-go-api", s)
 	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
-		return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
+		return ResolveLocksForRange(ctx, lockResolver, safePoint, r.StartKey, r.EndKey, NewGcResolveLockMaxBackoffer, GCScanLockLimit)
 	}

 	runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
@@ -94,72 +98,131 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
 	return nil
 }

-// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
-const gcScanLockLimit = txnlock.ResolvedCacheSize / 2
+type BaseRegionLockResolver struct {
+	identifier string
+	store      Storage
+}

-func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
+func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver {
+	return &BaseRegionLockResolver{
+		identifier: identifier,
+		store:      store,
+	}
+}
+
+func (l *BaseRegionLockResolver) Identifier() string {
+	return l.identifier
+}
+
+func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
+	return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc)
+}
+
+func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
+	return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
+}
+
+func (l *BaseRegionLockResolver) GetStore() Storage {
+	return l.store
+}
+
+// RegionLockResolver is used for GCWorker and log backup advancer to resolve locks in a region.
+type RegionLockResolver interface {
+	// Identifier represents the name of this resolver.
+	Identifier() string
+
+	// ResolveLocksInOneRegion tries to resolve expired locks for one region.
+	// 1. For the GC worker, it scans locks before *safepoint* and force-removes
+	// them, rolling back the txn no matter whether the lock is expired or not.
+	// 2. For the log backup advancer, it scans all locks in a small range and
+	// checks the txn status: it resolves the locks if the txn is expired, or
+	// does nothing otherwise.
+	//
+	// A non-nil regionLocation is returned if resolving the locks succeeded.
+	// A nil regionLocation means not all locks were resolved in some way, and
+	// the caller should retry scanning the locks.
+	// ** the locks are assumed sorted by key in ascending order **
+	ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)
+
+	// ScanLocksInOneRegion returns the locks and the location of the region
+	// that contains the given start key. The returned result ([]*Lock,
+	// *KeyLocation, error) represents all the locks in one region location,
+	// which will be used by ResolveLocksInOneRegion later.
+	ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)
+
+	// GetStore returns the store that this lock resolver uses for GetRegionCache and SendReq.
+	GetStore() Storage
+}
+
+func ResolveLocksForRange(
+	ctx context.Context,
+	resolver RegionLockResolver,
+	maxVersion uint64,
+	startKey []byte,
+	endKey []byte,
+	createBackoffFn func(context.Context) *Backoffer,
+	scanLimit uint32,
+) (rangetask.TaskStat, error) {
 	// For the scan-lock request, we must return all locks even if they are
 	// generated by the same transaction, because the GC worker needs to make
 	// sure all locks have been cleaned.
-
 	var stat rangetask.TaskStat
 	key := startKey
-	bo := NewGcResolveLockMaxBackoffer(ctx)
+	// create a new backoffer for every scan-and-resolve-locks round
+	bo := createBackoffFn(ctx)
 	for {
 		select {
 		case <-ctx.Done():
 			return stat, errors.New("[gc worker] gc job canceled")
 		default:
 		}
-
-		locks, loc, err := s.scanLocksInRegionWithStartKey(bo, key, safePoint, gcScanLockLimit)
+		locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit)
 		if err != nil {
 			return stat, err
 		}

-		resolvedLocation, err1 := s.batchResolveLocksInARegion(bo, locks, loc)
-		if err1 != nil {
-			return stat, err1
+		resolvedLocation, err := resolver.ResolveLocksInOneRegion(bo, locks, loc)
+		if err != nil {
+			return stat, err
 		}
 		// resolve locks failed since the locks are not in one region anymore, need retry.
 		if resolvedLocation == nil {
 			continue
 		}
-		if len(locks) < gcScanLockLimit {
+		if len(locks) < int(scanLimit) {
 			stat.CompletedRegions++
 			key = loc.EndKey
-			logutil.Logger(ctx).Info("[gc worker] one region finshed ",
+			logutil.Logger(ctx).Debug("resolve one region finished",
+				zap.String("identifier", resolver.Identifier()),
 				zap.Int("regionID", int(resolvedLocation.Region.GetID())),
 				zap.Int("resolvedLocksNum", len(locks)))
 		} else {
-			logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
+			logutil.Logger(ctx).Info("region has more than limit locks",
+				zap.String("identifier", resolver.Identifier()),
 				zap.Int("regionID", int(resolvedLocation.Region.GetID())),
 				zap.Int("resolvedLocksNum", len(locks)),
-				zap.Int("scan lock limit", gcScanLockLimit))
+				zap.Uint32("scan lock limit", scanLimit))
 			key = locks[len(locks)-1].Key
 		}

 		if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
 			break
 		}
-		bo = NewGcResolveLockMaxBackoffer(ctx)
+		bo = createBackoffFn(ctx)
 	}
 	return stat, nil
 }

-func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
+func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
 	for {
-		loc, err := s.GetRegionCache().LocateKey(bo, startKey)
+		loc, err := store.GetRegionCache().LocateKey(bo, startKey)
 		if err != nil {
 			return nil, loc, err
 		}
 		req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
 			MaxVersion: maxVersion,
-			Limit:      gcScanLockLimit,
+			Limit:      limit,
 			StartKey:   startKey,
 			EndKey:     loc.EndKey,
 		})
-		resp, err := s.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
+		resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
 		if err != nil {
 			return nil, loc, err
 		}
@@ -190,15 +253,18 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
 	}
 }

-// batchResolveLocksInARegion resolves locks in a region.
+// batchResolveLocksInOneRegion resolves locks in a region.
 // It returns the real location of the resolved locks if resolving locks succeeds.
 // It returns an error when it meets an unretryable error.
 // When the locks are not in one region, resolving fails and it returns a nil resolvedLocation and a nil err.
 // Use it in gcworker only!
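+// A nil expectedLoc short-circuits to a (nil, nil) return, which makes
+// ResolveLocksForRange rescan the range instead of resolving.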
-func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) { +func batchResolveLocksInOneRegion(bo *Backoffer, store Storage, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) { + if expectedLoc == nil { + return nil, nil + } resolvedLocation = expectedLoc for { - ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region) + ok, err := store.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region) if ok { return resolvedLocation, nil } @@ -209,7 +275,7 @@ func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Loc if err != nil { return nil, err } - region, err1 := s.GetRegionCache().LocateKey(bo, locks[0].Key) + region, err1 := store.GetRegionCache().LocateKey(bo, locks[0].Key) if err1 != nil { return nil, err1 } diff --git a/tikv/test_probe.go b/tikv/test_probe.go index 5971480f..234d2585 100644 --- a/tikv/test_probe.go +++ b/tikv/test_probe.go @@ -123,7 +123,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV outerLoop: for { - locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit) + locks, loc, err := scanLocksInOneRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit) if err != nil { return nil, err }
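
Note: the following is a minimal usage sketch of the RegionLockResolver API
introduced in PATCH 5/5. It is illustrative only and not part of the patches;
the `store` argument is assumed to be an already-initialized tikv.Storage, and
the identifier string is arbitrary.

	package main

	import (
		"context"

		"github.com/tikv/client-go/v2/tikv"
	)

	// forceResolveLocks resolves every lock below safePoint across the whole
	// keyspace, the way a GC worker would drive the new interface.
	func forceResolveLocks(ctx context.Context, store tikv.Storage, safePoint uint64) error {
		resolver := tikv.NewRegionLockResolver("gc-example", store)
		// Nil start/end keys cover the full key range; callers may shard
		// ranges across goroutines for concurrency, as resolveLocks does
		// with rangetask.NewRangeTaskRunner.
		_, err := tikv.ResolveLocksForRange(
			ctx, resolver, safePoint, nil, nil,
			tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit,
		)
		return err
	}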