mirror of https://github.com/tikv/client-go.git

*: configurable kv_read_timeout should not affect write request (#978)

* *: configurable kv_read_timeout should not affect write request
* refine test
* refine logic

Signed-off-by: crazycs520 <crazycs520@gmail.com>

This commit is contained in:
parent 342301689f
commit 39084386b3
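The gist of the change, shown below as a standalone illustrative sketch rather than client-go code: only read commands whose max execution duration is below the default short read timeout are treated as "configurable read timeout" requests and may be fast-retried on another replica; write commands never are. The names readTimeoutShort, cmdType, and usesConfigurableReadTimeout are simplified stand-ins, and the 30s value is an assumption for the sketch.

// Illustrative sketch only: simplified stand-ins for the gating logic this
// commit introduces in replicaSelector.onReadReqConfigurableTimeout.
package main

import (
	"fmt"
	"time"
)

// readTimeoutShort stands in for client.ReadTimeoutShort; the concrete value
// is assumed here for the sketch.
const readTimeoutShort = 30 * time.Second

type cmdType string

const (
	cmdGet      cmdType = "Get"
	cmdCop      cmdType = "Cop"
	cmdPrewrite cmdType = "Prewrite"
)

// usesConfigurableReadTimeout reports whether a request should be treated as a
// read that timed out because of the configurable kv_read_timeout: the timeout
// must be shorter than the default short read timeout, and the command must be
// a read command. Write commands such as Prewrite always return false.
func usesConfigurableReadTimeout(cmd cmdType, maxExecDurationMs uint64) bool {
	if maxExecDurationMs >= uint64(readTimeoutShort.Milliseconds()) {
		return false
	}
	switch cmd {
	case cmdGet, cmdCop:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(usesConfigurableReadTimeout(cmdGet, 500))      // true: short read timeout, fast retry allowed
	fmt.Println(usesConfigurableReadTimeout(cmdPrewrite, 500)) // false: write request, normal backoff/retry
	fmt.Println(usesConfigurableReadTimeout(cmdGet, 60_000))   // false: timeout not below ReadTimeoutShort
}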
@@ -1085,7 +1085,14 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
 	s.state.onSendFailure(bo, s, err)
 }
 
-func (s *replicaSelector) onDeadlineExceeded() {
+func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
+	if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) {
+		// The configurable timeout should be less than `ReadTimeoutShort`.
+		return false
+	}
+	switch req.Type {
+	case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
+		tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream:
 		if target := s.targetReplica(); target != nil {
 			target.deadlineErrUsingConfTimeout = true
 		}
@@ -1093,6 +1100,11 @@ func (s *replicaSelector) onDeadlineExceeded() {
 			// If the leader returns a deadline exceeded error, we should try to access a follower next time.
 			s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
 		}
+		return true
+	default:
+		// Only work for read requests, return false for non-read requests.
+		return false
+	}
 }
 
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
@@ -1776,9 +1788,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
 		return errors.WithStack(err)
 	} else if LoadShuttingDown() > 0 {
 		return errors.WithStack(tikverr.ErrTiDBShuttingDown)
-	} else if isCauseByDeadlineExceeded(err) && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
-		if s.replicaSelector != nil {
-			s.replicaSelector.onDeadlineExceeded()
+	} else if isCauseByDeadlineExceeded(err) {
+		if s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) {
 			return nil
 		}
 	}
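For context, here is a minimal runnable sketch of the boolean contract the updated call sites rely on; fakeSelector and handleDeadlineExceeded are hypothetical names for illustration, not client-go code. A true result means the deadline-exceeded error came from the configurable read timeout on a read request, so the error is swallowed and the request is retried on another replica without backoff; a false result (which includes all write requests) leaves the error to the normal handling path.

// Hypothetical sketch of the call-site contract; fakeSelector is not the real
// replicaSelector, it only models the boolean result.
package main

import (
	"errors"
	"fmt"
)

type fakeSelector struct{ readWithConfTimeout bool }

// onReadReqConfigurableTimeout mimics only the return value of the real method:
// true for read requests that used the configurable timeout, false otherwise.
func (s *fakeSelector) onReadReqConfigurableTimeout() bool { return s.readWithConfTimeout }

// handleDeadlineExceeded mirrors the shape of the onSendFail branch above:
// swallow the error (return nil) only when the selector reports a configurable
// read timeout, so the caller retries another replica without backing off.
func handleDeadlineExceeded(sel *fakeSelector, err error) error {
	if sel != nil && sel.onReadReqConfigurableTimeout() {
		return nil
	}
	return err
}

func main() {
	timeout := errors.New("context deadline exceeded")
	fmt.Println(handleDeadlineExceeded(&fakeSelector{readWithConfTimeout: true}, timeout))  // <nil>
	fmt.Println(handleDeadlineExceeded(&fakeSelector{readWithConfTimeout: false}, timeout)) // context deadline exceeded
}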
@@ -2097,9 +2108,10 @@ func (s *RegionRequestSender) onRegionError(
 
 	if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
 		if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
-			s.replicaSelector.onDeadlineExceeded()
+			if s.replicaSelector.onReadReqConfigurableTimeout(req) {
 				return true, nil
+			}
 		}
 		if s.replicaSelector != nil {
 			return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy)
 		}
@@ -2228,8 +2240,8 @@ func (s *RegionRequestSender) onRegionError(
 		return true, nil
 	}
 
-	if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
-		s.replicaSelector.onDeadlineExceeded()
+	if isDeadlineExceeded(regionErr) && s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) {
 		return true, nil
 	}
 
 	if mismatch := regionErr.GetMismatchPeerId(); mismatch != nil {
@@ -1278,6 +1278,22 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 			s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
 		}
 	}
+
+	// Test for write request.
+	tf := func(s *Store, bo *retry.Backoffer) livenessState {
+		return reachable
+	}
+	s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
+	resetStats()
+	req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
+	req.ReplicaReadType = kv.ReplicaReadLeader
+	loc := getLocFn()
+	bo = retry.NewBackoffer(context.Background(), 1000)
+	resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
+	s.Nil(resp)
+	s.Equal(context.DeadlineExceeded, err)
+	backoffTimes := bo.GetBackoffTimes()
+	s.True(backoffTimes["tikvRPC"] > 0) // A write request timeout doesn't do fast retry, so the backoff count should be greater than 0.
 }
 
 func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {