add resource group name in request context (#650)

Signed-off-by: glorv <glorvs@163.com>
This commit is contained in:
glorv 2023-01-09 15:18:45 +08:00 committed by GitHub
parent 5a4b9accad
commit 4e1d38d8f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 112 additions and 50 deletions

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0

4
go.sum
View File

@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a h1:4TiR0nGKmLmu2kTC22jOhUIplmv4GGUrUD9D2DTMms0=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

View File

@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a
github.com/pingcap/tidb v1.1.0-beta.0.20221101102559-97add26c8f84
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.0

View File

@ -408,8 +408,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a h1:4TiR0nGKmLmu2kTC22jOhUIplmv4GGUrUD9D2DTMms0=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=

View File

@ -133,6 +133,7 @@ type twoPhaseCommitter struct {
detail unsafe.Pointer
txnSize int
hasNoNeedCommitKeys bool
resourceGroupName string
primaryKey []byte
forUpdateTS uint64
@ -705,6 +706,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
c.syncLog = txn.syncLog
c.resourceGroupTag = txn.resourceGroupTag
c.resourceGroupTagger = txn.resourceGroupTagger
c.resourceGroupName = txn.resourceGroupName
c.setDetail(commitDetail)
return nil

View File

@ -68,6 +68,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
ResourceGroupTag: c.resourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)

View File

@ -78,6 +78,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
TxnSource: c.txnSource,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)

View File

@ -129,6 +129,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))

View File

@ -185,6 +185,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
TxnSource: c.txnSource,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(r)

View File

@ -160,6 +160,8 @@ type KVTxn struct {
interceptor interceptor.RPCInterceptor
assertionLevel kvrpcpb.AssertionLevel
*util.RequestSource
// resourceGroupName is the name of tenent resource group.
resourceGroupName string
aggressiveLockingContext *aggressiveLockingContext
aggressiveLockingDirty bool
@ -298,6 +300,12 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) {
txn.GetSnapshot().SetResourceGroupTagger(tagger)
}
// SetResourceGroupName set resource group name for both read and write.
func (txn *KVTxn) SetResourceGroupName(name string) {
txn.resourceGroupName = name
txn.GetSnapshot().SetResourceGroupName(name)
}
// SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot.
// interceptor.RPCInterceptor will be executed before each RPC request is initiated.
// Note that SetRPCInterceptor will replace the previously set interceptor.

View File

@ -136,13 +136,15 @@ func (s TxnStatus) Action() kvrpcpb.Action { return s.action }
// StatusCacheable checks whether the transaction status is certain.True will be
// returned if its status is certain:
// If transaction is already committed, the result could be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
//
// If transaction is already committed, the result could be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
//
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
func (s TxnStatus) StatusCacheable() bool {
if s.IsCommitted() {
@ -285,7 +287,11 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos},
kvrpcpb.Context{RequestSource: util.RequestSourceFromCtx(bo.GetCtx())})
kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
},
)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
startTime = time.Now()
resp, err := lr.store.SendReq(bo, req, loc, client.ReadTimeoutShort)
@ -342,14 +348,14 @@ func (lr *LockResolver) ResolveLocksWithOpts(bo *retry.Backoffer, opts ResolveLo
}
// ResolveLocks tries to resolve Locks. The resolving process is in 3 steps:
// 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too
// old are considered orphan locks and will be handled later. If all locks
// are expired then all locks will be resolved so the returned `ok` will be
// true, otherwise caller should sleep a while before retry.
// 2) For each lock, query the primary key to get txn(which left the lock)'s
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
// 1. Use the `lockTTL` to pick up all expired locks. Only locks that are too
// old are considered orphan locks and will be handled later. If all locks
// are expired then all locks will be resolved so the returned `ok` will be
// true, otherwise caller should sleep a while before retry.
// 2. For each lock, query the primary key to get txn(which left the lock)'s
// commit status.
// 3. Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
opts := ResolveLocksOptions{
CallerStartTS: callerStartTS,
@ -702,7 +708,8 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
ForceSyncCommit: forceSyncCommit,
ResolvingPessimisticLock: resolvingPessimisticLock,
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
@ -845,7 +852,8 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
StartVersion: txnID,
}
req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc()
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
@ -1000,7 +1008,9 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
lreq.CommitVersion = status.CommitTS()
}
lreq.Keys = keys
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, region, client.ReadTimeoutShort)
if err != nil {
@ -1076,7 +1086,9 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
metrics.LockResolverCountWithResolveLockLite.Inc()
lreq.Keys = [][]byte{l.Key}
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx())
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
@ -1130,7 +1142,9 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
ForUpdateTs: forUpdateTS,
Keys: [][]byte{l.Key},
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq)
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq, kvrpcpb.Context{
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {

View File

@ -226,11 +226,12 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
}
sreq := &kvrpcpb.ScanRequest{
Context: &kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
RequestSource: s.snapshot.GetRequestSource(),
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
},
StartKey: s.nextStartKey,
EndKey: reqEndKey,
@ -246,12 +247,13 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
}
s.snapshot.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
RequestSource: s.snapshot.GetRequestSource(),
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
})
if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil {
s.snapshot.mu.resourceGroupTagger(req)

View File

@ -146,6 +146,8 @@ type KVSnapshot struct {
resourceGroupTagger tikvrpc.ResourceGroupTagger
// interceptor is used to decorate the RPC request logic related to the snapshot.
interceptor interceptor.RPCInterceptor
// resourceGroupName is used to bind the request to specified resource group.
resourceGroupName string
}
sampleStep uint32
*util.RequestSource
@ -375,12 +377,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
Keys: pending,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
ResourceGroupName: s.mu.resourceGroupName,
})
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
s.mu.resourceGroupTagger(req)
@ -578,12 +581,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
Key: k,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
ResourceGroupName: s.mu.resourceGroupName,
})
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
s.mu.resourceGroupTagger(req)
@ -852,6 +856,13 @@ func (s *KVSnapshot) AddRPCInterceptor(it interceptor.RPCInterceptor) {
s.mu.interceptor = interceptor.ChainRPCInterceptors(s.mu.interceptor, it)
}
// SetResourceGroupName set resource group name of the kv request.
func (s *KVSnapshot) SetResourceGroupName(name string) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.resourceGroupName = name
}
// SnapCacheHitCount gets the snapshot cache hit count. Only for test.
func (s *KVSnapshot) SnapCacheHitCount() int {
return int(atomic.LoadInt64(&s.mu.hitCnt))

View File

@ -80,3 +80,24 @@ func RequestSourceFromCtx(ctx context.Context) string {
}
return SourceUnknown
}
// ResourceGroupNameKeyType is the context key type of resource group name.
type resourceGroupNameKeyType struct{}
// ResourceGroupNameKey is used as the key of request source type in context.
var resourceGroupNameKey = resourceGroupNameKeyType{}
// WithResouceGroupName return a copy of the given context with a associated
// reosurce group name.
func WithResouceGroupName(ctx context.Context, groupName string) context.Context {
return context.WithValue(ctx, resourceGroupNameKey, groupName)
}
// ResourceGroupNameFromCtx extract resource group name from passed context,
// empty string is returned is the key is not set.
func ResourceGroupNameFromCtx(ctx context.Context) string {
if val := ctx.Value(resourceGroupNameKey); val != nil {
return val.(string)
}
return ""
}