mirror of https://github.com/tikv/client-go.git
add retry info to request source (#953)
* add retry info to request source
* handle upper layer retry
* stabilize test
* retry in 3 dimensions
* record and restore req.ReadType

Signed-off-by: you06 <you1474600@gmail.com>
This commit is contained in:
parent fc88757771
commit 295094e5b5
@@ -708,7 +708,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
 			cli.unlockForSend()
 			break
 		}
-		if time.Since(start) > time.Second*5 {
+		if time.Since(start) > time.Second*10 {
 			// It shouldn't take too long for batch_client to reconnect.
 			require.Fail(t, "wait batch client reconnect timeout")
 		}
@@ -1378,7 +1378,6 @@ func (s *RegionRequestSender) SendReqCtx(
 	totalErrors := make(map[string]int)
 	for {
 		if retryTimes > 0 {
-			req.IsRetryRequest = true
 			if retryTimes%100 == 0 {
 				logutil.Logger(bo.GetCtx()).Warn(
 					"retry",
@@ -1431,8 +1430,17 @@ func (s *RegionRequestSender) SendReqCtx(
 			}
 		}

+		if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
+			return nil, nil, retryTimes, err
+		}
+		rpcCtx.contextPatcher.applyTo(&req.Context)
+		if req.InputRequestSource != "" && s.replicaSelector != nil {
+			s.replicaSelector.patchRequestSource(req, rpcCtx)
+		}
+
 		var retry bool
 		resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
+		req.IsRetryRequest = true
 		if err != nil {
 			msg := fmt.Sprintf("send request failed, err: %v", err.Error())
 			s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
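Presumably the point of adding `SetContext` and the `contextPatcher` call here, and removing them from `sendReqToRegion` in the next hunk, is ordering: the per-attempt peer context is now applied before `patchRequestSource` composes the final `RequestSource` for that attempt.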
@@ -1582,10 +1590,6 @@ func fetchRespInfo(resp *tikvrpc.Response) string {
 func (s *RegionRequestSender) sendReqToRegion(
 	bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration,
 ) (resp *tikvrpc.Response, retry bool, err error) {
-	if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
-		return nil, false, err
-	}
-	rpcCtx.contextPatcher.applyTo(&req.Context)
 	// judge the store limit switch.
 	if limit := kv.StoreLimit.Load(); limit > 0 {
 		if err := s.getStoreToken(rpcCtx.Store, limit); err != nil {
@@ -2302,3 +2306,53 @@ func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response) {
 		metrics.StaleReadRemoteInBytes.Add(float64(size))
 	}
 }
+
+func (s *replicaSelector) replicaType(rpcCtx *RPCContext) string {
+	leaderIdx := -1
+	switch v := s.state.(type) {
+	case *accessKnownLeader:
+		return "leader"
+	case *tryFollower:
+		return "follower"
+	case *accessFollower:
+		leaderIdx = int(v.leaderIdx)
+	case *tryIdleReplica:
+		leaderIdx = int(v.leaderIdx)
+	}
+	if leaderIdx > -1 && rpcCtx != nil && rpcCtx.Peer != nil {
+		for idx, replica := range s.replicas {
+			if replica.peer.Id == rpcCtx.Peer.Id {
+				if idx == leaderIdx {
+					return "leader"
+				}
+				return "follower"
+			}
+		}
+	}
+	return "unknown"
+}
+
+func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCContext) {
+	var sb strings.Builder
+	sb.WriteString(req.InputRequestSource)
+	sb.WriteByte('-')
+	defer func() {
+		req.RequestSource = sb.String()
+	}()
+
+	replicaType := s.replicaType(rpcCtx)
+
+	if req.IsRetryRequest {
+		sb.WriteString("retry_")
+		sb.WriteString(req.ReadType)
+		sb.WriteByte('_')
+		sb.WriteString(replicaType)
+		return
+	}
+	if req.StaleRead {
+		req.ReadType = "stale_" + replicaType
+	} else {
+		req.ReadType = replicaType
+	}
+	sb.WriteString(req.ReadType)
+}
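Taken together, `replicaType` and `patchRequestSource` encode the retry info along three dimensions: the caller-supplied input source, the read type recorded on the first attempt, and the replica type targeted by the current attempt. A minimal, self-contained sketch of the resulting label format; `firstLabel` and `retryLabel` are hypothetical helpers written only to illustrate the strings, not part of the client:

package main

import "fmt"

// firstLabel mirrors the first-attempt format: "<input>-<readType>",
// where readType is the replica type, prefixed with "stale_" for stale reads.
func firstLabel(input, readType string) string {
	return fmt.Sprintf("%s-%s", input, readType)
}

// retryLabel mirrors the retry format:
// "<input>-retry_<firstReadType>_<replicaType>".
func retryLabel(input, firstReadType, replicaType string) string {
	return fmt.Sprintf("%s-retry_%s_%s", input, firstReadType, replicaType)
}

func main() {
	// First attempt: a stale read served by a follower.
	fmt.Println(firstLabel("test", "stale_follower")) // test-stale_follower
	// Upper-layer retry of the same request, now routed to the leader.
	fmt.Println(retryLabel("test", "stale_follower", "leader")) // test-retry_stale_follower_leader
}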
|
|||
|
|
@ -1453,3 +1453,80 @@ func (s *testRegionRequestToThreeStoresSuite) TestLogging() {
|
|||
regionErr, _ := resp.GetRegionError()
|
||||
s.NotNil(regionErr)
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
|
||||
leaderStore, _ := s.loadAndGetLeaderStore()
|
||||
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
|
||||
s.Nil(err)
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
|
||||
Key: []byte("key"),
|
||||
})
|
||||
req.InputRequestSource = "test"
|
||||
|
||||
setReadType := func(req *tikvrpc.Request, readType string) {
|
||||
req.StaleRead = false
|
||||
req.ReplicaRead = false
|
||||
switch readType {
|
||||
case "leader":
|
||||
return
|
||||
case "follower":
|
||||
req.ReplicaRead = true
|
||||
req.ReplicaReadType = kv.ReplicaReadFollower
|
||||
case "stale_follower", "stale_leader":
|
||||
req.EnableStaleRead()
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
|
||||
setTargetReplica := func(selector *replicaSelector, readType string) {
|
||||
var leader bool
|
||||
switch readType {
|
||||
case "leader", "stale_leader":
|
||||
leader = true
|
||||
case "follower", "stale_follower":
|
||||
leader = false
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
for idx, replica := range selector.replicas {
|
||||
if replica.store.storeID == leaderStore.storeID && leader {
|
||||
selector.targetIdx = AccessIndex(idx)
|
||||
return
|
||||
}
|
||||
if replica.store.storeID != leaderStore.storeID && !leader {
|
||||
selector.targetIdx = AccessIndex(idx)
|
||||
return
|
||||
}
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
firstReadReplicas := []string{"leader", "follower", "stale_follower", "stale_leader"}
|
||||
retryReadReplicas := []string{"leader", "follower"}
|
||||
for _, firstReplica := range firstReadReplicas {
|
||||
for _, retryReplica := range retryReadReplicas {
|
||||
bo := retry.NewBackoffer(context.Background(), -1)
|
||||
req.IsRetryRequest = false
|
||||
setReadType(req, firstReplica)
|
||||
replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
|
||||
s.Nil(err)
|
||||
setTargetReplica(replicaSelector, firstReplica)
|
||||
rpcCtx, err := replicaSelector.buildRPCContext(bo)
|
||||
s.Nil(err)
|
||||
replicaSelector.patchRequestSource(req, rpcCtx)
|
||||
s.Equal("test-"+firstReplica, req.RequestSource)
|
||||
|
||||
// retry
|
||||
setReadType(req, retryReplica)
|
||||
replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
|
||||
s.Nil(err)
|
||||
setTargetReplica(replicaSelector, retryReplica)
|
||||
rpcCtx, err = replicaSelector.buildRPCContext(bo)
|
||||
s.Nil(err)
|
||||
req.IsRetryRequest = true
|
||||
replicaSelector.patchRequestSource(req, rpcCtx)
|
||||
s.Equal("test-retry_"+firstReplica+"_"+retryReplica, req.RequestSource)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
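The matrix exercises four first-read types against two retry targets; for example, a `stale_leader` first read retried on a follower is expected to produce `test-retry_stale_leader_follower`.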
@@ -234,6 +234,10 @@ type Request struct {
 	ForwardedHost string
 	// ReplicaNumber is the number of current replicas, which is used to calculate the RU cost.
 	ReplicaNumber int64
+	// ReadType is the read type of the first attempt. It is assigned by the client on the first try and does not need to be set by callers.
+	ReadType string
+	// InputRequestSource is the input source of the request. If it is not empty, the final RequestSource sent to the store is attached with the retry info.
+	InputRequestSource string
 }

 // NewRequest returns new kv rpc request.
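Note the division of labor these fields set up: callers populate `InputRequestSource` once, the client records `ReadType` on the first attempt, and the upper layers (the scanner and snapshot changes below) restore `ReadType` and `IsRetryRequest` when they rebuild a request for a retry.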
@@ -202,6 +202,8 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 	var loc *locate.KeyLocation
 	var resolvingRecordToken *int
 	var err error
+	// the states in the request need to be kept when retrying the request.
+	var readType string
 	for {
 		if !s.reverse {
 			loc, err = s.snapshot.store.GetRegionCache().LocateKey(bo, s.nextStartKey)
@@ -245,12 +247,16 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 			TaskId:           s.snapshot.mu.taskID,
 			ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
 			IsolationLevel:   s.snapshot.isolationLevel.ToPB(),
-			RequestSource:    s.snapshot.GetRequestSource(),
 			ResourceControlContext: &kvrpcpb.ResourceControlContext{
 				ResourceGroupName: s.snapshot.mu.resourceGroupName,
 			},
 			BusyThresholdMs: uint32(s.snapshot.mu.busyThreshold.Milliseconds()),
 		})
+		if readType != "" {
+			req.ReadType = readType
+			req.IsRetryRequest = true
+		}
+		req.InputRequestSource = s.snapshot.GetRequestSource()
 		if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil {
 			s.snapshot.mu.resourceGroupTagger(req)
 		}
@@ -263,6 +269,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 		if err != nil {
 			return err
 		}
+		readType = req.ReadType
 		if regionErr != nil {
 			logutil.BgLogger().Debug("scanner getData failed",
 				zap.Stringer("regionErr", regionErr))
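Both scanner hunks above follow the same pattern: the loop rebuilds a fresh `tikvrpc.Request` on every iteration, so the read type recorded by the first attempt must be saved outside the request and restored into the rebuilt one, which also marks it as a retry. A self-contained sketch of that pattern, with a stand-in `request` type and hypothetical `retryLoop`/`build`/`send` names:

package main

import "fmt"

// request mirrors the two fields of tikvrpc.Request that matter for the
// pattern; it stands in for the real type so the sketch is self-contained.
type request struct {
	ReadType       string
	IsRetryRequest bool
}

// retryLoop sketches the state preservation used in getData and
// batchGetSingleRegion: a fresh request is built each iteration, so the
// read type assigned on the first attempt is carried across iterations
// and restored, marking rebuilt requests as retries.
func retryLoop(build func() *request, send func(*request) (done bool)) {
	var readType string // survives rebuilds of the request
	for {
		req := build()
		if readType != "" {
			req.ReadType = readType
			req.IsRetryRequest = true
		}
		done := send(req)
		readType = req.ReadType // the first attempt assigns this
		if done {
			return
		}
	}
}

func main() {
	attempts := 0
	retryLoop(
		func() *request { return &request{} },
		func(r *request) bool {
			attempts++
			if r.ReadType == "" {
				r.ReadType = "leader" // the client assigns this on the first try
			}
			fmt.Printf("attempt %d: readType=%s retry=%v\n", attempts, r.ReadType, r.IsRetryRequest)
			return attempts == 2
		},
	)
}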
@@ -389,6 +389,8 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 	pending := batch.keys
 	var resolvingRecordToken *int
 	useConfigurableKVTimeout := true
+	// the states in the request need to be kept when retrying the request.
+	var readType string
 	for {
 		s.mu.RLock()
 		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@@ -400,12 +402,16 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 			TaskId:           s.mu.taskID,
 			ResourceGroupTag: s.mu.resourceGroupTag,
 			IsolationLevel:   s.isolationLevel.ToPB(),
-			RequestSource:    s.GetRequestSource(),
 			ResourceControlContext: &kvrpcpb.ResourceControlContext{
 				ResourceGroupName: s.mu.resourceGroupName,
 			},
 			BusyThresholdMs: uint32(busyThresholdMs),
 		})
+		req.InputRequestSource = s.GetRequestSource()
+		if readType != "" {
+			req.ReadType = readType
+			req.IsRetryRequest = true
+		}
 		if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
 			s.mu.resourceGroupTagger(req)
 		}
@@ -443,6 +449,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 		if err != nil {
 			return err
 		}
+		readType = req.ReadType
 		if regionErr != nil {
 			// For other region error and the fake region error, backoff because
 			// there's something wrong.
@@ -626,12 +633,12 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]byte, error) {
 		TaskId:           s.mu.taskID,
 		ResourceGroupTag: s.mu.resourceGroupTag,
 		IsolationLevel:   s.isolationLevel.ToPB(),
-		RequestSource:    s.GetRequestSource(),
 		ResourceControlContext: &kvrpcpb.ResourceControlContext{
 			ResourceGroupName: s.mu.resourceGroupName,
 		},
 		BusyThresholdMs: uint32(s.mu.busyThreshold.Milliseconds()),
 	})
+	req.InputRequestSource = s.GetRequestSource()
 	if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
 		s.mu.resourceGroupTagger(req)
 	}
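No `readType` bookkeeping is needed in the point-get path: `get` appears to construct its request once and reuse it across its retry loop, so `ReadType` and `IsRetryRequest` survive on the request itself, unlike the scanner and batch-get loops above.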