mirror of https://github.com/tikv/client-go.git
gc: add resolve locks interface for tidb gc_worker (#945)
* gc: add GCResolver interface for resolve locks
* adapt scanlimit
* rename GCLockResolver to RegionLockResolver
* update
* address comments

Signed-off-by: 3pointer <luancheng@pingcap.com>
parent 295094e5b5
commit a8860a9801
tikv/gc.go | 114
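For orientation, a minimal sketch of how a caller such as TiDB's gc_worker is expected to drive the pieces this commit exports. The wrapper function and its names are invented for illustration; only NewRegionLockResolver, ResolveLocksForRange, NewGcResolveLockMaxBackoffer, and GCScanLockLimit come from the diff below.

package gcworker // hypothetical illustration, not part of the commit

import (
    "context"

    "github.com/tikv/client-go/v2/tikv"
)

// resolveLocksBeforeGC is an invented wrapper showing the intended call pattern.
func resolveLocksBeforeGC(ctx context.Context, store *tikv.KVStore, safepoint uint64, startKey, endKey []byte) error {
    // The identifier is a free-form tag that ends up in the resolver's log fields.
    resolver := tikv.NewRegionLockResolver("gc-worker-example", store)
    _, err := tikv.ResolveLocksForRange(
        ctx, resolver, safepoint, startKey, endKey,
        tikv.NewGcResolveLockMaxBackoffer, // a fresh backoffer per scan-and-resolve round
        tikv.GCScanLockLimit,              // half of txnlock.ResolvedCacheSize, per the new constant
    )
    return err
}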
@@ -35,6 +35,9 @@ import (
     zap "go.uber.org/zap"
 )
 
+// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
+const GCScanLockLimit = txnlock.ResolvedCacheSize / 2
+
 // GC does garbage collection (GC) of the TiKV cluster.
 // GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
 //
@@ -81,8 +84,9 @@ func WithConcurrency(concurrency int) GCOpt {
 }
 
 func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
+    lockResolver := NewRegionLockResolver("gc-client-go-api", s)
     handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
-        return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
+        return ResolveLocksForRange(ctx, lockResolver, safePoint, r.StartKey, r.EndKey, NewGcResolveLockMaxBackoffer, GCScanLockLimit)
     }
 
     runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
@@ -94,72 +98,131 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
     return nil
 }
 
-// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
-const gcScanLockLimit = txnlock.ResolvedCacheSize / 2
+type BaseRegionLockResolver struct {
+    identifier string
+    store      Storage
+}
 
-func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
+func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver {
+    return &BaseRegionLockResolver{
+        identifier: identifier,
+        store:      store,
+    }
+}
+
+func (l *BaseRegionLockResolver) Identifier() string {
+    return l.identifier
+}
+
+func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
+    return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc)
+}
+
+func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
+    return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
+}
+
+func (l *BaseRegionLockResolver) GetStore() Storage {
+    return l.store
+}
+
+// RegionLockResolver is used for GCWorker and log backup advancer to resolve locks in a region.
+type RegionLockResolver interface {
+    // Identifier represents the name of this resolver.
+    Identifier() string
+
+    // ResolveLocksInOneRegion tries to resolve expired locks for one region.
+    // 1. For GCWorker it will scan locks before *safepoint*,
+    // and force remove these locks. rollback the txn, no matter the lock is expired of not.
+    // 2. For log backup advancer, it will scan all locks for a small range.
+    // and it will check status of the txn. resolve the locks if txn is expired, Or do nothing.
+    //
+    // regionLocation should return if resolve locks succeed. if regionLocation return nil,
+    // which means not all locks are resolved in someway. the caller should retry scan locks.
+    // ** the locks are assumed sorted by key in ascending order **
+    ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)
+
+    // ScanLocksInOneRegion return locks and location with given start key in a region.
+    // The return result ([]*Lock, *KeyLocation, error) represents the all locks in a regionLocation.
+    // which will used by ResolveLocksInOneRegion later.
+    ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)
+
+    // GetStore is used to get store to GetRegionCache and SendReq for this lock resolver.
+    GetStore() Storage
+}
+
+func ResolveLocksForRange(
+    ctx context.Context,
+    resolver RegionLockResolver,
+    maxVersion uint64,
+    startKey []byte,
+    endKey []byte,
+    createBackoffFn func(context.Context) *Backoffer,
+    scanLimit uint32,
+) (rangetask.TaskStat, error) {
     // for scan lock request, we must return all locks even if they are generated
     // by the same transaction. because gc worker need to make sure all locks have been
     // cleaned.
 
     var stat rangetask.TaskStat
     key := startKey
-    bo := NewGcResolveLockMaxBackoffer(ctx)
+    // create new backoffer for every scan and resolve locks
+    bo := createBackoffFn(ctx)
     for {
         select {
         case <-ctx.Done():
             return stat, errors.New("[gc worker] gc job canceled")
         default:
         }
 
-        locks, loc, err := s.scanLocksInRegionWithStartKey(bo, key, safePoint, gcScanLockLimit)
+        locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit)
         if err != nil {
             return stat, err
         }
 
-        resolvedLocation, err1 := s.batchResolveLocksInARegion(bo, locks, loc)
-        if err1 != nil {
-            return stat, err1
+        resolvedLocation, err := resolver.ResolveLocksInOneRegion(bo, locks, loc)
+        if err != nil {
+            return stat, err
         }
         // resolve locks failed since the locks are not in one region anymore, need retry.
         if resolvedLocation == nil {
             continue
         }
-        if len(locks) < gcScanLockLimit {
+        if len(locks) < int(scanLimit) {
             stat.CompletedRegions++
             key = loc.EndKey
-            logutil.Logger(ctx).Info("[gc worker] one region finshed ",
+            logutil.Logger(ctx).Debug("resolve one region finshed ",
+                zap.String("identifier", resolver.Identifier()),
                 zap.Int("regionID", int(resolvedLocation.Region.GetID())),
                 zap.Int("resolvedLocksNum", len(locks)))
         } else {
-            logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
+            logutil.Logger(ctx).Info("region has more than limit locks",
+                zap.String("identifier", resolver.Identifier()),
                 zap.Int("regionID", int(resolvedLocation.Region.GetID())),
                 zap.Int("resolvedLocksNum", len(locks)),
-                zap.Int("scan lock limit", gcScanLockLimit))
+                zap.Uint32("scan lock limit", scanLimit))
             key = locks[len(locks)-1].Key
         }
 
         if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
             break
         }
-        bo = NewGcResolveLockMaxBackoffer(ctx)
+        bo = createBackoffFn(ctx)
     }
     return stat, nil
 }
 
-func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
+func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
     for {
-        loc, err := s.GetRegionCache().LocateKey(bo, startKey)
+        loc, err := store.GetRegionCache().LocateKey(bo, startKey)
         if err != nil {
             return nil, loc, err
         }
         req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
             MaxVersion: maxVersion,
-            Limit:      gcScanLockLimit,
+            Limit:      limit,
             StartKey:   startKey,
             EndKey:     loc.EndKey,
         })
-        resp, err := s.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
+        resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
         if err != nil {
             return nil, loc, err
         }
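The RegionLockResolver interface above is what lets components other than GC (the commit message mentions the log backup advancer) plug in their own policy while reusing the base scanner. A minimal sketch of such an implementation follows; everything except the interface, BaseRegionLockResolver, and the txnlock.Lock type is invented here, and the tikv.Backoffer / tikv.KeyLocation aliases for the internal types are assumed to exist.

package gcexample // hypothetical illustration

import (
    "github.com/tikv/client-go/v2/tikv"
    "github.com/tikv/client-go/v2/txnkv/txnlock"
)

// thresholdLockResolver reuses the base scanning behaviour but only force-resolves
// locks whose start ts is at or below a caller-chosen threshold.
type thresholdLockResolver struct {
    *tikv.BaseRegionLockResolver
    maxTxnID uint64
}

func newThresholdLockResolver(store tikv.Storage, maxTxnID uint64) *thresholdLockResolver {
    return &thresholdLockResolver{
        BaseRegionLockResolver: tikv.NewRegionLockResolver("threshold-example", store),
        maxTxnID:               maxTxnID,
    }
}

// ResolveLocksInOneRegion keeps the contract described in the interface comment
// (nil location means "re-scan"), but filters the locks it hands to the base resolver.
func (r *thresholdLockResolver) ResolveLocksInOneRegion(
    bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation,
) (*tikv.KeyLocation, error) {
    filtered := make([]*txnlock.Lock, 0, len(locks))
    for _, l := range locks {
        if l.TxnID <= r.maxTxnID {
            filtered = append(filtered, l)
        }
    }
    if len(filtered) == 0 {
        return loc, nil
    }
    return r.BaseRegionLockResolver.ResolveLocksInOneRegion(bo, filtered, loc)
}

Because BaseRegionLockResolver already satisfies the interface, an embedding struct only needs to override the one method whose policy differs; ResolveLocksForRange can then be driven with this resolver exactly as with the default one.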
@@ -190,15 +253,18 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
         }
     }
 
-// batchResolveLocksInARegion resolves locks in a region.
+// batchResolveLocksInOneRegion resolves locks in a region.
 // It returns the real location of the resolved locks if resolve locks success.
 // It returns error when meet an unretryable error.
 // When the locks are not in one region, resolve locks should be failed, it returns with nil resolveLocation and nil err.
 // Used it in gcworker only!
-func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
+func batchResolveLocksInOneRegion(bo *Backoffer, store Storage, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
+    if expectedLoc == nil {
+        return nil, nil
+    }
     resolvedLocation = expectedLoc
     for {
-        ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
+        ok, err := store.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
         if ok {
             return resolvedLocation, nil
         }
@@ -209,7 +275,7 @@ func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
         if err != nil {
             return nil, err
         }
-        region, err1 := s.GetRegionCache().LocateKey(bo, locks[0].Key)
+        region, err1 := store.GetRegionCache().LocateKey(bo, locks[0].Key)
         if err1 != nil {
             return nil, err1
         }
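The nil/nil return of batchResolveLocksInOneRegion above is the signal that the scanned locks no longer fit in one region. A hedged sketch of how a caller driving the two interface methods by hand honours that contract; this is a fragment with variable set-up omitted, and all surrounding names are assumed.

// Hypothetical inner loop: a nil resolved location with a nil error means the region
// split or moved under the locks, so re-scan from the same key instead of advancing.
for {
    locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, safepoint, tikv.GCScanLockLimit)
    if err != nil {
        return err
    }
    resolvedLoc, err := resolver.ResolveLocksInOneRegion(bo, locks, loc)
    if err != nil {
        return err
    }
    if resolvedLoc == nil {
        continue // region boundary changed; retry the scan at the same key
    }
    if len(loc.EndKey) == 0 {
        return nil // reached the end of the keyspace
    }
    key = loc.EndKey // advance to the next region
}

ResolveLocksForRange above additionally handles the case where a region holds more locks than scanLimit by re-scanning from the last returned lock key instead of jumping to the region's end key.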
@@ -123,7 +123,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV
 
 outerLoop:
     for {
-        locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
+        locks, loc, err := scanLocksInOneRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit)
         if err != nil {
            return nil, err
         }
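The last hunk only re-points the StoreProbe test helper at the new free function. A hedged sketch of the kind of test-side check it supports; the ScanLocks signature is partly truncated in the hunk header above, so the shape used here (the trailing max-version argument and the lock-slice return) is assumed, and the helper itself is invented (imports: context, testing, and the tikv package).

// assertNoLocksBefore is a hypothetical test helper built on StoreProbe.ScanLocks.
func assertNoLocksBefore(t *testing.T, probe tikv.StoreProbe, safepoint uint64) {
    locks, err := probe.ScanLocks(context.Background(), []byte(""), []byte(""), safepoint)
    if err != nil {
        t.Fatal(err)
    }
    if len(locks) != 0 {
        t.Fatalf("expected no locks below safepoint %d, found %d", safepoint, len(locks))
    }
}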