enhance background job management of region cache (#1171)

* enhance background job management of region cache

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* add some comments

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* address comments

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* fix data race in ut

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* fix data race

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* refine scheduleWithTrigger

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* address the comment

Signed-off-by: zyguan <zhongyangguan@gmail.com>

---------

Signed-off-by: zyguan <zhongyangguan@gmail.com>
zyguan 2024-02-26 11:26:25 +08:00 committed by GitHub
parent 93fff7cec5
commit e72c4cd474
2 changed files with 430 additions and 230 deletions


@@ -508,6 +508,130 @@ func (mu *regionIndexMu) refresh(r []*Region) {
type livenessFunc func(ctx context.Context, s *Store) livenessState
// repeat wraps a `func()` as a schedulable function for `bgRunner`.
func repeat(f func()) func(context.Context, time.Time) bool {
return func(_ context.Context, _ time.Time) bool {
f()
return false
}
}
// until wraps a `func() bool` as a schedulable function for `bgRunner`.
func until(f func() bool) func(context.Context, time.Time) bool {
return func(_ context.Context, _ time.Time) bool {
return f()
}
}
type bgRunner struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func newBackgroundRunner(ctx context.Context) *bgRunner {
ctx, cancel := context.WithCancel(ctx)
return &bgRunner{
ctx: ctx,
cancel: cancel,
}
}
func (r *bgRunner) closed() bool {
select {
case <-r.ctx.Done():
return true
default:
return false
}
}
func (r *bgRunner) shutdown(wait bool) {
r.cancel()
if wait {
r.wg.Wait()
}
}
// run calls `f` once in background.
func (r *bgRunner) run(f func(context.Context)) {
if r.closed() {
return
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
f(r.ctx)
}()
}
// schedule calls `f` every `interval`.
func (r *bgRunner) schedule(f func(context.Context, time.Time) bool, interval time.Duration) {
if r.closed() || interval <= 0 {
return
}
r.wg.Add(1)
go func() {
ticker := time.NewTicker(interval)
defer func() {
r.wg.Done()
ticker.Stop()
}()
for {
select {
case <-r.ctx.Done():
return
case t := <-ticker.C:
if f(r.ctx, t) {
return
}
}
}
}()
}
// scheduleWithTrigger is like schedule, but also calls `f` on each receive from `trigger`, in which case the time arg of `f` is zero.
func (r *bgRunner) scheduleWithTrigger(f func(context.Context, time.Time) bool, interval time.Duration, trigger <-chan struct{}) {
if r.closed() || interval <= 0 {
return
}
r.wg.Add(1)
go func() {
ticker := time.NewTicker(interval)
defer func() {
r.wg.Done()
ticker.Stop()
}()
triggerEnabled := trigger != nil
for triggerEnabled {
select {
case <-r.ctx.Done():
return
case t := <-ticker.C:
if f(r.ctx, t) {
return
}
case _, ok := <-trigger:
if !ok {
triggerEnabled = false
} else if f(r.ctx, time.Time{}) {
return
}
}
}
for {
select {
case <-r.ctx.Done():
return
case t := <-ticker.C:
if f(r.ctx, t) {
return
}
}
}
}()
}
// RegionCache caches Regions loaded from PD.
// All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
// purposes only.
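For orientation, here is a minimal usage sketch of the helpers introduced above. It is illustrative only, not part of the diff, and assumes it lives in the same package (bgRunner, repeat, until, and friends are unexported) with context and time imported.

func exampleBgRunnerUsage() {
	r := newBackgroundRunner(context.Background())

	// run: one-shot background job that observes the runner's context.
	r.run(func(ctx context.Context) {
		select {
		case <-ctx.Done(): // exit early on shutdown
		case <-time.After(50 * time.Millisecond):
		}
	})

	// schedule + repeat: call a plain func() on every tick until shutdown.
	r.schedule(repeat(func() { /* periodic work */ }), 10*time.Millisecond)

	// schedule + until: the job unschedules itself once the wrapped func returns true.
	n := 0
	r.schedule(until(func() bool { n++; return n >= 3 }), 10*time.Millisecond)

	// scheduleWithTrigger: ticks as above, plus an immediate run on each trigger send;
	// triggered runs receive a zero time.Time, and closing the channel disables the trigger.
	trigger := make(chan struct{}, 1)
	r.scheduleWithTrigger(func(_ context.Context, t time.Time) bool {
		_ = t.IsZero() // true only for triggered runs
		return false
	}, 10*time.Millisecond, trigger)
	trigger <- struct{}{}

	r.shutdown(true) // cancel the shared context and wait for every job goroutine to return
}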
@@ -529,10 +653,8 @@ type RegionCache struct {
}
notifyCheckCh chan struct{}
// Context for background jobs
ctx context.Context
cancelFunc context.CancelFunc
wg sync.WaitGroup
// runner for background jobs
bg *bgRunner
testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
@@ -556,12 +678,12 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
interval := config.GetGlobalConfig().StoresRefreshInterval
c.bg = newBackgroundRunner(context.Background())
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
if config.GetGlobalConfig().EnablePreload {
logutil.BgLogger().Info("preload region index start")
if err := c.refreshRegionIndex(retry.NewBackofferWithVars(c.ctx, 20000, nil)); err != nil {
if err := c.refreshRegionIndex(retry.NewBackofferWithVars(c.bg.ctx, 20000, nil)); err != nil {
logutil.BgLogger().Error("refresh region index failed", zap.Error(err))
}
logutil.BgLogger().Info("preload region index finish")
@@ -569,22 +691,35 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.mu = *newRegionIndexMu(nil)
}
// TODO(zyguan): refine management of background cron jobs
c.wg.Add(1)
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
// Default use 15s as the update interval.
c.wg.Add(1)
go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
if config.GetGlobalConfig().RegionsRefreshInterval > 0 {
c.timelyRefreshCache(config.GetGlobalConfig().RegionsRefreshInterval)
var (
refreshStoreInterval = config.GetGlobalConfig().StoresRefreshInterval
needCheckStores []*Store
)
c.bg.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
// check and resolve normal stores periodically by default.
filter := func(state resolveState) bool {
return state != unresolved && state != tombstone && state != deleted
}
if t.IsZero() {
// check and resolve needCheck stores because it's triggered by a CheckStoreEvent this time.
filter = func(state resolveState) bool { return state == needCheck }
}
needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
return false
}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
c.bg.schedule(repeat(c.checkAndUpdateStoreSlowScores), time.Duration(refreshStoreInterval/4)*time.Second)
c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
c.bg.schedule(func(ctx context.Context, _ time.Time) bool {
if err := c.refreshRegionIndex(retry.NewBackofferWithVars(ctx, int(refreshCacheInterval)*1000, nil)); err != nil {
logutil.BgLogger().Error("refresh region cache failed", zap.Error(err))
}
return false
}, time.Duration(refreshCacheInterval)*time.Second)
} else {
// cacheGC is not compatible with timelyRefreshCache
c.wg.Add(1)
go c.cacheGC()
// cache GC is incompatible with cache refresh
c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
}
c.wg.Add(1)
go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second)
return c
}
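A short restatement of the filter swap used by the store check job above may help (illustrative sketch only; resolveState and its values come from this file).

// storeCheckFilter summarizes which stores get re-resolved per run: a zero time means the
// run was triggered by a CheckStoreEvent, so only stores in the needCheck state are handled;
// periodic runs re-resolve every store that is still usable.
func storeCheckFilter(t time.Time) func(resolveState) bool {
	if t.IsZero() {
		return func(state resolveState) bool { return state == needCheck }
	}
	return func(state resolveState) bool {
		return state != unresolved && state != tombstone && state != deleted
	}
}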
@@ -595,7 +730,7 @@ func newTestRegionCache() *RegionCache {
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.bg = newBackgroundRunner(context.Background())
c.mu = *newRegionIndexMu(nil)
return c
}
@@ -613,42 +748,12 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldReg
// Close releases region cache's resource.
func (c *RegionCache) Close() {
c.cancelFunc()
c.wg.Wait()
}
// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer func() {
c.wg.Done()
ticker.Stop()
}()
var needCheckStores []*Store
for {
needCheckStores = needCheckStores[:0]
select {
case <-c.ctx.Done():
return
case <-c.getCheckStoreEvents():
c.checkAndResolve(needCheckStores, func(s *Store) bool {
return s.getResolveState() == needCheck
})
case <-ticker.C:
// refresh store to update labels.
c.checkAndResolve(needCheckStores, func(s *Store) bool {
state := s.getResolveState()
// Only valid stores should be reResolved. In fact, it's impossible that
// there's a deleted store in the stores map, which is guaranteed by reResolve().
return state != unresolved && state != tombstone && state != deleted
})
}
}
c.bg.shutdown(true)
}
// checkAndResolve checks and resolve addr of failed stores.
// this method isn't thread-safe and only be used by one goroutine.
func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) {
func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) []*Store {
defer func() {
r := recover()
if r != nil {
@@ -658,10 +763,12 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
}
}()
for _, store := range c.filterStores(needCheckStores, needCheck) {
needCheckStores = c.filterStores(needCheckStores, needCheck)
for _, store := range needCheckStores {
_, err := store.reResolve(c)
tikverr.Log(err)
}
return needCheckStores
}
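The new return value exists so callers can reuse the scratch slice across rounds, as NewRegionCache does above; a minimal sketch of the pattern (illustrative only, same package assumed):

func exampleCheckAndResolveReuse(c *RegionCache, rounds int) {
	var buf []*Store
	for i := 0; i < rounds; i++ {
		// buf[:0] keeps the underlying array, and the returned slice carries its grown
		// capacity into the next round, so repeated checks avoid reallocating.
		buf = c.checkAndResolve(buf[:0], func(s *Store) bool {
			return s.getResolveState() == needCheck
		})
	}
}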
// SetRegionCacheStore is used to set a store in region cache, for testing only
@@ -1775,31 +1882,6 @@ func (c *RegionCache) scanRegionsFromCache(bo *retry.Backoffer, startKey, endKey
return regions, nil
}
func (c *RegionCache) timelyRefreshCache(intervalS uint64) {
if intervalS <= 0 {
return
}
ticker := time.NewTicker(time.Duration(intervalS) * time.Second)
c.wg.Add(1)
go func() {
defer func() {
c.wg.Done()
ticker.Stop()
}()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
intervalMs := int(1000 * intervalS)
if err := c.refreshRegionIndex(retry.NewBackofferWithVars(c.ctx, intervalMs, nil)); err != nil {
logutil.BgLogger().Error("refresh region cache failed", zap.Error(err))
}
}
}
}()
}
func (c *RegionCache) refreshRegionIndex(bo *retry.Backoffer) error {
totalRegions := make([]*Region, 0)
startKey := []byte{}
@@ -2169,88 +2251,89 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
const cleanCacheInterval = time.Second
const cleanRegionNumPerRound = 50
// This function is expected to run in a background goroutine.
// gcScanItemHook is only used for testing
var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])
// The returned function is expected to run in a background goroutine.
// It keeps iterating over the whole region cache, searching for stale region
// info. It runs at cleanCacheInterval and checks only cleanRegionNumPerRound
// regions. In this way, the impact of this background goroutine should be
// negligible.
func (c *RegionCache) cacheGC() {
ticker := time.NewTicker(cleanCacheInterval)
defer func() {
c.wg.Done()
ticker.Stop()
}()
func (c *RegionCache) gcRoundFunc(limit int) func(context.Context, time.Time) bool {
if limit < 1 {
limit = 1
}
beginning := newBtreeSearchItem([]byte(""))
iterItem := beginning
expired := make([]*btreeItem, cleanRegionNumPerRound)
remaining := make([]*Region, cleanRegionNumPerRound)
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
count := 0
expired = expired[:0]
remaining = remaining[:0]
cursor := beginning
expiredItems := make([]*btreeItem, limit)
needCheckRegions := make([]*Region, limit)
// Only RLock when checking TTL to avoid blocking other readers
c.mu.RLock()
ts := time.Now().Unix()
c.mu.sorted.b.AscendGreaterOrEqual(iterItem, func(item *btreeItem) bool {
if count > cleanRegionNumPerRound {
iterItem = item
return false
}
count++
if item.cachedRegion.isCacheTTLExpired(ts) {
expired = append(expired, item)
} else {
remaining = append(remaining, item.cachedRegion)
}
return true
})
c.mu.RUnlock()
return func(_ context.Context, t time.Time) bool {
expiredItems = expiredItems[:0]
needCheckRegions = needCheckRegions[:0]
hasMore, count, ts := false, 0, t.Unix()
onScanItem := gcScanItemHook.Load()
// Reach the end of the region cache, start from the beginning
if count <= cleanRegionNumPerRound {
iterItem = beginning
// Only RLock when checking TTL to avoid blocking other readers
c.mu.RLock()
c.mu.sorted.b.AscendGreaterOrEqual(cursor, func(item *btreeItem) bool {
count++
if count > limit {
cursor = item
hasMore = true
return false
}
// Clean expired regions
if len(expired) > 0 {
c.mu.Lock()
for _, item := range expired {
c.mu.sorted.b.Delete(item)
c.mu.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
}
c.mu.Unlock()
if onScanItem != nil {
(*onScanItem)(item)
}
if item.cachedRegion.isCacheTTLExpired(ts) {
expiredItems = append(expiredItems, item)
} else {
needCheckRegions = append(needCheckRegions, item.cachedRegion)
}
return true
})
c.mu.RUnlock()
// Check remaining regions and update sync flags
for _, region := range remaining {
syncFlags := region.getSyncFlags()
if syncFlags&needDelayedReloadReady > 0 {
// the region will be reloaded soon on access
continue
}
if syncFlags&needDelayedReloadPending > 0 {
region.setSyncFlags(needDelayedReloadReady)
// the region will be reloaded soon on access, no need to check if it needs to be expired
continue
}
if syncFlags&needExpireAfterTTL == 0 {
regionStore := region.getStore()
for i, store := range regionStore.stores {
// if the region has a stale or unreachable store, let it expire after TTL.
if atomic.LoadUint32(&store.epoch) != regionStore.storeEpochs[i] || store.getLivenessState() != reachable {
region.setSyncFlags(needExpireAfterTTL)
break
}
// Reached the end of the region cache; start over from the beginning next round
if !hasMore {
cursor = beginning
}
// Clean expired regions
if len(expiredItems) > 0 {
c.mu.Lock()
for _, item := range expiredItems {
c.mu.sorted.b.Delete(item)
c.mu.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
}
c.mu.Unlock()
}
// Check remaining regions and update sync flags
for _, region := range needCheckRegions {
syncFlags := region.getSyncFlags()
if syncFlags&needDelayedReloadReady > 0 {
// the region will be reloaded soon on access
continue
}
if syncFlags&needDelayedReloadPending > 0 {
region.setSyncFlags(needDelayedReloadReady)
// the region will be reloaded soon on access, no need to check if it needs to be expired
continue
}
if syncFlags&needExpireAfterTTL == 0 {
regionStore := region.getStore()
for i, store := range regionStore.stores {
// if the region has a stale or unreachable store, let it expire after TTL.
if atomic.LoadUint32(&store.epoch) != regionStore.storeEpochs[i] || store.getLivenessState() != reachable {
region.setSyncFlags(needExpireAfterTTL)
break
}
}
}
}
return false
}
}
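As a quick sanity check on the constants above: each tick inspects at most cleanRegionNumPerRound regions, so a full pass over the cache takes roughly the duration below. This is an illustrative helper, not part of the diff, and it is roughly the two-pass sleep used by TestBackgroundCacheGC later in this change.

// fullGCPassDuration estimates how long one complete scan of the region cache takes,
// assuming the job is scheduled every cleanCacheInterval as in NewRegionCache above.
func fullGCPassDuration(cachedRegions int) time.Duration {
	rounds := (cachedRegions + cleanRegionNumPerRound - 1) / cleanRegionNumPerRound
	return time.Duration(rounds) * cleanCacheInterval
}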
@@ -2517,7 +2600,7 @@ type Store struct {
loadStats atomic2.Pointer[storeLoadStats]
// whether the store is unreachable due to some reason, therefore requests to the store needs to be
// forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store.
// forwarded by other stores. this is also the flag that a health check loop is running for this store.
// this mechanism is currently only applicable for TiKV stores.
livenessState uint32
unreachableSince time.Time
@@ -2846,59 +2929,49 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil {
return
}
go s.checkUntilHealth(c, liveness, reResolveInterval)
startHealthCheckLoop(c, s, liveness, reResolveInterval)
}
return
}
func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) {
ticker := time.NewTicker(time.Second)
defer func() {
ticker.Stop()
if liveness != reachable {
logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop",
zap.Uint64("storeID", s.storeID),
zap.String("state", s.getResolveState().String()),
zap.String("liveness", s.getLivenessState().String()))
}
}()
func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reResolveInterval time.Duration) {
lastCheckPDTime := time.Now()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
if time.Since(lastCheckPDTime) > reResolveInterval {
lastCheckPDTime = time.Now()
c.bg.schedule(func(ctx context.Context, t time.Time) bool {
if t.Sub(lastCheckPDTime) > reResolveInterval {
lastCheckPDTime = t
valid, err := s.reResolve(c)
if err != nil {
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
} else if !valid {
if s.getResolveState() == deleted {
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
newStore, _ := c.getStore(s.storeID)
logutil.BgLogger().Info("[health check] store meta changed",
zap.Uint64("storeID", s.storeID),
zap.String("oldAddr", s.addr),
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
zap.String("newAddr", newStore.addr),
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
go newStore.checkUntilHealth(c, liveness, reResolveInterval)
}
return
valid, err := s.reResolve(c)
if err != nil {
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
} else if !valid {
if s.getResolveState() != deleted {
logutil.BgLogger().Warn("[health check] store was still unhealthy at the end of health check loop",
zap.Uint64("storeID", s.storeID),
zap.String("state", s.getResolveState().String()),
zap.String("liveness", s.getLivenessState().String()))
return true
}
}
liveness = s.requestLiveness(c.ctx, c)
atomic.StoreUint32(&s.livenessState, uint32(liveness))
if liveness == reachable {
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
return
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
newStore, _ := c.getStore(s.storeID)
logutil.BgLogger().Info("[health check] store meta changed",
zap.Uint64("storeID", s.storeID),
zap.String("oldAddr", s.addr),
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
zap.String("newAddr", newStore.addr),
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
s = newStore
}
}
}
liveness = s.requestLiveness(ctx, c)
atomic.StoreUint32(&s.livenessState, uint32(liveness))
if liveness == reachable {
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
return true
}
return false
}, time.Second)
}
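Tying this back to bgRunner: the per-store job retires itself by returning true, since schedule stops its ticker loop on a true return. A compressed, illustrative summary of the exit conditions (a reading aid, not code from the diff):

// healthCheckOutcome condenses the decisions made by the scheduled callback above
// into a single table-like function.
func healthCheckOutcome(resolveValid bool, state resolveState, liveness livenessState) (stopJob bool) {
	switch {
	case !resolveValid && state != deleted:
		return true // the store is gone and was not replaced: log a warning and give up
	case liveness == reachable:
		return true // the store recovered: stop the 1s probe job
	default:
		return false // keep probing; a deleted store is swapped for its replacement first
	}
}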
func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) {
@@ -3056,24 +3129,6 @@ func (s *Store) markAlreadySlow() {
s.slowScore.markAlreadySlow()
}
// asyncUpdateStoreSlowScore updates the slow score of each store periodically.
func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
ticker := time.NewTicker(interval)
defer func() {
c.wg.Done()
ticker.Stop()
}()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
// update store slowScores
c.checkAndUpdateStoreSlowScores()
}
}
}
// checkAndUpdateStoreSlowScores checks and updates slowScore on each store.
func (c *RegionCache) checkAndUpdateStoreSlowScores() {
defer func() {
@@ -3109,26 +3164,14 @@ func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
atomic.AddUint64(&s.replicaFlowsStats[destType], 1)
}
// asyncReportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
ticker := time.NewTicker(interval)
defer func() {
c.wg.Done()
ticker.Stop()
}()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.forEachStore(func(store *Store) {
for destType := toLeader; destType < numReplicaFlowsType; destType++ {
metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
store.resetReplicaFlowsStats(destType)
}
})
// reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
func (c *RegionCache) reportStoreReplicaFlows() {
c.forEachStore(func(store *Store) {
for destType := toLeader; destType < numReplicaFlowsType; destType++ {
metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
store.resetReplicaFlowsStats(destType)
}
}
})
}
func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) {


@@ -49,6 +49,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/apicodec"
@@ -70,6 +71,142 @@ func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...p
return c.Client.GetRegion(ctx, key, opts...)
}
func TestBackgroundRunner(t *testing.T) {
t.Run("ShutdownWait", func(t *testing.T) {
dur := 100 * time.Millisecond
r := newBackgroundRunner(context.Background())
r.run(func(ctx context.Context) {
time.Sleep(dur)
})
start := time.Now()
r.shutdown(true)
require.True(t, time.Since(start) >= dur)
})
t.Run("ShutdownNoWait", func(t *testing.T) {
dur := 100 * time.Millisecond
done := make(chan struct{})
r := newBackgroundRunner(context.Background())
r.run(func(ctx context.Context) {
select {
case <-ctx.Done():
close(done)
case <-time.After(dur):
require.Fail(t, "run should be canceled by shutdown")
}
})
r.shutdown(false)
<-done
})
t.Run("RunAfterShutdown", func(t *testing.T) {
var called atomic.Bool
r := newBackgroundRunner(context.Background())
r.shutdown(false)
r.run(func(ctx context.Context) {
called.Store(true)
})
require.False(t, called.Load())
r.schedule(until(func() bool {
called.Store(true)
return true
}), time.Second)
require.False(t, called.Load())
r.scheduleWithTrigger(until(func() bool {
called.Store(true)
return true
}), time.Second, make(chan struct{}))
require.False(t, called.Load())
})
t.Run("Schedule", func(t *testing.T) {
var (
done = make(chan struct{})
interval = 20 * time.Millisecond
history = make([]int64, 0, 3)
start = time.Now().UnixMilli()
)
r := newBackgroundRunner(context.Background())
r.schedule(func(_ context.Context, t time.Time) bool {
history = append(history, t.UnixMilli())
if len(history) == 3 {
close(done)
return true
}
return false
}, interval)
<-done
require.Equal(t, 3, len(history))
for i := range history {
require.LessOrEqual(t, int64(i+1)*interval.Milliseconds(), history[i]-start)
}
history = history[:0]
start = time.Now().UnixMilli()
r.schedule(func(ctx context.Context, t time.Time) bool {
history = append(history, t.UnixMilli())
return false
}, interval)
time.Sleep(interval*3 + interval/2)
r.shutdown(true)
require.Equal(t, 3, len(history))
for i := range history {
require.LessOrEqual(t, int64(i+1)*interval.Milliseconds(), history[i]-start)
}
})
t.Run("ScheduleWithTrigger", func(t *testing.T) {
var (
done = make(chan struct{})
trigger = make(chan struct{})
interval = 20 * time.Millisecond
history = make([]int64, 0, 3)
start = time.Now().UnixMilli()
)
r := newBackgroundRunner(context.Background())
r.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
if t.IsZero() {
history = append(history, -1)
} else {
history = append(history, t.UnixMilli())
}
if len(history) == 3 {
close(done)
return true
}
return false
}, interval, trigger)
trigger <- struct{}{}
time.Sleep(interval + interval/2)
trigger <- struct{}{}
<-done
require.Equal(t, 3, len(history))
require.Equal(t, int64(-1), history[0])
require.Equal(t, int64(-1), history[2])
require.LessOrEqual(t, int64(1)*interval.Milliseconds(), history[1]-start)
history = history[:0]
start = time.Now().UnixMilli()
r.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
if t.IsZero() {
history = append(history, -1)
} else {
history = append(history, t.UnixMilli())
}
return false
}, interval, trigger)
trigger <- struct{}{}
trigger <- struct{}{}
close(trigger)
time.Sleep(interval + interval/2)
r.shutdown(true)
require.Equal(t, 3, len(history))
require.Equal(t, int64(-1), history[0])
require.Equal(t, int64(-1), history[1])
require.LessOrEqual(t, int64(1)*interval.Milliseconds(), history[2]-start)
})
}
func TestRegionCache(t *testing.T) {
suite.Run(t, new(testRegionCacheSuite))
}
@@ -1815,6 +1952,26 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
loadRegionsToCache(s.cache, regionCnt)
s.checkCache(regionCnt)
var (
gcScanStats = make(map[uint64]int)
gcScanStatsMu sync.Mutex
gcScanStatsFn = func(item *btreeItem) {
gcScanStatsMu.Lock()
gcScanStats[item.cachedRegion.GetID()]++
gcScanStatsMu.Unlock()
}
)
gcScanItemHook.Store(&gcScanStatsFn)
// Check that region items are scanned uniformly.
time.Sleep(cleanCacheInterval*time.Duration(2*regionCnt/cleanRegionNumPerRound) + cleanCacheInterval/2)
gcScanStatsMu.Lock()
s.Equal(regionCnt, len(gcScanStats))
for _, count := range gcScanStats {
s.Equal(2, count)
}
gcScanStatsMu.Unlock()
// Make parts of the regions stale
remaining := 0
s.cache.mu.Lock()
@@ -1901,7 +2058,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
// start health check loop
atomic.StoreUint32(&store1.livenessState, store1Liveness)
go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second)
startHealthCheckLoop(s.cache, store1, livenessState(store1Liveness), time.Second)
// update store meta
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)