mirror of https://github.com/tikv/client-go.git
retry as stale read on replica (#1509)
Signed-off-by: rishabh_mittal <mittalrishabh@gmail.com>
This commit is contained in:
parent
89643b0e8c
commit
c65273eeb1
|
|
@ -367,7 +367,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
|
||||||
leaderRegionValid: true,
|
leaderRegionValid: true,
|
||||||
leaderAsyncReload: util.Some(false),
|
leaderAsyncReload: util.Some(false),
|
||||||
leaderSuccessReplica: []string{"z2", "z3"},
|
leaderSuccessReplica: []string{"z2", "z3"},
|
||||||
leaderSuccessReadType: SuccessFollowerRead,
|
leaderSuccessReadType: SuccessStaleRead,
|
||||||
followerRegionValid: true,
|
followerRegionValid: true,
|
||||||
followerAsyncReload: util.Some(false),
|
followerAsyncReload: util.Some(false),
|
||||||
followerSuccessReplica: []string{"z2"},
|
followerSuccessReplica: []string{"z2"},
|
||||||
|
|
@ -392,11 +392,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
|
||||||
leaderRegionValid: true,
|
leaderRegionValid: true,
|
||||||
leaderAsyncReload: util.Some(false),
|
leaderAsyncReload: util.Some(false),
|
||||||
leaderSuccessReplica: []string{"z3"},
|
leaderSuccessReplica: []string{"z3"},
|
||||||
leaderSuccessReadType: SuccessFollowerRead,
|
leaderSuccessReadType: SuccessStaleRead,
|
||||||
followerRegionValid: true,
|
followerRegionValid: true,
|
||||||
followerAsyncReload: util.Some(false),
|
followerAsyncReload: util.Some(false),
|
||||||
followerSuccessReplica: []string{"z3"},
|
followerSuccessReplica: []string{"z3"},
|
||||||
followerSuccessReadType: SuccessFollowerRead,
|
followerSuccessReadType: SuccessStaleRead,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
do: leaderServerIsBusy,
|
do: leaderServerIsBusy,
|
||||||
|
|
@ -405,11 +405,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
|
||||||
leaderRegionValid: true,
|
leaderRegionValid: true,
|
||||||
leaderAsyncReload: util.Some(false),
|
leaderAsyncReload: util.Some(false),
|
||||||
leaderSuccessReplica: []string{"z2", "z3"},
|
leaderSuccessReplica: []string{"z2", "z3"},
|
||||||
leaderSuccessReadType: SuccessFollowerRead,
|
leaderSuccessReadType: SuccessStaleRead,
|
||||||
followerRegionValid: true,
|
followerRegionValid: true,
|
||||||
followerAsyncReload: util.Some(false),
|
followerAsyncReload: util.Some(false),
|
||||||
followerSuccessReplica: []string{"z2", "z3"},
|
followerSuccessReplica: []string{"z2", "z3"},
|
||||||
followerSuccessReadType: SuccessFollowerRead,
|
followerSuccessReadType: SuccessStaleRead,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
do: leaderServerIsBusy,
|
do: leaderServerIsBusy,
|
||||||
|
|
@ -418,11 +418,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
|
||||||
leaderRegionValid: true,
|
leaderRegionValid: true,
|
||||||
leaderAsyncReload: util.Some(false),
|
leaderAsyncReload: util.Some(false),
|
||||||
leaderSuccessReplica: []string{"z3"},
|
leaderSuccessReplica: []string{"z3"},
|
||||||
leaderSuccessReadType: SuccessFollowerRead,
|
leaderSuccessReadType: SuccessStaleRead,
|
||||||
followerRegionValid: true,
|
followerRegionValid: true,
|
||||||
followerAsyncReload: util.Some(false),
|
followerAsyncReload: util.Some(false),
|
||||||
followerSuccessReplica: []string{"z3"},
|
followerSuccessReplica: []string{"z3"},
|
||||||
followerSuccessReadType: SuccessFollowerRead,
|
followerSuccessReadType: SuccessStaleRead,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
do: leaderDown,
|
do: leaderDown,
|
||||||
|
|
|
||||||
|
|
@ -182,17 +182,21 @@ func (s *replicaSelector) nextForReplicaReadMixed(req *tikvrpc.Request) {
|
||||||
}
|
}
|
||||||
s.target = strategy.next(s)
|
s.target = strategy.next(s)
|
||||||
if s.target != nil {
|
if s.target != nil {
|
||||||
if s.isStaleRead && s.attempts == 1 {
|
if s.isStaleRead {
|
||||||
// stale-read request first access.
|
isStaleRead := true
|
||||||
if !s.target.store.IsLabelsMatch(s.option.labels) && s.target.peer.Id != s.region.GetLeaderPeerID() {
|
if s.attempts != 1 || (!s.target.store.IsLabelsMatch(s.option.labels) && s.target.peer.Id != s.region.GetLeaderPeerID()) {
|
||||||
// If the target replica's labels is not match and not leader, use replica read.
|
// retry or target replica's labels does not match and not leader
|
||||||
// This is for compatible with old version.
|
if strategy.canSendReplicaRead(s) {
|
||||||
req.StaleRead = false
|
// use replica read.
|
||||||
req.ReplicaRead = true
|
isStaleRead = false
|
||||||
} else {
|
}
|
||||||
// use stale read.
|
}
|
||||||
|
if isStaleRead {
|
||||||
req.StaleRead = true
|
req.StaleRead = true
|
||||||
req.ReplicaRead = false
|
req.ReplicaRead = false
|
||||||
|
} else {
|
||||||
|
req.StaleRead = false
|
||||||
|
req.ReplicaRead = true
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// always use replica.
|
// always use replica.
|
||||||
|
|
@ -300,6 +304,16 @@ func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelector) *replica {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ReplicaSelectMixedStrategy) canSendReplicaRead(selector *replicaSelector) bool {
|
||||||
|
replicas := selector.replicas
|
||||||
|
replica := replicas[s.leaderIdx]
|
||||||
|
if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) || replica.hasFlag(serverIsBusyFlag) {
|
||||||
|
// don't overwhelm the leader if it is busy
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func hasDeadlineExceededError(replicas []*replica) bool {
|
func hasDeadlineExceededError(replicas []*replica) bool {
|
||||||
for _, replica := range replicas {
|
for _, replica := range replicas {
|
||||||
if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) {
|
if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) {
|
||||||
|
|
@ -529,6 +543,9 @@ func (s *replicaSelector) onServerIsBusy(
|
||||||
backoffErr := errors.Errorf("server is busy, ctx: %v", ctx)
|
backoffErr := errors.Errorf("server is busy, ctx: %v", ctx)
|
||||||
if s.canFastRetry() {
|
if s.canFastRetry() {
|
||||||
s.addPendingBackoff(store, retry.BoTiKVServerBusy, backoffErr)
|
s.addPendingBackoff(store, retry.BoTiKVServerBusy, backoffErr)
|
||||||
|
if s.target != nil {
|
||||||
|
s.target.addFlag(serverIsBusyFlag)
|
||||||
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
err = bo.Backoff(retry.BoTiKVServerBusy, backoffErr)
|
err = bo.Backoff(retry.BoTiKVServerBusy, backoffErr)
|
||||||
|
|
|
||||||
|
|
@ -362,7 +362,7 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store2, replica-read: false, stale-read: true}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: false}",
|
||||||
"{addr: store2, replica-read: true, stale-read: false}"},
|
"{addr: store2, replica-read: false, stale-read: true}"},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
backoffCnt: 0,
|
backoffCnt: 0,
|
||||||
|
|
@ -404,7 +404,7 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store2, replica-read: false, stale-read: true}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: false}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}"}, // store2 has DeadLineExceededErr, so don't retry store2 even it is new leader.
|
"{addr: store3, replica-read: false, stale-read: true}"}, // store2 has DeadLineExceededErr, so don't retry store2 even it is new leader.
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
backoffCnt: 0,
|
backoffCnt: 0,
|
||||||
|
|
@ -504,8 +504,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
expect: &accessPathResult{
|
expect: &accessPathResult{
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store1, replica-read: false, stale-read: true}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
"{addr: store2, replica-read: true, stale-read: false}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}"},
|
"{addr: store3, replica-read: false, stale-read: true}"},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: fakeEpochNotMatch,
|
respRegionError: fakeEpochNotMatch,
|
||||||
backoffCnt: 1,
|
backoffCnt: 1,
|
||||||
|
|
@ -548,7 +548,7 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store3, replica-read: false, stale-read: true}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: false}",
|
||||||
"{addr: store2, replica-read: true, stale-read: false}"},
|
"{addr: store2, replica-read: false, stale-read: true}"},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
backoffCnt: 0,
|
backoffCnt: 0,
|
||||||
|
|
@ -568,8 +568,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store2, replica-read: false, stale-read: true}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}", // try leader with leader read.
|
"{addr: store1, replica-read: false, stale-read: false}", // try leader with leader read.
|
||||||
"{addr: store2, replica-read: true, stale-read: false}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: fakeEpochNotMatch,
|
respRegionError: fakeEpochNotMatch,
|
||||||
|
|
@ -590,8 +590,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store1, replica-read: false, stale-read: true}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
|
"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: true, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
|
|
@ -611,8 +611,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store1, replica-read: false, stale-read: true}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
|
"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: true, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: fakeEpochNotMatch,
|
respRegionError: fakeEpochNotMatch,
|
||||||
|
|
@ -654,8 +654,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
|
||||||
expect: &accessPathResult{
|
expect: &accessPathResult{
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store1, replica-read: false, stale-read: true}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
"{addr: store2, replica-read: true, stale-read: false}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
|
|
@ -920,7 +920,7 @@ func TestReplicaReadAccessPathByCase2(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store2, replica-read: false, stale-read: true}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: false}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}"},
|
"{addr: store3, replica-read: false, stale-read: true}"},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
backoffCnt: 0,
|
backoffCnt: 0,
|
||||||
|
|
@ -1052,9 +1052,16 @@ func TestReplicaReadAccessPathByBasicCase(t *testing.T) {
|
||||||
backoff = []string{}
|
backoff = []string{}
|
||||||
}
|
}
|
||||||
if staleRead {
|
if staleRead {
|
||||||
accessPath = []string{
|
if tp == ServerIsBusyErr || tp == ServerIsBusyWithEstimatedWaitMsErr {
|
||||||
"{addr: store1, replica-read: false, stale-read: true}",
|
accessPath = []string{
|
||||||
"{addr: store2, replica-read: true, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
accessPath = []string{
|
||||||
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
|
"{addr: store2, replica-read: true, stale-read: false}",
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
|
@ -1918,8 +1925,8 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store2, replica-read: false, stale-read: true}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}", // try leader with leader read.
|
"{addr: store1, replica-read: false, stale-read: false}", // try leader with leader read.
|
||||||
"{addr: store2, replica-read: true, stale-read: false}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: fakeEpochNotMatch,
|
respRegionError: fakeEpochNotMatch,
|
||||||
|
|
@ -1940,8 +1947,8 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store1, replica-read: false, stale-read: true}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
|
"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: true, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: fakeEpochNotMatch,
|
respRegionError: fakeEpochNotMatch,
|
||||||
|
|
@ -2050,8 +2057,8 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
|
||||||
expect: &accessPathResult{
|
expect: &accessPathResult{
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store1, replica-read: false, stale-read: true}",
|
"{addr: store1, replica-read: false, stale-read: true}",
|
||||||
"{addr: store2, replica-read: true, stale-read: false}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
|
|
@ -2073,7 +2080,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store2, replica-read: false, stale-read: true}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: false}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
|
|
@ -2095,7 +2102,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
|
||||||
accessPath: []string{
|
accessPath: []string{
|
||||||
"{addr: store2, replica-read: false, stale-read: true}",
|
"{addr: store2, replica-read: false, stale-read: true}",
|
||||||
"{addr: store1, replica-read: false, stale-read: false}",
|
"{addr: store1, replica-read: false, stale-read: false}",
|
||||||
"{addr: store3, replica-read: true, stale-read: false}",
|
"{addr: store3, replica-read: false, stale-read: true}",
|
||||||
},
|
},
|
||||||
respErr: "",
|
respErr: "",
|
||||||
respRegionError: nil,
|
respRegionError: nil,
|
||||||
|
|
@ -2666,7 +2673,11 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) {
|
||||||
if expectedFirstStore == 3 {
|
if expectedFirstStore == 3 {
|
||||||
// Retry on store 2 which is a follower.
|
// Retry on store 2 which is a follower.
|
||||||
// Stale-read mode falls back to replica-read mode.
|
// Stale-read mode falls back to replica-read mode.
|
||||||
expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}"
|
if staleRead {
|
||||||
|
expectedSecondPath = "{addr: store2, replica-read: false, stale-read: true}"
|
||||||
|
} else {
|
||||||
|
expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}"
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if staleRead {
|
if staleRead {
|
||||||
// Retry in leader read mode
|
// Retry in leader read mode
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue