mirror of https://github.com/tikv/client-go.git
Adjustment for complete support of keyspace level GC (#1720)
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> Signed-off-by: ekexium <eke@fastmail.com> Co-authored-by: ekexium <eke@fastmail.com>
This commit is contained in:
parent
dcb62bb121
commit
f8d9bebb31
|
|
@ -29,6 +29,25 @@ const (
|
||||||
NullspaceID = KeyspaceID(constants.NullKeyspaceID)
|
NullspaceID = KeyspaceID(constants.NullKeyspaceID)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// CodecV2Prefixes returns a sorted list of prefixes that will be used by Codec V2.
|
||||||
|
func CodecV2Prefixes() [][]byte {
|
||||||
|
result := [][]byte{
|
||||||
|
{RawModePrefix},
|
||||||
|
{TxnModePrefix},
|
||||||
|
}
|
||||||
|
// The result should be sorted.
|
||||||
|
// If the list becomes larger in the future, add explicit sorting if necessary.
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// CodecV1ExcludePrefixes returns a sorted list of prefixes that will not be used by Codec V1. This function can be
|
||||||
|
// used to determine if a key belongs to codec v1 in v1+v2 mixed deployment.
|
||||||
|
func CodecV1ExcludePrefixes() [][]byte {
|
||||||
|
// Currently this is identical with CodecV2Prefixes, but this function has different semantics. In the future
|
||||||
|
// if other special prefixes or codec versions are introduced, those prefixes should be added here.
|
||||||
|
return CodecV2Prefixes()
|
||||||
|
}
|
||||||
|
|
||||||
// ParseKeyspaceID retrieves the keyspaceID from the given keyspace-encoded key.
|
// ParseKeyspaceID retrieves the keyspaceID from the given keyspace-encoded key.
|
||||||
// It returns error if the given key is not in proper api-v2 format.
|
// It returns error if the given key is not in proper api-v2 format.
|
||||||
func ParseKeyspaceID(b []byte) (KeyspaceID, error) {
|
func ParseKeyspaceID(b []byte) (KeyspaceID, error) {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package apicodec
|
package apicodec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||||
|
|
@ -57,3 +58,11 @@ func TestEncodeUnknownRequest(t *testing.T) {
|
||||||
_, err := c.EncodeRequest(req)
|
_, err := c.EncodeRequest(req)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCodecListUtilityFunctions(t *testing.T) {
|
||||||
|
keyCmp := func(lhs, rhs []byte) int {
|
||||||
|
return slices.Compare(lhs, rhs)
|
||||||
|
}
|
||||||
|
assert.True(t, slices.IsSortedFunc(CodecV2Prefixes(), keyCmp))
|
||||||
|
assert.True(t, slices.IsSortedFunc(CodecV1ExcludePrefixes(), keyCmp))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,25 +18,27 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
const (
|
||||||
// DefaultKeyspaceID is the keyspaceID of the default keyspace.
|
// DefaultKeyspaceID is the keyspaceID of the default keyspace.
|
||||||
DefaultKeyspaceID uint32 = 0
|
DefaultKeyspaceID uint32 = 0
|
||||||
// DefaultKeyspaceName is the name of the default keyspace.
|
// DefaultKeyspaceName is the name of the default keyspace.
|
||||||
DefaultKeyspaceName = "DEFAULT"
|
DefaultKeyspaceName = "DEFAULT"
|
||||||
|
|
||||||
rawModePrefix byte = 'r'
|
RawModePrefix byte = 'r'
|
||||||
txnModePrefix byte = 'x'
|
TxnModePrefix byte = 'x'
|
||||||
keyspacePrefixLen = 4
|
keyspacePrefixLen = 4
|
||||||
|
|
||||||
// maxKeyspaceID is the maximum value of keyspaceID, its value is uint24Max.
|
// maxKeyspaceID is the maximum value of keyspaceID, its value is uint24Max.
|
||||||
maxKeyspaceID = uint32(0xFFFFFF)
|
maxKeyspaceID = uint32(0xFFFFFF)
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
// errKeyOutOfBound happens when key to be decoded lies outside the keyspace's range.
|
// errKeyOutOfBound happens when key to be decoded lies outside the keyspace's range.
|
||||||
errKeyOutOfBound = errors.New("given key does not belong to the keyspace")
|
errKeyOutOfBound = errors.New("given key does not belong to the keyspace")
|
||||||
)
|
)
|
||||||
|
|
||||||
func checkV2Key(b []byte) error {
|
func checkV2Key(b []byte) error {
|
||||||
if len(b) < keyspacePrefixLen || (b[0] != rawModePrefix && b[0] != txnModePrefix) {
|
if len(b) < keyspacePrefixLen || (b[0] != RawModePrefix && b[0] != TxnModePrefix) {
|
||||||
return errors.Errorf("invalid API V2 key %s", b)
|
return errors.Errorf("invalid API V2 key %s", b)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -79,9 +81,9 @@ func NewCodecV2(mode Mode, keyspaceMeta *keyspacepb.KeyspaceMeta) (Codec, error)
|
||||||
codec.endKey = make([]byte, 4)
|
codec.endKey = make([]byte, 4)
|
||||||
switch mode {
|
switch mode {
|
||||||
case ModeRaw:
|
case ModeRaw:
|
||||||
codec.prefix[0] = rawModePrefix
|
codec.prefix[0] = RawModePrefix
|
||||||
case ModeTxn:
|
case ModeTxn:
|
||||||
codec.prefix[0] = txnModePrefix
|
codec.prefix[0] = TxnModePrefix
|
||||||
default:
|
default:
|
||||||
return nil, errors.Errorf("unknown mode")
|
return nil, errors.Errorf("unknown mode")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
19
tikv/gc.go
19
tikv/gc.go
|
|
@ -154,8 +154,8 @@ func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScanLocksInOneRegion return locks and location with given start key in a region.
|
// ScanLocksInOneRegion return locks and location with given start key in a region.
|
||||||
func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
|
func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, endKey []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
|
||||||
return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
|
return scanLocksInOneRegionWithRange(bo, l.GetStore(), key, endKey, maxVersion, scanLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetStore is used to get store to GetRegionCache and SendReq for this lock resolver.
|
// GetStore is used to get store to GetRegionCache and SendReq for this lock resolver.
|
||||||
|
|
@ -179,10 +179,11 @@ type RegionLockResolver interface {
|
||||||
// ** the locks are assumed sorted by key in ascending order **
|
// ** the locks are assumed sorted by key in ascending order **
|
||||||
ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)
|
ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)
|
||||||
|
|
||||||
// ScanLocksInOneRegion return locks and location with given start key in a region.
|
// ScanLocksInOneRegion return locks and location with given range, and if the range spans over multiple regions,
|
||||||
|
// it stops at the end of the first region that's covered or partially covered by the specified range.
|
||||||
// The return result ([]*Lock, *KeyLocation, error) represents the all locks in a regionLocation.
|
// The return result ([]*Lock, *KeyLocation, error) represents the all locks in a regionLocation.
|
||||||
// which will used by ResolveLocksInOneRegion later.
|
// which will used by ResolveLocksInOneRegion later.
|
||||||
ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)
|
ScanLocksInOneRegion(bo *Backoffer, key []byte, endKey []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)
|
||||||
|
|
||||||
// GetStore is used to get store to GetRegionCache and SendReq for this lock resolver.
|
// GetStore is used to get store to GetRegionCache and SendReq for this lock resolver.
|
||||||
GetStore() Storage
|
GetStore() Storage
|
||||||
|
|
@ -210,7 +211,7 @@ func ResolveLocksForRange(
|
||||||
return stat, errors.New("[gc worker] gc job canceled")
|
return stat, errors.New("[gc worker] gc job canceled")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit)
|
locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, endKey, maxVersion, scanLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return stat, err
|
return stat, err
|
||||||
}
|
}
|
||||||
|
|
@ -247,17 +248,21 @@ func ResolveLocksForRange(
|
||||||
return stat, nil
|
return stat, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
|
func scanLocksInOneRegionWithRange(bo *retry.Backoffer, store Storage, startKey []byte, endKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
|
||||||
for {
|
for {
|
||||||
loc, err := store.GetRegionCache().LocateKey(bo, startKey)
|
loc, err := store.GetRegionCache().LocateKey(bo, startKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, loc, err
|
return nil, loc, err
|
||||||
}
|
}
|
||||||
|
reqEndKey := loc.EndKey
|
||||||
|
if len(endKey) > 0 && (len(reqEndKey) == 0 || bytes.Compare(endKey, reqEndKey) < 0) {
|
||||||
|
reqEndKey = endKey
|
||||||
|
}
|
||||||
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
|
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
|
||||||
MaxVersion: maxVersion,
|
MaxVersion: maxVersion,
|
||||||
Limit: limit,
|
Limit: limit,
|
||||||
StartKey: startKey,
|
StartKey: startKey,
|
||||||
EndKey: loc.EndKey,
|
EndKey: reqEndKey,
|
||||||
})
|
})
|
||||||
resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
|
resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -249,3 +249,16 @@ var (
|
||||||
// the region info contains old leader during the election, this variable affects nothing in most time.
|
// the region info contains old leader during the election, this variable affects nothing in most time.
|
||||||
WithNeedRegionHasLeaderPeer = locate.WithNeedRegionHasLeaderPeer
|
WithNeedRegionHasLeaderPeer = locate.WithNeedRegionHasLeaderPeer
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
CodecV2TxnKeyspacePrefix = apicodec.TxnModePrefix
|
||||||
|
CodecV2RawKeyspacePrefix = apicodec.RawModePrefix
|
||||||
|
)
|
||||||
|
|
||||||
|
func CodecV2Prefixes() [][]byte {
|
||||||
|
return apicodec.CodecV2Prefixes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func CodecV1ExcludePrefixes() [][]byte {
|
||||||
|
return apicodec.CodecV1ExcludePrefixes()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -130,7 +130,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV
|
||||||
|
|
||||||
outerLoop:
|
outerLoop:
|
||||||
for {
|
for {
|
||||||
locks, loc, err := scanLocksInOneRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit)
|
locks, loc, err := scanLocksInOneRegionWithRange(bo, s.KVStore, startKey, nil, maxVersion, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue