region_cache: check epoch before insert (#1079)

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
Co-authored-by: disksing <i@disksing.com>
Author: Yongbo Jiang, 2023-12-20 14:05:16 +08:00 (committed by GitHub)
Parent: 4ce1e45671
Commit: 85ca0a4a3f
5 changed files with 297 additions and 31 deletions

View File

@ -549,6 +549,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
return c
}
// only used for tests.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.mu = *newRegionIndexMu(nil)
return c
}
// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu = *newRegionIndexMu(nil)
@ -558,8 +570,8 @@ func (c *RegionCache) clear() {
}
// thread unsafe, should be used with the lock held
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
}
// Close releases region cache's resource.
@ -1491,9 +1503,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
// It should be protected by c.mu.l.Lock().
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
// Moreover, it returns false if the region to be inserted is stale.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
newVer := cachedRegion.VerID()
oldVer, ok := mu.latestVersions[newVer.id]
// There are two main cases in which the region we got may be stale.
// First, region fetches can run concurrently, and a stale response
// may arrive later than a fresher one due to network delay.
// Second, the region may be fetched from a PD follower, which can lag
// behind the PD leader.
// So we check the epoch before inserting.
if ok && (oldVer.GetVer() > newVer.GetVer() || oldVer.GetConfVer() > newVer.GetConfVer()) {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", newVer.GetID()), zap.Uint64("new-ver", newVer.GetVer()), zap.Uint64("new-conf", newVer.GetConfVer()),
zap.Uint64("old-ver", oldVer.GetVer()), zap.Uint64("old-conf", oldVer.GetConfVer()))
return false
}
// Also check and remove the intersecting regions including the old region.
intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, newVer)
if stale {
return false
}
// Insert the region (won't replace because of above deletion).
mu.sorted.ReplaceOrInsert(cachedRegion)
// Inherit the workTiKVIdx, workTiFlashIdx and buckets from the first intersected region.
if len(intersectedRegions) > 0 {
oldRegion := intersectedRegions[0].cachedRegion
store := cachedRegion.getStore()
oldRegionStore := oldRegion.getStore()
// TODO(youjiali1995): remove this because the new retry logic can handle this issue.
@ -1506,15 +1541,6 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
if shouldCount {
oldRegion.invalidate(Other)
} else {
oldRegion.invalidateWithoutMetrics(Other)
}
}
// Don't refresh the TiFlash work idx for the region. Otherwise, it will always go to an invalid store that
// is in the middle of transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
@ -1523,21 +1549,27 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) {
store.buckets = oldRegionStore.buckets
}
mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
}
mu.regions[cachedRegion.VerID()] = cachedRegion
newVer := cachedRegion.VerID()
latest, ok := mu.latestVersions[cachedRegion.VerID().id]
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
mu.latestVersions[cachedRegion.VerID().id] = newVer
}
// The intersecting regions in the cache are probably stale, clear them.
deleted := mu.sorted.removeIntersecting(cachedRegion)
for _, region := range deleted {
for _, region := range intersectedRegions {
mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID())
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
if shouldCount {
region.cachedRegion.invalidate(Other)
} else {
region.cachedRegion.invalidateWithoutMetrics(Other)
}
}
}
// update related vars.
mu.regions[newVer] = cachedRegion
mu.latestVersions[newVer.id] = newVer
return true
}
// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
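
For reference, the staleness rule added above boils down to a two-field epoch comparison. The standalone sketch below mirrors that rule outside of client-go; `epoch` and `isStale` are illustrative names, not part of the library:

package main

import "fmt"

// epoch mirrors the (Version, ConfVer) pair carried by a RegionVerID.
type epoch struct {
	ver, confVer uint64
}

// isStale reports whether an incoming region should be rejected: the insert is
// skipped when either component of the cached epoch is newer than the incoming
// one, matching the condition in insertRegionToCache above.
func isStale(cached, incoming epoch) bool {
	return cached.ver > incoming.ver || cached.confVer > incoming.confVer
}

func main() {
	cached := epoch{ver: 5, confVer: 3}
	fmt.Println(isStale(cached, epoch{ver: 4, confVer: 3})) // true: delayed response from before a split
	fmt.Println(isStale(cached, epoch{ver: 5, confVer: 2})) // true: conf version regressed
	fmt.Println(isStale(cached, epoch{ver: 6, confVer: 3})) // false: newer epoch, safe to insert
}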

View File

@ -55,6 +55,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
pd "github.com/tikv/pd/client"
uatomic "go.uber.org/atomic"
)
func TestRegionCache(t *testing.T) {
@ -1567,6 +1568,8 @@ func (s *testRegionCacheSuite) TestBuckets() {
// update buckets if it's nil.
cachedRegion.getStore().buckets = nil
// Update the region's epoch in the mock cluster; otherwise the cached region would be treated as stale.
s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1)
s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version)
s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion())
waitUpdateBuckets(defaultBuckets, []byte("a"))
@ -1833,3 +1836,192 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {
cancel()
}
func TestRegionCacheWithDelay(t *testing.T) {
suite.Run(t, new(testRegionCacheWithDelaySuite))
}
type testRegionCacheWithDelaySuite struct {
suite.Suite
mvccStore mocktikv.MVCCStore
cluster *mocktikv.Cluster
store uint64 // the leader store
region1 uint64
bo *retry.Backoffer
delay uatomic.Bool
delayCache *RegionCache
cache *RegionCache
}
func (s *testRegionCacheWithDelaySuite) SetupTest() {
s.mvccStore = mocktikv.MustNewMVCCStore()
s.cluster = mocktikv.NewCluster(s.mvccStore)
storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1)
s.region1 = regionID
s.store = storeIDs[0]
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.delayCache = NewRegionCache(pdCli2)
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
}
func (s *testRegionCacheWithDelaySuite) TearDownTest() {
s.cache.Close()
s.delayCache.Close()
s.mvccStore.Close()
}
func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() {
r, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
fakeRegion := &Region{
meta: r.meta,
syncFlag: r.syncFlag,
lastAccess: r.lastAccess,
invalidReason: r.invalidReason,
}
fakeRegion.setStore(r.getStore().clone())
newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("c"), newPeersIDs, newPeersIDs[0])
newPeersIDs = s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])
r.invalidate(Other)
r2, err := s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("c"), r2.StartKey())
r2, err = s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r2.StartKey())
stale := !s.cache.insertRegionToCache(fakeRegion, true, true)
s.True(stale)
rs, err := s.cache.scanRegionsFromCache(s.bo, []byte(""), []byte(""), 100)
s.NoError(err)
s.Greater(len(rs), 1)
s.NotEqual(rs[0].EndKey(), "")
r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r3.EndKey())
}
func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal(r1.meta, r2.meta)
// simulates network delay
s.delay.Store(true)
var wg sync.WaitGroup
wg.Add(1)
go func() {
r2.invalidate(Other)
_, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
wg.Done()
}()
time.Sleep(30 * time.Millisecond)
newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])
r1.invalidate(Other)
r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
s.delay.Store(false)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
wg.Wait()
// the delayed response has now been received, but the insert fails because the region is stale.
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.EndKey)
}
func generateKeyForSimulator(id int, keyLen int) []byte {
k := make([]byte, keyLen)
copy(k, fmt.Sprintf("%010d", id))
return k
}
func BenchmarkInsertRegionToCache(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.Id = uint64(i + 1)
newMeta.RegionEpoch.ConfVer = uint64(i+1) - uint64(rand.Intn(i+1))
newMeta.RegionEpoch.Version = uint64(i+1) - uint64(rand.Intn(i+1))
if i%2 == 0 {
newMeta.StartKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.EndKey = []byte("")
} else {
newMeta.EndKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.StartKey = []byte("")
}
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true, true)
}
}
func BenchmarkInsertRegionToCache2(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.RegionEpoch.ConfVer = uint64(i + 1)
newMeta.RegionEpoch.Version = uint64(i + 1)
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true, true)
}
}

View File

@ -38,6 +38,8 @@ import (
"bytes"
"github.com/google/btree"
"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"
)
// SortedRegions is a sorted btree.
@ -93,12 +95,16 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int)
// removeIntersecting removes all items whose key range intersects with that of the given region.
// If an intersecting item carries a newer version than verID, nothing is removed and stale is returned as true.
func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem {
func (s *SortedRegions) removeIntersecting(r *Region, verID RegionVerID) ([]*btreeItem, bool) {
var deleted []*btreeItem
var stale bool
s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool {
// Skip the item that is equal to the given region.
if item.cachedRegion.VerID() == r.VerID() {
return true
if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()),
zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion()))
stale = true
return false
}
if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 {
return false
@ -106,10 +112,13 @@ func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem {
deleted = append(deleted, item)
return true
})
if stale {
return nil, true
}
for _, item := range deleted {
s.b.Delete(item)
}
return deleted
return deleted, false
}
// Clear removes all items from the btree.
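
To illustrate the short-circuit added to removeIntersecting, here is a simplified, self-contained sketch that scans a slice kept sorted by start key instead of the google/btree index used here; the `item` type and the function below are illustrative only:

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// item stands in for a cached region: a [start, end) key range plus its epoch version.
type item struct {
	start, end []byte // empty end means "up to +inf"
	ver        uint64
}

// removeIntersecting collects every cached item that overlaps the candidate's
// key range, but reports stale=true (and removes nothing) as soon as it meets
// an overlapping item whose version is newer than the candidate's.
func removeIntersecting(items []item, start, end []byte, ver uint64) (kept, deleted []item, stale bool) {
	i := sort.Search(len(items), func(j int) bool {
		return bytes.Compare(items[j].start, start) >= 0
	})
	kept = append(kept, items[:i]...)
	for ; i < len(items); i++ {
		it := items[i]
		if len(end) > 0 && bytes.Compare(it.start, end) >= 0 {
			kept = append(kept, items[i:]...)
			break
		}
		if it.ver > ver {
			return items, nil, true // the candidate is stale; leave the cache untouched
		}
		deleted = append(deleted, it)
	}
	return kept, deleted, false
}

func main() {
	cache := []item{{[]byte("a"), []byte("c"), 2}, {[]byte("c"), nil, 2}}
	_, deleted, stale := removeIntersecting(cache, []byte("a"), nil, 1)
	fmt.Println(len(deleted), stale) // 0 true: a version-1 candidate loses to the cached version-2 items
}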

View File

@ -413,6 +413,14 @@ func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderP
c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID)
}
// PutRegion adds or replaces a region.
func (c *Cluster) PutRegion(regionID, confVer, ver uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) {
c.Lock()
defer c.Unlock()
c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID, confVer, ver)
}
// AddPeer adds a new Peer for the Region on the Store.
func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) {
c.Lock()
@ -634,7 +642,7 @@ func newPeerMeta(peerID, storeID uint64) *metapb.Peer {
}
}
func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region {
func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64, epoch ...uint64) *Region {
if len(storeIDs) != len(peerIDs) {
panic("len(storeIDs) != len(peerIds)")
}
@ -647,6 +655,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64)
Peers: peers,
RegionEpoch: &metapb.RegionEpoch{},
}
if len(epoch) == 2 {
meta.RegionEpoch.ConfVer = epoch[0]
meta.RegionEpoch.Version = epoch[1]
}
return &Region{
Meta: meta,
leader: leaderPeerID,
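
A minimal usage sketch for the new PutRegion signature, assembled only from helpers that appear elsewhere in this diff (MustNewMVCCStore, NewCluster, BootstrapWithMultiStores, AllocIDs); it assumes it is built inside the client-go repository, since mocktikv lives under internal/:

package main

import (
	"fmt"

	"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
)

func main() {
	mvcc := mocktikv.MustNewMVCCStore()
	defer mvcc.Close()
	cluster := mocktikv.NewCluster(mvcc)
	storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(cluster, 1)

	// Re-seed the bootstrapped region at an explicit epoch (ConfVer=2, Version=3),
	// e.g. so that a previously cached copy of the region is rejected as stale
	// by the epoch check added in this commit.
	peerIDs := cluster.AllocIDs(1)
	cluster.PutRegion(regionID, 2, 3, storeIDs, peerIDs, peerIDs[0])
	fmt.Println("region", regionID, "re-seeded with a newer epoch")
}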

View File

@ -61,6 +61,16 @@ var tsMu = struct {
const defaultResourceGroupName = "default"
var _ pd.Client = (*pdClient)(nil)
type MockPDOption func(*pdClient)
func WithDelay(delay *atomic.Bool) MockPDOption {
return func(pc *pdClient) {
pc.delay = delay
}
}
type pdClient struct {
cluster *Cluster
// SafePoint set by `UpdateGCSafePoint`. Not to be confused with SafePointKV.
@ -73,11 +83,13 @@ type pdClient struct {
externalTimestamp atomic.Uint64
groups map[string]*rmpb.ResourceGroup
delay *atomic.Bool
}
// NewPDClient creates a mock pd.Client that uses local timestamp and metadata
// from a Cluster.
func NewPDClient(cluster *Cluster) pd.Client {
func NewPDClient(cluster *Cluster, ops ...MockPDOption) *pdClient {
mockCli := &pdClient{
cluster: cluster,
serviceSafePoints: make(map[string]uint64),
@ -97,6 +109,9 @@ func NewPDClient(cluster *Cluster) pd.Client {
},
Priority: 8,
}
for _, op := range ops {
op(mockCli)
}
return mockCli
}
@ -206,6 +221,12 @@ func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegi
if len(opts) == 0 {
buckets = nil
}
if c.delay != nil && c.delay.Load() {
select {
case <-ctx.Done():
case <-time.After(200 * time.Millisecond):
}
}
return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil
}