mirror of https://github.com/tikv/client-go.git
*: fix data race on the SetResourceGroupTagger (#491)
* *: fix data race on the SetResourceGroupTagger Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com> * *: fix data race on the SetResourceGroupTagger Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com> * *: fix data race on the SetResourceGroupTagger Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
This commit is contained in:
parent
038b03552a
commit
ff5e35ac28
|
|
@ -119,14 +119,14 @@ func (s *Scanner) Next() error {
|
|||
if !s.valid {
|
||||
return errors.New("scanner iterator is invalid")
|
||||
}
|
||||
s.snapshot.interceptorMutex.RLock()
|
||||
if s.snapshot.interceptor != nil {
|
||||
s.snapshot.mu.RLock()
|
||||
if s.snapshot.mu.interceptor != nil {
|
||||
// User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we
|
||||
// need to bind it to ctx so that the internal client can perceive and execute
|
||||
// it before initiating an RPC request.
|
||||
bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.snapshot.interceptor))
|
||||
bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.snapshot.mu.interceptor))
|
||||
}
|
||||
s.snapshot.interceptorMutex.RUnlock()
|
||||
s.snapshot.mu.RUnlock()
|
||||
var err error
|
||||
for {
|
||||
s.idx++
|
||||
|
|
@ -228,7 +228,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
|
|||
Priority: s.snapshot.priority.ToPB(),
|
||||
NotFillCache: s.snapshot.notFillCache,
|
||||
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
|
||||
ResourceGroupTag: s.snapshot.resourceGroupTag,
|
||||
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
|
||||
},
|
||||
StartKey: s.nextStartKey,
|
||||
EndKey: reqEndKey,
|
||||
|
|
@ -247,11 +247,11 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
|
|||
Priority: s.snapshot.priority.ToPB(),
|
||||
NotFillCache: s.snapshot.notFillCache,
|
||||
TaskId: s.snapshot.mu.taskID,
|
||||
ResourceGroupTag: s.snapshot.resourceGroupTag,
|
||||
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
|
||||
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
|
||||
})
|
||||
if s.snapshot.resourceGroupTag == nil && s.snapshot.resourceGroupTagger != nil {
|
||||
s.snapshot.resourceGroupTagger(req)
|
||||
if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil {
|
||||
s.snapshot.mu.resourceGroupTagger(req)
|
||||
}
|
||||
s.snapshot.mu.RUnlock()
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutMedium)
|
||||
|
|
|
|||
|
|
@ -133,16 +133,14 @@ type KVSnapshot struct {
|
|||
readReplicaScope string
|
||||
// MatchStoreLabels indicates the labels the store should be matched
|
||||
matchStoreLabels []*metapb.StoreLabel
|
||||
// resourceGroupTag is use to set the kv request resource group tag.
|
||||
resourceGroupTag []byte
|
||||
// resourceGroupTagger is use to set the kv request resource group tag if resourceGroupTag is nil.
|
||||
resourceGroupTagger tikvrpc.ResourceGroupTagger
|
||||
// interceptor is used to decorate the RPC request logic related to the snapshot.
|
||||
interceptor interceptor.RPCInterceptor
|
||||
}
|
||||
sampleStep uint32
|
||||
// resourceGroupTag is use to set the kv request resource group tag.
|
||||
resourceGroupTag []byte
|
||||
// resourceGroupTagger is use to set the kv request resource group tag if resourceGroupTag is nil.
|
||||
resourceGroupTagger tikvrpc.ResourceGroupTagger
|
||||
// interceptorMutex is a lock for interceptor
|
||||
interceptorMutex sync.RWMutex
|
||||
// interceptor is used to decorate the RPC request logic related to the snapshot.
|
||||
interceptor interceptor.RPCInterceptor
|
||||
}
|
||||
|
||||
// NewTiKVSnapshot creates a snapshot of an TiKV store.
|
||||
|
|
@ -209,14 +207,14 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]
|
|||
|
||||
ctx = context.WithValue(ctx, retry.TxnStartKey, s.version)
|
||||
bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars)
|
||||
s.interceptorMutex.RLock()
|
||||
if s.interceptor != nil {
|
||||
s.mu.RLock()
|
||||
if s.mu.interceptor != nil {
|
||||
// User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we
|
||||
// need to bind it to ctx so that the internal client can perceive and execute
|
||||
// it before initiating an RPC request.
|
||||
bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.interceptor))
|
||||
bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.mu.interceptor))
|
||||
}
|
||||
s.interceptorMutex.RUnlock()
|
||||
s.mu.RUnlock()
|
||||
// Create a map to collect key-values from region servers.
|
||||
var mu sync.Mutex
|
||||
err := s.batchGetKeysByRegions(bo, keys, func(k, v []byte) {
|
||||
|
|
@ -367,11 +365,11 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
|
|||
Priority: s.priority.ToPB(),
|
||||
NotFillCache: s.notFillCache,
|
||||
TaskId: s.mu.taskID,
|
||||
ResourceGroupTag: s.resourceGroupTag,
|
||||
ResourceGroupTag: s.mu.resourceGroupTag,
|
||||
IsolationLevel: s.isolationLevel.ToPB(),
|
||||
})
|
||||
if s.resourceGroupTag == nil && s.resourceGroupTagger != nil {
|
||||
s.resourceGroupTagger(req)
|
||||
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
|
||||
s.mu.resourceGroupTagger(req)
|
||||
}
|
||||
scope := s.mu.readReplicaScope
|
||||
isStaleness := s.mu.isStaleness
|
||||
|
|
@ -483,14 +481,14 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) {
|
|||
|
||||
ctx = context.WithValue(ctx, retry.TxnStartKey, s.version)
|
||||
bo := retry.NewBackofferWithVars(ctx, getMaxBackoff, s.vars)
|
||||
s.interceptorMutex.RLock()
|
||||
if s.interceptor != nil {
|
||||
s.mu.RLock()
|
||||
if s.mu.interceptor != nil {
|
||||
// User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we
|
||||
// need to bind it to ctx so that the internal client can perceive and execute
|
||||
// it before initiating an RPC request.
|
||||
bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.interceptor))
|
||||
bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.mu.interceptor))
|
||||
}
|
||||
s.interceptorMutex.RUnlock()
|
||||
s.mu.RUnlock()
|
||||
val, err := s.get(ctx, bo, k)
|
||||
s.recordBackoffInfo(bo)
|
||||
if err != nil {
|
||||
|
|
@ -546,11 +544,11 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
|
|||
Priority: s.priority.ToPB(),
|
||||
NotFillCache: s.notFillCache,
|
||||
TaskId: s.mu.taskID,
|
||||
ResourceGroupTag: s.resourceGroupTag,
|
||||
ResourceGroupTag: s.mu.resourceGroupTag,
|
||||
IsolationLevel: s.isolationLevel.ToPB(),
|
||||
})
|
||||
if s.resourceGroupTag == nil && s.resourceGroupTagger != nil {
|
||||
s.resourceGroupTagger(req)
|
||||
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
|
||||
s.mu.resourceGroupTagger(req)
|
||||
}
|
||||
isStaleness := s.mu.isStaleness
|
||||
matchStoreLabels := s.mu.matchStoreLabels
|
||||
|
|
@ -748,34 +746,38 @@ func (s *KVSnapshot) SetMatchStoreLabels(labels []*metapb.StoreLabel) {
|
|||
|
||||
// SetResourceGroupTag sets resource group tag of the kv request.
|
||||
func (s *KVSnapshot) SetResourceGroupTag(tag []byte) {
|
||||
s.resourceGroupTag = tag
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.mu.resourceGroupTag = tag
|
||||
}
|
||||
|
||||
// SetResourceGroupTagger sets resource group tagger of the kv request.
|
||||
// Before sending the request, if resourceGroupTag is not nil, use
|
||||
// resourceGroupTag directly, otherwise use resourceGroupTagger.
|
||||
func (s *KVSnapshot) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) {
|
||||
s.resourceGroupTagger = tagger
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.mu.resourceGroupTagger = tagger
|
||||
}
|
||||
|
||||
// SetRPCInterceptor sets interceptor.RPCInterceptor for the snapshot.
|
||||
// interceptor.RPCInterceptor will be executed before each RPC request is initiated.
|
||||
// Note that SetRPCInterceptor will replace the previously set interceptor.
|
||||
func (s *KVSnapshot) SetRPCInterceptor(it interceptor.RPCInterceptor) {
|
||||
s.interceptorMutex.Lock()
|
||||
defer s.interceptorMutex.Unlock()
|
||||
s.interceptor = it
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.mu.interceptor = it
|
||||
}
|
||||
|
||||
// AddRPCInterceptor adds an interceptor, the order of addition is the order of execution.
|
||||
func (s *KVSnapshot) AddRPCInterceptor(it interceptor.RPCInterceptor) {
|
||||
s.interceptorMutex.Lock()
|
||||
defer s.interceptorMutex.Unlock()
|
||||
if s.interceptor == nil {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.mu.interceptor == nil {
|
||||
s.SetRPCInterceptor(it)
|
||||
return
|
||||
}
|
||||
s.interceptor = interceptor.ChainRPCInterceptors(s.interceptor, it)
|
||||
s.mu.interceptor = interceptor.ChainRPCInterceptors(s.mu.interceptor, it)
|
||||
}
|
||||
|
||||
// SnapCacheHitCount gets the snapshot cache hit count. Only for test.
|
||||
|
|
|
|||
Loading…
Reference in New Issue