Merge branch 'master' into get_min_resolved_ts_by_stores

This commit is contained in:
Hu# 2023-08-14 13:24:52 +08:00 committed by GitHub
commit d8451eb284
15 changed files with 561 additions and 67 deletions

View File

@ -14,7 +14,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.20.x"
go-version: "1.21.0"
- name: Checkout Client-Go
uses: actions/checkout@v2

View File

@ -17,7 +17,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.2
go-version: 1.21.0
- name: Test
run: go test ./...
@ -32,7 +32,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.2
go-version: 1.21.0
- name: Test
run: go test ./... -race
@ -47,7 +47,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.2
go-version: 1.21.0
- name: Fetch PD
uses: shrink/actions-docker-extract@v1
@ -89,7 +89,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.2
go-version: 1.21.0
- name: Fetch PD
uses: shrink/actions-docker-extract@v1

View File

@ -15,7 +15,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.2
go-version: 1.21.0
- name: Test
run: go test ./...
@ -28,7 +28,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.2
go-version: 1.21.0
- name: Test with race
run: go test -race ./...
@ -42,7 +42,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.2
go-version: 1.21.0
- name: Lint
uses: golangci/golangci-lint-action@v3

View File

@ -37,7 +37,6 @@ func main() {
panic(err)
}
sysSafepoint, err := client.GC(context.Background(), *safepoint, tikv.WithConcurrency(10))
if err != nil {
panic(err)

View File

@ -347,6 +347,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
}
func (a *batchConn) getClientAndSend() {
if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
if timeout, ok := val.(int); ok && timeout > 0 {
time.Sleep(time.Duration(timeout * int(time.Millisecond)))
}
}
// Choose a connection by round-robin.
var (
cli *batchCommandsClient

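The hunk above injects an optional, failpoint-controlled delay before getClientAndSend picks a batch connection; the pick itself is a plain round-robin over the available connections. A minimal, self-contained sketch of that round-robin selection follows (the roundRobin type and pick method are illustrative names, not client-go's actual types):

package main

import "fmt"

// roundRobin is a minimal sketch of the round-robin pick used by
// getClientAndSend: advance a counter and index into the connection
// slice modulo its length. The type and method names are illustrative,
// not client-go's.
type roundRobin struct {
	next  uint64
	conns []string
}

func (r *roundRobin) pick() string {
	c := r.conns[r.next%uint64(len(r.conns))]
	r.next++
	return c
}

func main() {
	r := &roundRobin{conns: []string{"conn-0", "conn-1", "conn-2"}}
	for i := 0; i < 5; i++ {
		fmt.Println(r.pick()) // conn-0, conn-1, conn-2, conn-0, conn-1
	}
}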
View File

@ -107,10 +107,15 @@ func buildResourceControlInterceptor(
// Build the interceptor.
interceptFn := func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// Bypass some internal requests: they do not cost many resources, but throttling them
// may hurt the user experience. For example, the request issued by `alter user password`
// bypasses resource control entirely.
// If the resource group has background jobs, we should not record its consumption or wait for it.
if resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) {
// Background jobs are recorded and reported on the TiKV side.
if reqInfo.Bypass() || resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) {
return next(target, req)
}
consumption, penalty, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err

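The new condition lets requests whose RequestInfo is marked Bypass, as well as background requests, skip resource-control accounting entirely. A minimal, self-contained sketch of that short-circuit follows; RequestInfo and the interceptor type below are simplified stand-ins for client-go's resourcecontrol.RequestInfo and the PD resource-control interceptor, and only the ordering of the checks is meant to mirror the diff:

package main

import (
	"errors"
	"fmt"
)

// RequestInfo and interceptor below are simplified stand-ins, not the
// library's types; only the bypass/background short-circuit order matters.
type RequestInfo struct {
	bypass bool
}

// Bypass reports whether the request should skip resource control.
func (r *RequestInfo) Bypass() bool { return r.bypass }

type interceptor struct {
	isBackground  func() bool
	onRequestWait func() error
}

func (i *interceptor) handle(req *RequestInfo, next func() (string, error)) (string, error) {
	// Bypassed internal requests and background jobs skip accounting;
	// background consumption is recorded and reported on the TiKV side.
	if req.Bypass() || i.isBackground() {
		return next()
	}
	// Everything else waits for (and records) its resource quota first.
	if err := i.onRequestWait(); err != nil {
		return "", err
	}
	return next()
}

func main() {
	i := &interceptor{
		isBackground:  func() bool { return false },
		onRequestWait: func() error { return errors.New("throttled") },
	}
	// The bypassed request is sent even though OnRequestWait would fail.
	resp, err := i.handle(&RequestInfo{bypass: true}, func() (string, error) {
		return "sent without throttling", nil
	})
	fmt.Println(resp, err) // sent without throttling <nil>
}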
View File

@ -26,6 +26,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/internal/retry.newBackoffFn.func1"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/internal/retry.(*Config).createBackoffFn.newBackoffFn.func2"),
}
goleak.VerifyTestMain(m, opts...)
}

View File

@ -1289,9 +1289,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
c.mu.RLock()
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
oldRegion.asyncReload.Store(false)
}
c.mu.RUnlock()
return
}
c.mu.Lock()

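The change wraps the cache lookup and the asyncReload reset in the region cache's read lock, so clearing the flag cannot race with writers that replace cache entries. A small, self-contained sketch of the same locking pattern, with simplified stand-in types:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Region and regionCache are simplified stand-ins for client-go's types:
// the cached region is looked up and its asyncReload flag cleared while
// holding the cache's read lock, so the lookup cannot race with writers
// that replace cache entries under the write lock.
type Region struct {
	asyncReload atomic.Bool
}

type regionCache struct {
	mu      sync.RWMutex
	regions map[uint64]*Region
}

func (c *regionCache) clearAsyncReload(regionID uint64) {
	c.mu.RLock()
	if oldRegion := c.regions[regionID]; oldRegion != nil {
		oldRegion.asyncReload.Store(false)
	}
	c.mu.RUnlock()
}

func main() {
	c := &regionCache{regions: map[uint64]*Region{1: {}}}
	c.regions[1].asyncReload.Store(true)
	c.clearAsyncReload(1)
	fmt.Println(c.regions[1].asyncReload.Load()) // false
}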
View File

@ -245,6 +245,8 @@ type replica struct {
peer *metapb.Peer
epoch uint32
attempts int
// deadlineErrUsingConfTimeout indicates the replica has already been tried, but the request received a deadline exceeded error.
deadlineErrUsingConfTimeout bool
}
func (r *replica) isEpochStale() bool {
@ -377,7 +379,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
}
func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true}
}
// tryFollower is the state where we cannot access the known leader
@ -391,36 +393,84 @@ type tryFollower struct {
stateBase
leaderIdx AccessIndex
lastIdx AccessIndex
// fromOnNotLeader indicates whether the state is changed from onNotLeader.
fromOnNotLeader bool
labels []*metapb.StoreLabel
}
func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
var targetReplica *replica
// Search replica that is not attempted from the last accessed replica
for i := 1; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if idx == state.leaderIdx {
continue
hasDeadlineExceededErr := false
//hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout
filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
for i := 0; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if idx == state.leaderIdx {
continue
}
selectReplica := selector.replicas[idx]
hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout
if selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout &&
fn(selectReplica) {
return idx, selectReplica
}
}
targetReplica = selector.replicas[idx]
// Each follower is only tried once
if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
return -1, nil
}
if len(state.labels) > 0 {
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return selectReplica.store.IsLabelsMatch(state.labels)
})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
}
// Labels only take effect for the first try.
state.labels = nil
}
if selector.targetIdx < 0 {
// Search for a replica that has not been attempted, starting from the last accessed one.
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return !selectReplica.isExhausted(1)
})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
break
}
}
// If all followers have been tried and failed, back off and retry.
if selector.targetIdx < 0 {
if hasDeadlineExceededErr {
// When a deadline exceeded error is met, do a fast retry without invalidating the region cache.
return nil, nil
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return rpcCtx, err
}
if !state.fromOnNotLeader {
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
return rpcCtx, nil
}
func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
panic("the store must exist")
if state.fromOnNotLeader {
peer := selector.targetReplica().peer
if !selector.region.switchWorkLeaderToPeer(peer) {
logutil.BgLogger().Warn("the store must exist",
zap.Uint64("store", peer.StoreId),
zap.Uint64("peer", peer.Id))
}
}
}
@ -542,6 +592,10 @@ type accessFollower struct {
learnerOnly bool
}
// Follower read tries followers first; if no follower is available, it falls back to the leader.
// In particular, a stale read tries the local peer (which can be either the leader or a follower) first,
// then falls back to snapshot read on the leader. If the leader read receives server-is-busy or connection
// errors, the region cache is still valid and the state is changed to tryFollower, which reads via replica read.
func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
replicaSize := len(selector.replicas)
resetStaleRead := false
@ -610,14 +664,33 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1))
leaderEpochStale := leader.isEpochStale()
leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader)
if len(state.option.labels) > 0 {
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
// If the leader has been tried and received a deadline exceeded error, return nil to the upper layer to retry with the default timeout.
if leader.deadlineErrUsingConfTimeout {
return nil, nil
}
if leaderInvalid {
// In stale read, the request falls back to the leader after the local follower fails.
// If the leader is also unavailable, we can fall back to the followers and use the replica-read flag again:
// the remote followers have not been tried yet, and the local follower can retry without the stale-read flag.
if state.isStaleRead {
selector.state = &tryFollower{
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
if leaderEpochStale {
selector.regionCache.scheduleReloadRegion(selector.region)
}
return nil, stateChanged{}
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
@ -644,6 +717,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
return rpcCtx, nil
}
func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
// Allow another extra retry for the following case:
// 1. Stale read is enabled and the leader peer is selected as the target peer at first.
// 2. A data-is-not-ready error is returned from the leader peer.
// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
// 4. The leader peer should be retried again using snapshot read.
if state.isStaleRead && state.option.leaderOnly {
return leader.isExhausted(2)
} else {
return leader.isExhausted(1)
}
}
func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
selector.invalidateReplicaStore(selector.targetReplica(), cause)
@ -652,26 +738,28 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
// The epoch is stale, retries are exhausted, or the store is unreachable.
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
return false
}
// The request can only be sent to the leader.
if state.option.leaderOnly && idx == state.leaderIdx {
return true
if state.option.leaderOnly {
// The request can only be sent to the leader.
return idx == state.leaderIdx
}
// Choose a replica with matched labels.
followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) &&
replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)
if !followerCandidate {
if !state.tryLeader && idx == state.leaderIdx {
// The request cannot be sent to leader.
return false
}
if state.learnerOnly {
// The request can only be sent to the learner.
return replica.peer.Role == metapb.PeerRole_Learner
}
// If the leader store cannot be accessed normally under `ReplicaReadPreferLeader` mode, we should choose
// other valid followers as candidates to serve the read request.
if state.option.preferLeader && replica.store.isSlow() {
return false
}
// If the stores are limited, check if the store is in the list.
return replica.store.IsStoreMatch(state.option.stores)
// Choose a replica with matched labels.
return replica.store.IsStoreMatch(state.option.stores) && replica.store.IsLabelsMatch(state.option.labels)
}
// tryIdleReplica is the state where we find the leader is busy and retry the request using replica read.
@ -934,6 +1022,16 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
s.state.onSendFailure(bo, s, err)
}
func (s *replicaSelector) onDeadlineExceeded() {
if target := s.targetReplica(); target != nil {
target.deadlineErrUsingConfTimeout = true
}
if accessLeader, ok := s.state.(*accessKnownLeader); ok {
// If the leader returns a deadline exceeded error, we should try to access a follower next time.
s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
}
}
func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
store := accessReplica.store
liveness := store.requestLiveness(bo, s.regionCache)
@ -1051,6 +1149,9 @@ func (s *replicaSelector) onServerIsBusy(
// Mark the server as busy (the next incoming reads could be redirected
// to expected followers).
ctx.Store.markAlreadySlow()
if s.canFallback2Follower() {
return true, nil
}
}
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
if err != nil {
@ -1059,6 +1160,23 @@ func (s *replicaSelector) onServerIsBusy(
return true, nil
}
// For some reason the leader is unreachable for now, so try followers instead.
// The state is changed in accessFollower.next when the leader is unavailable.
func (s *replicaSelector) canFallback2Follower() bool {
if s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if !state.isStaleRead {
return false
}
// We can fall back to a follower only when the leader is exhausted.
return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
}
func (s *replicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
@ -1595,7 +1713,7 @@ func (s *RegionRequestSender) sendReqToRegion(
return nil, false, err
}
}
if e := s.onSendFail(bo, rpcCtx, err); e != nil {
if e := s.onSendFail(bo, rpcCtx, req, err); e != nil {
return nil, false, err
}
return nil, true, nil
@ -1625,7 +1743,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) {
logutil.BgLogger().Warn("release store token failed, count equals to 0")
}
func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error {
func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@ -1636,6 +1754,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
return errors.WithStack(err)
} else if LoadShuttingDown() > 0 {
return errors.WithStack(tikverr.ErrTiDBShuttingDown)
} else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
if s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
return nil
}
}
if status.Code(errors.Cause(err)) == codes.Canceled {
select {
@ -1727,6 +1850,9 @@ func regionErrorToLabel(e *errorpb.Error) string {
} else if e.GetEpochNotMatch() != nil {
return "epoch_not_match"
} else if e.GetServerIsBusy() != nil {
if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") {
return "deadline_exceeded"
}
return "server_is_busy"
} else if e.GetStaleCommand() != nil {
return "stale_command"
@ -1754,10 +1880,16 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "flashback_not_prepared"
} else if e.GetIsWitness() != nil {
return "peer_is_witness"
} else if isDeadlineExceeded(e) {
return "deadline_exceeded"
}
return "unknown"
}
func isDeadlineExceeded(e *errorpb.Error) bool {
return strings.Contains(e.GetMessage(), "Deadline is exceeded")
}
func (s *RegionRequestSender) onRegionError(
bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error,
) (shouldRetry bool, err error) {
@ -1905,6 +2037,10 @@ func (s *RegionRequestSender) onRegionError(
}
if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
s.replicaSelector.onDeadlineExceeded()
return true, nil
}
if s.replicaSelector != nil {
return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy)
}
@ -2033,6 +2169,10 @@ func (s *RegionRequestSender) onRegionError(
return true, nil
}
if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
}
logutil.Logger(bo.GetCtx()).Debug(
"tikv reports region failed",
zap.Stringer("regionErr", regionErr),

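Taken together, the region_request.go changes add a fast-retry path for deadline exceeded errors that happen under a caller-supplied timeout shorter than the client default: the failing replica is marked with deadlineErrUsingConfTimeout, the selector switches to tryFollower, and the error is swallowed so the next replica is tried immediately. A self-contained sketch of that decision follows; the helper name shouldFastRetry and the 30s value standing in for client.ReadTimeoutShort are assumptions for illustration, not the library's API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// readTimeoutShort stands in for client.ReadTimeoutShort (assumed 30s here).
const readTimeoutShort = 30 * time.Second

type replica struct {
	deadlineErrUsingConfTimeout bool
}

// shouldFastRetry mirrors the new check in onSendFail/onDeadlineExceeded:
// only when the request carried a timeout shorter than the client's default
// short timeout do we mark the replica and retry another peer immediately
// instead of surfacing the error.
func shouldFastRetry(err error, maxExecutionDurationMs uint64, target *replica) bool {
	if !errors.Is(err, context.DeadlineExceeded) {
		return false
	}
	if maxExecutionDurationMs >= uint64(readTimeoutShort.Milliseconds()) {
		return false
	}
	if target != nil {
		target.deadlineErrUsingConfTimeout = true // skip this replica on the retry
	}
	return true
}

func main() {
	r := &replica{}
	fmt.Println(shouldFastRetry(context.DeadlineExceeded, 100, r), r.deadlineErrUsingConfTimeout) // true true
	fmt.Println(shouldFastRetry(context.DeadlineExceeded, 60000, r))                              // false
}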
View File

@ -37,6 +37,7 @@ package locate
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"testing"
"time"
@ -45,15 +46,18 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/suite"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
)
func TestRegionRequestToThreeStores(t *testing.T) {
@ -344,7 +348,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
// Should switch to the next follower.
s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
}
@ -586,7 +590,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
// Should switch to the next follower.
s.NotEqual(lastIdx, state3.lastIdx)
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
@ -710,7 +714,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
// Normal
bo := retry.NewBackoffer(context.Background(), -1)
sender := s.regionRequestSender
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.True(bo.GetTotalBackoffTimes() == 0)
@ -719,7 +723,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
bo = retry.NewBackoffer(context.Background(), -1)
s.cluster.ChangeLeader(s.regionID, s.peerIDs[1])
s.cluster.StopStore(s.storeIDs[0])
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@ -728,8 +732,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
// Leader is updated because of send success, so no backoff.
reloadRegion()
bo = retry.NewBackoffer(context.Background(), -1)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@ -1091,7 +1096,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
s.NotEqual(leaderAddr, "")
for i := 0; i < 10; i++ {
bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp)
@ -1134,7 +1139,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
for i := 0; i < 100; i++ {
bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp)
// Since all followers' stores are unreachable, the request will be sent to the leader and the backoff times should be 0.
@ -1142,3 +1147,232 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
s.Equal(0, retryTimes)
}
}
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
value := []byte("value")
isFirstReq := true
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
// Return `DataIsNotReady` for the first request on the leader.
if isFirstReq {
isFirstReq = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}}
region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
s.True(region.isValid())
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
ops = append(ops, WithMatchLabels(leaderLabel))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
s.Equal(getResp.Value, value)
}
func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
leaderAddr := ""
reqTargetAddrs := make(map[string]struct{})
s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
bo := retry.NewBackoffer(context.Background(), 10000)
mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
reqTargetAddrs[addr] = struct{}{}
if req.Context.MaxExecutionDurationMs < 10 {
return nil, context.DeadlineExceeded
}
if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
}}
getLocFn := func() *KeyLocation {
loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a"))
s.Nil(err)
region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region)
leaderStore, _, _, _ := region.WorkStorePeer(region.getStore())
leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore)
s.Nil(err)
return loc
}
resetStats := func() {
reqTargetAddrs = make(map[string]struct{})
s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient)
s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
}
// Test different read types.
staleReadTypes := []bool{false, true}
replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed}
for _, staleRead := range staleReadTypes {
for _, tp := range replicaReadTypes {
log.Info("TestSendReqFirstTimeout", zap.Bool("stale-read", staleRead), zap.String("replica-read-type", tp.String()))
resetStats()
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
if staleRead {
req.EnableStaleRead()
} else {
req.ReplicaRead = tp.IsFollowerRead()
req.ReplicaReadType = tp
}
loc := getLocFn()
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
s.True(IsFakeRegionError(regionErr))
s.Equal(1, len(s.regionRequestSender.Stats))
if staleRead {
rpcNum := s.regionRequestSender.Stats[tikvrpc.CmdGet].Count
s.True(rpcNum == 1 || rpcNum == 2) // 1 rpc or 2 rpc
} else {
s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store.
}
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
// Warning: MaxExecutionDurationMs must be reset before the retry.
resetStats()
if staleRead {
req.EnableStaleRead()
} else {
req.ReplicaRead = tp.IsFollowerRead()
req.ReplicaReadType = tp
}
req.Context.MaxExecutionDurationMs = 0
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
s.Equal(1, len(s.regionRequestSender.Stats))
s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
}
}
}
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
var followerID *uint64
for _, storeID := range s.storeIDs {
if storeID != leaderStore.storeID {
id := storeID
followerID = &id
break
}
}
s.NotNil(followerID)
followerLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(*followerID, 10),
},
}
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
dataIsNotReady := false
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
if dataIsNotReady && req.StaleRead {
dataIsNotReady = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
if addr == leaderStore.addr {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
}}}, nil
}
if !req.ReplicaRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
}}
for _, localLeader := range []bool{true, false} {
dataIsNotReady = true
// Data is not ready and then the server is busy in the first round;
// the server is directly busy in the second round.
for i := 0; i < 2; i++ {
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
if localLeader {
ops = append(ops, WithMatchLabels(leaderLabel))
} else {
ops = append(ops, WithMatchLabels(followerLabel))
}
ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
if localLeader {
s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value))
} else {
s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value))
}
cancel()
}
}
}

View File

@ -16,12 +16,14 @@ package resourcecontrol
import (
"reflect"
"strings"
"time"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
@ -34,12 +36,23 @@ type RequestInfo struct {
writeBytes int64
storeID uint64
replicaNumber int64
// bypass indicates whether the request should be bypassed.
// Some internal requests, such as privilege requests, should be bypassed.
bypass bool
}
// MakeRequestInfo extracts the relevant information from a tikvrpc.Request.
func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo {
var bypass bool
requestSource := req.Context.GetRequestSource()
if len(requestSource) > 0 {
if strings.Contains(requestSource, util.InternalRequestPrefix+util.InternalTxnOthers) {
bypass = true
}
}
storeID := req.Context.GetPeer().GetStoreId()
if !req.IsTxnWriteRequest() && !req.IsRawWriteRequest() {
return &RequestInfo{writeBytes: -1}
return &RequestInfo{writeBytes: -1, storeID: storeID, bypass: bypass}
}
var writeBytes int64
@ -57,7 +70,7 @@ func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo {
writeBytes += int64(len(k))
}
}
return &RequestInfo{writeBytes: writeBytes, storeID: req.Context.Peer.StoreId, replicaNumber: req.ReplicaNumber}
return &RequestInfo{writeBytes: writeBytes, storeID: storeID, replicaNumber: req.ReplicaNumber, bypass: bypass}
}
// IsWrite returns whether the request is a write request.
@ -68,13 +81,21 @@ func (req *RequestInfo) IsWrite() bool {
// WriteBytes returns the actual write size of the request,
// -1 will be returned if it's not a write request.
func (req *RequestInfo) WriteBytes() uint64 {
return uint64(req.writeBytes)
if req.writeBytes > 0 {
return uint64(req.writeBytes)
}
return 0
}
func (req *RequestInfo) ReplicaNumber() int64 {
return req.replicaNumber
}
// Bypass returns whether the request should be bypassed.
func (req *RequestInfo) Bypass() bool {
return req.bypass
}
func (req *RequestInfo) StoreID() uint64 {
return req.storeID
}

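In MakeRequestInfo above, the bypass flag is derived from the request_source string: a request is bypassed when its source contains the internal "others" marker. Assuming util.InternalRequestPrefix is "internal_" (declared later in this diff) and util.InternalTxnOthers is "others", the matched substring is "internal_others". A minimal, self-contained sketch of that check:

package main

import (
	"fmt"
	"strings"
)

// Assumed values of util.InternalRequestPrefix and util.InternalTxnOthers.
const (
	internalRequestPrefix = "internal_"
	internalTxnOthers     = "others"
)

// shouldBypass mirrors the bypass decision: a non-empty request source that
// contains "internal_others" marks the request as bypassed.
func shouldBypass(requestSource string) bool {
	return len(requestSource) > 0 &&
		strings.Contains(requestSource, internalRequestPrefix+internalTxnOthers)
}

func main() {
	fmt.Println(shouldBypass("xxx_internal_others")) // true, as in the test below
	fmt.Println(shouldBypass("external_test"))       // false
}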
View File

@ -0,0 +1,48 @@
package resourcecontrol
import (
"testing"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/tikvrpc"
)
func TestMakeRequestInfo(t *testing.T) {
// Test a non-write request.
req := &tikvrpc.Request{Req: &kvrpcpb.BatchGetRequest{}, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 1}}}
info := MakeRequestInfo(req)
assert.False(t, info.IsWrite())
assert.Equal(t, uint64(0), info.WriteBytes())
assert.False(t, info.Bypass())
assert.Equal(t, uint64(1), info.StoreID())
// Test a prewrite request.
mutation := &kvrpcpb.Mutation{Key: []byte("foo"), Value: []byte("bar")}
prewriteReq := &kvrpcpb.PrewriteRequest{Mutations: []*kvrpcpb.Mutation{mutation}, PrimaryLock: []byte("baz")}
req = &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, Req: prewriteReq, ReplicaNumber: 1, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 2}}}
requestSource := "xxx_internal_others"
req.Context.RequestSource = requestSource
info = MakeRequestInfo(req)
assert.True(t, info.IsWrite())
assert.Equal(t, uint64(9), info.WriteBytes())
assert.True(t, info.Bypass())
assert.Equal(t, uint64(2), info.StoreID())
// Test a commit request.
commitReq := &kvrpcpb.CommitRequest{Keys: [][]byte{[]byte("qux")}}
req = &tikvrpc.Request{Type: tikvrpc.CmdCommit, Req: commitReq, ReplicaNumber: 2, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 3}}}
info = MakeRequestInfo(req)
assert.True(t, info.IsWrite())
assert.Equal(t, uint64(3), info.WriteBytes())
assert.False(t, info.Bypass())
assert.Equal(t, uint64(3), info.StoreID())
// Test Nil Peer in Context
req = &tikvrpc.Request{Type: tikvrpc.CmdCommit, Req: commitReq, ReplicaNumber: 2, Context: kvrpcpb.Context{}}
info = MakeRequestInfo(req)
assert.True(t, info.IsWrite())
assert.Equal(t, uint64(3), info.WriteBytes())
assert.False(t, info.Bypass())
assert.Equal(t, uint64(0), info.StoreID())
}

View File

@ -118,6 +118,7 @@ type KVSnapshot struct {
resolvedLocks util.TSSet
committedLocks util.TSSet
scanBatchSize int
readTimeout time.Duration
// Cache the result of BatchGet.
// The invariance is that calling BatchGet multiple times using the same start ts,
@ -387,6 +388,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
pending := batch.keys
var resolvingRecordToken *int
useConfigurableKVTimeout := true
for {
s.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@ -416,6 +418,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
if isStaleness {
req.EnableStaleRead()
}
timeout := client.ReadTimeoutMedium
if useConfigurableKVTimeout && s.readTimeout > 0 {
useConfigurableKVTimeout = false
timeout = s.readTimeout
}
req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
ops := make([]locate.StoreSelectorOption, 0, 2)
if len(matchStoreLabels) > 0 {
ops = append(ops, locate.WithMatchLabels(matchStoreLabels))
@ -427,7 +435,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
}
req.ReplicaReadType = readType
}
resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, client.ReadTimeoutMedium, tikvrpc.TiKV, "", ops...)
resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, timeout, tikvrpc.TiKV, "", ops...)
if err != nil {
return err
}
@ -651,13 +659,20 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
var firstLock *txnlock.Lock
var resolvingRecordToken *int
useConfigurableKVTimeout := true
for {
util.EvalFailpoint("beforeSendPointGet")
loc, err := s.store.GetRegionCache().LocateKey(bo, k)
if err != nil {
return nil, err
}
resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV, "", ops...)
timeout := client.ReadTimeoutShort
if useConfigurableKVTimeout && s.readTimeout > 0 {
useConfigurableKVTimeout = false
timeout = s.readTimeout
}
req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, timeout, tikvrpc.TiKV, "", ops...)
if err != nil {
return nil, err
}
@ -984,6 +999,16 @@ func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.R
}
}
// SetKVReadTimeout sets timeout for individual KV read operations under this snapshot
func (s *KVSnapshot) SetKVReadTimeout(readTimeout time.Duration) {
s.readTimeout = readTimeout
}
// GetKVReadTimeout returns timeout for individual KV read operations under this snapshot or 0 if timeout is not set
func (s *KVSnapshot) GetKVReadTimeout() time.Duration {
return s.readTimeout
}
func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail {
s.mu.RLock()
defer s.mu.RUnlock()

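The snapshot changes make the caller-supplied read timeout apply only to the first attempt: once it has been used, useConfigurableKVTimeout is cleared and retries fall back to the default timeout, so a fast retry after a deadline exceeded error gets the normal budget. A self-contained sketch of that selection; the 60s value standing in for client.ReadTimeoutMedium is an assumption for illustration:

package main

import (
	"fmt"
	"time"
)

// readTimeoutMedium stands in for client.ReadTimeoutMedium (assumed 60s here).
const readTimeoutMedium = 60 * time.Second

// pickTimeouts shows which timeout each attempt would use: the snapshot's
// configurable timeout is consumed by the first attempt only, and every
// later attempt falls back to the default.
func pickTimeouts(snapshotTimeout time.Duration, attempts int) []time.Duration {
	useConfigurable := true
	out := make([]time.Duration, 0, attempts)
	for i := 0; i < attempts; i++ {
		timeout := readTimeoutMedium
		if useConfigurable && snapshotTimeout > 0 {
			useConfigurable = false
			timeout = snapshotTimeout
		}
		out = append(out, timeout)
	}
	return out
}

func main() {
	// The first attempt honors SetKVReadTimeout(500ms); retries use the default.
	fmt.Println(pickTimeouts(500*time.Millisecond, 3)) // [500ms 1m0s 1m0s]
}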
View File

@ -44,19 +44,21 @@ const (
// explicit source types.
const (
ExplicitTypeEmpty = ""
ExplicitTypeDefault = "default"
ExplicitTypeLightning = "lightning"
ExplicitTypeBR = "br"
ExplicitTypeDumpling = "dumpling"
ExplicitTypeBackground = "background"
ExplicitTypeDDL = "ddl"
)
// ExplicitTypeList is the list of all explicit source types.
var ExplicitTypeList = []string{ExplicitTypeEmpty, ExplicitTypeDefault, ExplicitTypeLightning, ExplicitTypeBR, ExplicitTypeDumpling, ExplicitTypeBackground}
var ExplicitTypeList = []string{ExplicitTypeEmpty, ExplicitTypeLightning, ExplicitTypeBR, ExplicitTypeDumpling, ExplicitTypeBackground, ExplicitTypeDDL}
const (
// InternalRequest is the scope of internal queries
InternalRequest = "internal"
// InternalRequestPrefix is the prefix of internal queries
InternalRequestPrefix = "internal_"
// ExternalRequest is the scope of external queries
ExternalRequest = "external"
// SourceUnknown keeps same with the default value(empty string)
@ -95,6 +97,15 @@ func WithInternalSourceType(ctx context.Context, source string) context.Context
})
}
// WithInternalSourceAndTaskType creates a context with the internal source and task name.
func WithInternalSourceAndTaskType(ctx context.Context, source, taskName string) context.Context {
return context.WithValue(ctx, RequestSourceKey, RequestSource{
RequestSourceInternal: true,
RequestSourceType: source,
ExplicitRequestSourceType: taskName,
})
}
// BuildRequestSource builds a request_source from internal, source and explicitSource.
func BuildRequestSource(internal bool, source, explicitSource string) string {
requestSource := RequestSource{
@ -117,24 +128,26 @@ func IsRequestSourceInternal(reqSrc *RequestSource) bool {
// GetRequestSource gets the request_source field of the request.
func (r *RequestSource) GetRequestSource() string {
source := SourceUnknown
explicitSourceType := ExplicitTypeDefault
origin := ExternalRequest
if r == nil || (len(r.RequestSourceType) == 0 && len(r.ExplicitRequestSourceType) == 0) {
// If r.RequestSourceType and r.ExplicitRequestSourceType are not set, it is most likely that r.RequestSourceInternal is not set either.
// To avoid internal requests being marked as external (the default value), return an unknown source here.
return strings.Join([]string{source, explicitSourceType}, "_")
return source
}
if len(r.RequestSourceType) > 0 {
source = r.RequestSourceType
}
if len(r.ExplicitRequestSourceType) > 0 {
explicitSourceType = r.ExplicitRequestSourceType
}
origin := ExternalRequest
if r.RequestSourceInternal {
origin = InternalRequest
}
return strings.Join([]string{origin, source, explicitSourceType}, "_")
labelList := make([]string, 0, 3)
labelList = append(labelList, origin)
if len(r.RequestSourceType) > 0 {
source = r.RequestSourceType
}
labelList = append(labelList, source)
if len(r.ExplicitRequestSourceType) > 0 && r.ExplicitRequestSourceType != r.RequestSourceType {
labelList = append(labelList, r.ExplicitRequestSourceType)
}
return strings.Join(labelList, "_")
}
// RequestSourceFromCtx extract source from passed context.

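GetRequestSource now emits the origin, the source type, and, only when it differs from the source type, the explicit task type, and it returns a bare "unknown" when nothing is set. A small, self-contained restatement of that label layout (buildLabel is an illustrative helper, not the library function):

package main

import (
	"fmt"
	"strings"
)

// buildLabel mirrors the new label layout: origin ("internal"/"external"),
// then the source type, then the explicit task type only when it is set and
// differs from the source type; "unknown" when nothing is set.
func buildLabel(internal bool, sourceType, explicitType string) string {
	if sourceType == "" && explicitType == "" {
		return "unknown"
	}
	origin := "external"
	if internal {
		origin = "internal"
	}
	source := "unknown"
	if sourceType != "" {
		source = sourceType
	}
	labels := []string{origin, source}
	if explicitType != "" && explicitType != sourceType {
		labels = append(labels, explicitType)
	}
	return strings.Join(labels, "_")
}

func main() {
	fmt.Println(buildLabel(false, "test", "")) // external_test
	fmt.Println(buildLabel(true, "ddl", "ddl")) // internal_ddl
	fmt.Println(buildLabel(false, "", ""))      // unknown
}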
View File

@ -43,19 +43,19 @@ func TestGetRequestSource(t *testing.T) {
// Test nil pointer
rs = nil
expected = "unknown_default"
expected = "unknown"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)
// Test empty RequestSourceType and ExplicitRequestSourceType
rs = &RequestSource{}
expected = "unknown_default"
expected = "unknown"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)
// Test empty ExplicitRequestSourceType
rs.RequestSourceType = "test"
expected = "external_test_default"
expected = "external_test"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)
@ -79,7 +79,7 @@ func TestBuildRequestSource(t *testing.T) {
assert.Equal(t, expected, actual)
// Test empty ExplicitRequestSourceType
expected = "external_test_default"
expected = "external_test"
actual = BuildRequestSource(false, "test", "")
assert.Equal(t, expected, actual)
@ -89,7 +89,7 @@ func TestBuildRequestSource(t *testing.T) {
assert.Equal(t, expected, actual)
// Test RequestSourceType && ExplicitRequestSourceType both empty
expected = "unknown_default"
expected = "unknown"
actual = BuildRequestSource(true, "", "")
assert.Equal(t, expected, actual)
}