mirror of https://github.com/tikv/client-go.git
don't retry same replica twice in a round, but for stale read, we can retry DataIsNotReady replica by replica-read (#1181)
* don't retry same replica twice in a round, but for stale read, we can retry DataIsNotReady replica by replica-read
* remove duplicate test case
* remove old duplicate test

Signed-off-by: crazycs520 <crazycs520@gmail.com>
Parent: e72c4cd474
Commit: 190f0cce53
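For illustration, a minimal standalone Go sketch of the retry budget this change introduces (the types below are hypothetical stand-ins, not client-go's replica/tryFollower): within one round a replica is normally attempted at most once, but a replica that answered a stale read with DataIsNotReady keeps one extra attempt, taken via replica-read.

// Minimal sketch, assuming hypothetical types; not client-go's real API.
package main

import "fmt"

// replicaState is a stand-in for client-go's per-replica bookkeeping.
type replicaState struct {
	addr           string
	attempts       int
	dataIsNotReady bool // set when the replica returned DataIsNotReady on a stale read
}

// exhausted mirrors the idea behind tryFollower.isExhausted in the diff below:
// a DataIsNotReady replica gets a budget of 2 attempts, every other replica gets 1.
func exhausted(r *replicaState) bool {
	if r.dataIsNotReady {
		return r.attempts >= 2
	}
	return r.attempts >= 1
}

func main() {
	r := &replicaState{addr: "store2"}
	r.attempts++              // first attempt: stale read
	r.dataIsNotReady = true   // it answered with DataIsNotReady
	fmt.Println(exhausted(r)) // false: one more attempt allowed, via replica-read
	r.attempts++              // second attempt: replica-read
	fmt.Println(exhausted(r)) // true: don't retry the same replica again in this round
}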
@@ -256,6 +256,7 @@ type replica struct {
 	attemptedTime time.Duration
 	// deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error.
 	deadlineErrUsingConfTimeout bool
+	dataIsNotReady              bool
 }
 
 func (r *replica) getEpoch() uint32 {
@@ -499,7 +500,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if len(state.labels) > 0 {
 		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
-			return selectReplica.store.IsLabelsMatch(state.labels)
+			return selectReplica.store.IsLabelsMatch(state.labels) && !state.isExhausted(selectReplica)
 		})
 		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
@@ -512,7 +513,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if selector.targetIdx < 0 {
 		// Search replica that is not attempted from the last accessed replica
 		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
-			return !selectReplica.isExhausted(1, 0)
+			return !state.isExhausted(selectReplica)
 		})
 		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
@@ -543,6 +544,14 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	return rpcCtx, nil
 }
 
+func (state *tryFollower) isExhausted(replica *replica) bool {
+	if replica.dataIsNotReady {
+		// we can retry DataIsNotReady replica by replica-read.
+		return replica.isExhausted(2, 0)
+	}
+	return replica.isExhausted(1, 0)
+}
+
 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
 	if state.fromAccessKnownLeader {
 		peer := selector.targetReplica().peer
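The two hunks above switch the selection predicates from the raw attempt count to this budget. As a rough sketch of the effect, using hypothetical types rather than the real filterReplicas/replica API, a selection pass now skips replicas that used up their budget while still offering a DataIsNotReady replica its second, replica-read attempt:

// Simplified, illustrative selection filter; not client-go's implementation.
package main

import "fmt"

type candidate struct {
	addr           string
	attempts       int
	dataIsNotReady bool
}

func isExhausted(c candidate) bool {
	limit := 1
	if c.dataIsNotReady {
		limit = 2 // allow one more try via replica-read
	}
	return c.attempts >= limit
}

// pickNext returns the index of the first replica with remaining budget, or -1.
func pickNext(cands []candidate) int {
	for i, c := range cands {
		if !isExhausted(c) {
			return i
		}
	}
	return -1
}

func main() {
	cands := []candidate{
		{addr: "store1", attempts: 1},                       // already tried this round: skip
		{addr: "store2", attempts: 1, dataIsNotReady: true}, // tried, but retryable by replica-read
		{addr: "store3", attempts: 0},
	}
	fmt.Println(pickNext(cands)) // 1: store2 gets its second attempt
}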
@@ -1295,6 +1304,12 @@ func (s *replicaSelector) canFallback2Follower() bool {
 	return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
 }
 
+func (s *replicaSelector) onDataIsNotReady() {
+	if target := s.targetReplica(); target != nil {
+		target.dataIsNotReady = true
+	}
+}
+
 func (s *replicaSelector) invalidateRegion() {
 	if s.region != nil {
 		s.region.invalidate(Other)
@@ -2277,6 +2292,9 @@ func (s *RegionRequestSender) onRegionError(
 			zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
 			zap.Stringer("ctx", ctx),
 		)
+		if s.replicaSelector != nil {
+			s.replicaSelector.onDataIsNotReady()
+		}
 		if !req.IsGlobalStaleRead() {
 			// only backoff local stale reads as global should retry immediately against the leader as a normal read
 			err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
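A hedged sketch of the flow added in this hunk, with stand-in types instead of the real RegionRequestSender/replicaSelector: when DataIsNotReady comes back, the target replica is flagged for a possible replica-read retry, and only local (non-global) stale reads back off; a global stale read retries the leader immediately as a normal read.

// Illustrative stand-ins only; the backoff function is injected so the sketch stays runnable.
package main

import (
	"errors"
	"fmt"
)

type replica struct {
	addr           string
	dataIsNotReady bool
}

type selector struct{ target *replica }

// onDataIsNotReady mirrors the new replicaSelector.onDataIsNotReady: flag the
// replica so a later selection round may retry it with replica-read.
func (s *selector) onDataIsNotReady() {
	if s.target != nil {
		s.target.dataIsNotReady = true
	}
}

// handleDataIsNotReady is a hypothetical stand-in for the DataIsNotReady
// branch of onRegionError shown above.
func handleDataIsNotReady(s *selector, globalStaleRead bool, backoff func(error) error) error {
	if s != nil {
		s.onDataIsNotReady()
	}
	if !globalStaleRead {
		// only backoff local stale reads; a global stale read retries the leader right away
		return backoff(errors.New("data is not ready"))
	}
	return nil
}

func main() {
	sel := &selector{target: &replica{addr: "store2"}}
	_ = handleDataIsNotReady(sel, false, func(err error) error {
		fmt.Println("backing off:", err)
		return nil
	})
	fmt.Println("marked:", sel.target.dataIsNotReady) // true
}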
@@ -1273,98 +1273,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 	s.True(backoffTimes["tikvRPC"] > 0) // write request timeout won't do fast retry, so backoff times should be more than 0.
 }
 
-func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
-	leaderStore, _ := s.loadAndGetLeaderStore()
-	leaderLabel := []*metapb.StoreLabel{
-		{
-			Key:   "id",
-			Value: strconv.FormatUint(leaderStore.StoreID(), 10),
-		},
-	}
-	var followerID *uint64
-	for _, storeID := range s.storeIDs {
-		if storeID != leaderStore.storeID {
-			id := storeID
-			followerID = &id
-			break
-		}
-	}
-	s.NotNil(followerID)
-	followerLabel := []*metapb.StoreLabel{
-		{
-			Key:   "id",
-			Value: strconv.FormatUint(*followerID, 10),
-		},
-	}
-
-	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
-	s.Nil(err)
-	s.NotNil(regionLoc)
-
-	dataIsNotReady := false
-	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
-		select {
-		case <-ctx.Done():
-			return nil, errors.New("timeout")
-		default:
-		}
-		if dataIsNotReady && req.StaleRead {
-			dataIsNotReady = false
-			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
-				DataIsNotReady: &errorpb.DataIsNotReady{},
-			}}}, nil
-		}
-		if addr == leaderStore.addr {
-			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
-				ServerIsBusy: &errorpb.ServerIsBusy{},
-			}}}, nil
-		}
-		if !req.ReplicaRead {
-			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
-				NotLeader: &errorpb.NotLeader{},
-			}}}, nil
-		}
-		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
-	}}
-
-	for _, localLeader := range []bool{true, false} {
-		dataIsNotReady = true
-		// data is not ready, then server is busy in the first round,
-		// directly server is busy in the second round.
-		for i := 0; i < 2; i++ {
-			req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
-			req.ReadReplicaScope = oracle.GlobalTxnScope
-			req.TxnScope = oracle.GlobalTxnScope
-			req.EnableStaleWithMixedReplicaRead()
-			req.ReplicaReadType = kv.ReplicaReadMixed
-			var ops []StoreSelectorOption
-			if localLeader {
-				ops = append(ops, WithMatchLabels(leaderLabel))
-			} else {
-				ops = append(ops, WithMatchLabels(followerLabel))
-			}
-
-			ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
-			bo := retry.NewBackoffer(ctx, -1)
-			s.Nil(err)
-			resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
-			s.Nil(err)
-
-			regionErr, err := resp.GetRegionError()
-			s.Nil(err)
-			s.Nil(regionErr)
-			getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
-			s.True(ok)
-			if localLeader {
-				s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value))
-			} else {
-				s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value))
-			}
-			cancel()
-		}
-	}
-}
-
 func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderRegionError() {
 	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
 	s.Nil(err)
@@ -282,7 +282,87 @@ func TestReplicaReadStaleReadAccessPathByCase(t *testing.T) {
 		},
 	}
 	s.True(s.runCaseAndCompare(ca))
 
+	ca = replicaSelectorAccessPathCase{
+		reqType: tikvrpc.CmdGet,
+		readType: kv.ReplicaReadMixed,
+		staleRead: true,
+		timeout: 0,
+		label: &metapb.StoreLabel{Key: "id", Value: "3"},
+		accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyErr},
+		expect: &accessPathResult{
+			accessPath: []string{
+				"{addr: store3, replica-read: false, stale-read: true}",
+				"{addr: store1, replica-read: false, stale-read: false}",
+				"{addr: store2, replica-read: true, stale-read: false}"},
+			respErr: "",
+			respRegionError: nil,
+			backoffCnt: 1,
+			backoffDetail: []string{"tikvServerBusy+1"},
+			regionIsValid: true,
+		},
+	}
+	s.True(s.runCaseAndCompare(ca))
+
+	ca = replicaSelectorAccessPathCase{
+		reqType: tikvrpc.CmdGet,
+		readType: kv.ReplicaReadMixed,
+		staleRead: true,
+		label: &metapb.StoreLabel{Key: "id", Value: "2"},
+		accessErr: []RegionErrorType{DataIsNotReadyErr, ServerIsBusyErr, ServerIsBusyErr, ServerIsBusyErr},
+		expect: &accessPathResult{
+			accessPath: []string{
+				"{addr: store2, replica-read: false, stale-read: true}",
+				"{addr: store1, replica-read: false, stale-read: false}", // try leader with leader read.
+				"{addr: store2, replica-read: true, stale-read: false}",
+				"{addr: store3, replica-read: true, stale-read: false}",
+			},
+			respErr: "",
+			respRegionError: fakeEpochNotMatch,
+			backoffCnt: 2,
+			backoffDetail: []string{"tikvServerBusy+2"},
+			regionIsValid: false,
+		},
+	}
+	s.True(s.runCaseAndCompare(ca))
+
+	s.changeRegionLeader(2)
+	ca = replicaSelectorAccessPathCase{
+		reqType: tikvrpc.CmdGet,
+		readType: kv.ReplicaReadMixed,
+		staleRead: true,
+		accessErr: []RegionErrorType{DataIsNotReadyErr, ServerIsBusyErr, ServerIsBusyErr, ServerIsBusyErr},
+		expect: &accessPathResult{
+			accessPath: []string{
+				"{addr: store1, replica-read: false, stale-read: true}",
+				"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
+				"{addr: store3, replica-read: true, stale-read: false}",
+				"{addr: store1, replica-read: true, stale-read: false}",
+			},
+			respErr: "",
+			respRegionError: fakeEpochNotMatch,
+			backoffCnt: 2,
+			backoffDetail: []string{"tikvServerBusy+2"},
+			regionIsValid: false,
+		},
+	}
+	s.True(s.runCaseAndCompare(ca))
+	s.changeRegionLeader(1)
+}
+
+func (s *testReplicaSelectorSuite) changeRegionLeader(storeId uint64) {
+	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	rc := s.cache.GetCachedRegionWithRLock(loc.Region)
+	for _, peer := range rc.meta.Peers {
+		if peer.StoreId == storeId {
+			s.cluster.ChangeLeader(rc.meta.Id, peer.Id)
+		}
+	}
+	// Invalidate region cache to reload.
+	s.cache.InvalidateCachedRegion(loc.Region)
+}
+
 func (s *testReplicaSelectorSuite) runCaseAndCompare(ca2 replicaSelectorAccessPathCase) bool {
 	ca2.run(s)
 	if ca2.accessErrInValid {