diff --git a/internal/apicodec/codec.go b/internal/apicodec/codec.go index ea3ab091..f8f9ce19 100644 --- a/internal/apicodec/codec.go +++ b/internal/apicodec/codec.go @@ -29,6 +29,25 @@ const ( NullspaceID = KeyspaceID(constants.NullKeyspaceID) ) +// CodecV2Prefixes returns a sorted list of prefixes that will be used by Codec V2. +func CodecV2Prefixes() [][]byte { + result := [][]byte{ + {RawModePrefix}, + {TxnModePrefix}, + } + // The result should be sorted. + // If the list becomes larger in the future, add explicit sorting if necessary. + return result +} + +// CodecV1ExcludePrefixes returns a sorted list of prefixes that will not be used by Codec V1. This function can be +// used to determine if a key belongs to codec v1 in v1+v2 mixed deployment. +func CodecV1ExcludePrefixes() [][]byte { + // Currently this is identical with CodecV2Prefixes, but this function has different semantics. In the future + // if other special prefixes or codec versions are introduced, those prefixes should be added here. + return CodecV2Prefixes() +} + // ParseKeyspaceID retrieves the keyspaceID from the given keyspace-encoded key. // It returns error if the given key is not in proper api-v2 format. func ParseKeyspaceID(b []byte) (KeyspaceID, error) { diff --git a/internal/apicodec/codec_test.go b/internal/apicodec/codec_test.go index d8e5cfc0..3344b203 100644 --- a/internal/apicodec/codec_test.go +++ b/internal/apicodec/codec_test.go @@ -1,6 +1,7 @@ package apicodec import ( + "slices" "testing" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -57,3 +58,11 @@ func TestEncodeUnknownRequest(t *testing.T) { _, err := c.EncodeRequest(req) assert.Nil(t, err) } + +func TestCodecListUtilityFunctions(t *testing.T) { + keyCmp := func(lhs, rhs []byte) int { + return slices.Compare(lhs, rhs) + } + assert.True(t, slices.IsSortedFunc(CodecV2Prefixes(), keyCmp)) + assert.True(t, slices.IsSortedFunc(CodecV1ExcludePrefixes(), keyCmp)) +} diff --git a/internal/apicodec/codec_v2.go b/internal/apicodec/codec_v2.go index 19de78ce..d7f9f67b 100644 --- a/internal/apicodec/codec_v2.go +++ b/internal/apicodec/codec_v2.go @@ -18,25 +18,27 @@ import ( "go.uber.org/zap" ) -var ( +const ( // DefaultKeyspaceID is the keyspaceID of the default keyspace. DefaultKeyspaceID uint32 = 0 // DefaultKeyspaceName is the name of the default keyspace. DefaultKeyspaceName = "DEFAULT" - rawModePrefix byte = 'r' - txnModePrefix byte = 'x' + RawModePrefix byte = 'r' + TxnModePrefix byte = 'x' keyspacePrefixLen = 4 // maxKeyspaceID is the maximum value of keyspaceID, its value is uint24Max. maxKeyspaceID = uint32(0xFFFFFF) +) +var ( // errKeyOutOfBound happens when key to be decoded lies outside the keyspace's range. errKeyOutOfBound = errors.New("given key does not belong to the keyspace") ) func checkV2Key(b []byte) error { - if len(b) < keyspacePrefixLen || (b[0] != rawModePrefix && b[0] != txnModePrefix) { + if len(b) < keyspacePrefixLen || (b[0] != RawModePrefix && b[0] != TxnModePrefix) { return errors.Errorf("invalid API V2 key %s", b) } return nil @@ -79,9 +81,9 @@ func NewCodecV2(mode Mode, keyspaceMeta *keyspacepb.KeyspaceMeta) (Codec, error) codec.endKey = make([]byte, 4) switch mode { case ModeRaw: - codec.prefix[0] = rawModePrefix + codec.prefix[0] = RawModePrefix case ModeTxn: - codec.prefix[0] = txnModePrefix + codec.prefix[0] = TxnModePrefix default: return nil, errors.Errorf("unknown mode") } diff --git a/tikv/gc.go b/tikv/gc.go index 2f5f152b..82ff5e1f 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -154,8 +154,8 @@ func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks [] } // ScanLocksInOneRegion return locks and location with given start key in a region. -func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) { - return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit) +func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, endKey []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) { + return scanLocksInOneRegionWithRange(bo, l.GetStore(), key, endKey, maxVersion, scanLimit) } // GetStore is used to get store to GetRegionCache and SendReq for this lock resolver. @@ -179,10 +179,11 @@ type RegionLockResolver interface { // ** the locks are assumed sorted by key in ascending order ** ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error) - // ScanLocksInOneRegion return locks and location with given start key in a region. + // ScanLocksInOneRegion return locks and location with given range, and if the range spans over multiple regions, + // it stops at the end of the first region that's covered or partially covered by the specified range. // The return result ([]*Lock, *KeyLocation, error) represents the all locks in a regionLocation. // which will used by ResolveLocksInOneRegion later. - ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) + ScanLocksInOneRegion(bo *Backoffer, key []byte, endKey []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) // GetStore is used to get store to GetRegionCache and SendReq for this lock resolver. GetStore() Storage @@ -210,7 +211,7 @@ func ResolveLocksForRange( return stat, errors.New("[gc worker] gc job canceled") default: } - locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit) + locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, endKey, maxVersion, scanLimit) if err != nil { return stat, err } @@ -247,17 +248,21 @@ func ResolveLocksForRange( return stat, nil } -func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) { +func scanLocksInOneRegionWithRange(bo *retry.Backoffer, store Storage, startKey []byte, endKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) { for { loc, err := store.GetRegionCache().LocateKey(bo, startKey) if err != nil { return nil, loc, err } + reqEndKey := loc.EndKey + if len(endKey) > 0 && (len(reqEndKey) == 0 || bytes.Compare(endKey, reqEndKey) < 0) { + reqEndKey = endKey + } req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{ MaxVersion: maxVersion, Limit: limit, StartKey: startKey, - EndKey: loc.EndKey, + EndKey: reqEndKey, }) resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium) if err != nil { diff --git a/tikv/region.go b/tikv/region.go index b9c85ba1..58cf7e68 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -249,3 +249,16 @@ var ( // the region info contains old leader during the election, this variable affects nothing in most time. WithNeedRegionHasLeaderPeer = locate.WithNeedRegionHasLeaderPeer ) + +const ( + CodecV2TxnKeyspacePrefix = apicodec.TxnModePrefix + CodecV2RawKeyspacePrefix = apicodec.RawModePrefix +) + +func CodecV2Prefixes() [][]byte { + return apicodec.CodecV2Prefixes() +} + +func CodecV1ExcludePrefixes() [][]byte { + return apicodec.CodecV1ExcludePrefixes() +} diff --git a/tikv/test_probe.go b/tikv/test_probe.go index 0276cab9..87fd74ae 100644 --- a/tikv/test_probe.go +++ b/tikv/test_probe.go @@ -130,7 +130,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV outerLoop: for { - locks, loc, err := scanLocksInOneRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit) + locks, loc, err := scanLocksInOneRegionWithRange(bo, s.KVStore, startKey, nil, maxVersion, limit) if err != nil { return nil, err }