add replica read adjuster function in KVSnapshot (#540)

Signed-off-by: glorv <glorvs@163.com>
This commit is contained in:
glorv 2022-07-07 17:12:24 +08:00 committed by GitHub
parent 97c41742ea
commit 114ba4082e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 564 additions and 39 deletions

View File

@ -6,16 +6,16 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
github.com/pingcap/tidb v1.1.0-beta.0.20220621061036-5c9ad77ae1f1
github.com/pingcap/tidb/parser v0.0.0-20220621061036-5c9ad77ae1f1 // indirect
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/tidb v1.1.0-beta.0.20220706093502-562b03368993
github.com/pingcap/tidb/parser v0.0.0-20220706093502-562b03368993 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
github.com/tikv/client-go/v2 v2.0.1-0.20220613112734-be31f33ba03b
github.com/tikv/client-go/v2 v2.0.1-0.20220627063500-947d923945fd
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
go.uber.org/goleak v1.1.12
)
replace github.com/tikv/client-go/v2 => ../
replace github.com/pingcap/tidb => github.com/you06/tidb v1.1.0-beta.0.20220620132310-ba06be65cc3b
replace github.com/pingcap/tidb => github.com/glorv/tidb v1.1.0-beta.0.20220706085636-88fb3fa642bb

File diff suppressed because it is too large Load Diff

View File

@ -30,6 +30,10 @@ type SnapshotRuntimeStats = txnsnapshot.SnapshotRuntimeStats
// IsoLevel is the transaction's isolation level.
type IsoLevel = txnsnapshot.IsoLevel
// ReplicaReadAdjuster is a function that adjust the StoreSelectorOption and ReplicaReadType
// based on the keys count for BatchPointGet and PointGet
type ReplicaReadAdjuster = txnsnapshot.ReplicaReadAdjuster
// IsoLevel value for transaction priority.
const (
SI = txnsnapshot.SI

View File

@ -100,6 +100,10 @@ type kvstore interface {
GetOracle() oracle.Oracle
}
// ReplicaReadAdjuster is a function that adjust the StoreSelectorOption and ReplicaReadType
// based on the keys count for BatchPointGet and PointGet
type ReplicaReadAdjuster func(int) (locate.StoreSelectorOption, kv.ReplicaReadType)
// KVSnapshot implements the tidbkv.Snapshot interface.
type KVSnapshot struct {
store kvstore
@ -131,6 +135,8 @@ type KVSnapshot struct {
taskID uint64
isStaleness bool
readReplicaScope string
// replicaReadAdjuster check and adjust the replica read type and store match labels.
replicaReadAdjuster ReplicaReadAdjuster
// MatchStoreLabels indicates the labels the store should be matched
matchStoreLabels []*metapb.StoreLabel
// resourceGroupTag is use to set the kv request resource group tag.
@ -381,6 +387,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
scope := s.mu.readReplicaScope
isStaleness := s.mu.isStaleness
matchStoreLabels := s.mu.matchStoreLabels
replicaAdjuster := s.mu.replicaReadAdjuster
s.mu.RUnlock()
req.TxnScope = scope
req.ReadReplicaScope = scope
@ -391,6 +398,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
if len(matchStoreLabels) > 0 {
ops = append(ops, locate.WithMatchLabels(matchStoreLabels))
}
if req.ReplicaReadType.IsFollowerRead() && replicaAdjuster != nil {
op, readType := replicaAdjuster(len(pending))
if op != nil {
ops = append(ops, op)
}
req.ReplicaReadType = readType
}
resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, client.ReadTimeoutMedium, tikvrpc.TiKV, "", ops...)
if err != nil {
return err
@ -576,6 +590,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
isStaleness := s.mu.isStaleness
matchStoreLabels := s.mu.matchStoreLabels
scope := s.mu.readReplicaScope
replicaAdjuster := s.mu.replicaReadAdjuster
s.mu.RUnlock()
req.TxnScope = scope
req.ReadReplicaScope = scope
@ -586,6 +601,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
if len(matchStoreLabels) > 0 {
ops = append(ops, locate.WithMatchLabels(matchStoreLabels))
}
if req.ReplicaReadType.IsFollowerRead() && replicaAdjuster != nil {
op, readType := replicaAdjuster(1)
if op != nil {
ops = append(ops, op)
}
req.ReplicaReadType = readType
}
var firstLock *txnlock.Lock
var resolvingRecordToken *int
@ -772,8 +794,15 @@ func (s *KVSnapshot) SetReadReplicaScope(scope string) {
s.mu.readReplicaScope = scope
}
// SetIsStatenessReadOnly indicates whether the transaction is staleness read only transaction
func (s *KVSnapshot) SetIsStatenessReadOnly(b bool) {
// SetReplicaReadAdjuster set replica read adjust function
func (s *KVSnapshot) SetReplicaReadAdjuster(f ReplicaReadAdjuster) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.replicaReadAdjuster = f
}
// SetIsStalenessReadOnly indicates whether the transaction is staleness read only transaction
func (s *KVSnapshot) SetIsStalenessReadOnly(b bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.isStaleness = b